Celery同一时刻只执行一个相同任务

本文总阅读量

前记

最近由于业务原因在接触Celery,Celery的理念挺不错的,但是只从文档很难去知道改去怎么订制Celery,希望我过段时间会有一篇Celery的源码分析文章.

最近在用Celery调用任务时,发现无法实现同一时刻,只有一个相同的任务运行,但是光看文档不知道需要怎么去处理,只能通过源码去分析运行流程,并在对应的流程加上自己的逻辑.

Ps: 我那时候的业务场景除了确保任务在同一时刻唯一,还有另外一个需求,就是同样的任务执行时,新的任务需要上个任务执行完成后才能执行.

1.如何确保任务在同一时刻唯一

一般情况下,都是通过一个锁实现在同一时刻内只有一个任务在跑,如下伪代码:

1
2
3
4
5
6
# 一个封装良好的锁对象可以被这样使用
lock = Lock()

with lock as l:
# run code
pass

我们只要在伪代码中执行我们的代码就可以了,如果只是一两个任务需要锁,那直接在任务里更改代码就可以了,但是如果要做到Celery通用,那就要设计一下.

2.需要做什么

由于要做到通用,那应该做到:

  • 1.结合Celery的任务事件功能, 做到任务脚本对锁无感知
  • 2.调用方可以通过参数来确定任务需要执行什么锁逻辑
  • 3.由于Celery的存储结果用到了Redis,那可以直接使用Redis

2.1.结合Celery的任务事件功能, 做到任务脚本对锁无感知

Celery中,我们可以继承他的Task对象来让每个任务拥有Celery的任务事件功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MyTask(celery.Task):
# 任务失败时执行
def on_failure(self, exc, task_id, args, kwargs, einfo):
pass

# 任务成功时执行
def on_success(self, retval, task_id, args, kwargs):
pass

# 任务重试时执行
def on_retry(self, exc, task_id, args, kwargs, einfo):
pass

# 当任务执行完毕
def def after_return(self, status, retval, task_id, args, kwargs, einfo):
pass

然后在任务设置base参数,使任务应用该对象的事件.

1
celery.task(name='xxx', base='MyTask')

从Task对象中我们可以看到有个after_return的方法,我们可以在任务结束的时候通过该方法,释放锁,但是我们缺少一个入口, Celery的Task入口在哪呢?

我们知道,Celery可以直接通过调用Task.apply()来调用任务,那任务的入口就可以从这里入手,通过查看源码发现,Celery在调用任务时,是通过Celery.app.trece.py的build_tracer方法来执行任务的,在build_tracer刚开始就有一句代码,指明调用入口

1
fun = task if task_has_custom(task, '__call__') else task.run

以及在下面有一句代码, 执行fun,也就是执行任务,执行完再把他标记为成功

1
2
3
4
5
try:
R = retval = fun(*args, **kwargs)
state = SUCCESS
except Reject as exc:
pass

看来Celery的入口就是Task的__call__或者run方法了,而且可以发现,我们可以在__call__中写我们的锁逻辑,只要拿到锁的才可以去调用super().__call__去执行任务,否则就直接return,所以我们可以把Task对象改造为:

1
2
3
4
5
class MyTask(celery.Task):
def __call__(self, *args, **kwargs):
# 伪代码
with lock as l:
super().__call__(*args, **kwargs)

2.调用方可以通过参数来确定任务需要执行什么锁逻辑

在通过了解Celery.app.trece.py的build_tracer方法可以发现,调用方的参数是通过*args,**kwargs来传给任务的,我们只要对**kwargs进行改造即可,如果获取到celery_once_key,那key为对应的值,如果获取不到则为任务名,这里之所以这样设计而不直接使用任务名是可以把key的定义规则交给调用方,调用方可能会根据不同参数来让任务类型更细化的划分.

需要知道的是这里需要用pop而不是用get, 因为不pop的话,对应的值就会传到任务那边了.
后面如果有更多的参数,也可以通过这样拓展.:

1
2
3
4
5
6
7
8
9
10
11
12
class MyTask(celery.Task):

def get_key(self, args, kwargs) -> key:
key: Optional[str] = kwargs.pop('celery_once_key', None)
if not once_key:
key = self.name + ':'.join(args)
return key

def __call__(self, *args, **kwargs):
# 伪代码
with lock as l:
super().__call__(*args, **kwargs)

2.3.使用Redis

由于大部分都是使用Redis来做Celery的结果存储,所以我们可以直接调用Celery的redis的lock对象来实现锁,并且通过参数拓展更多的功能.

首先先说明下Redis中Lock参数的意义

1
lock(key, timeout=None, blocking_timeout=None, sleep=0.1)
  • key就是锁的名
  • timeout为锁的过期时间,None则代表一直持有锁
  • blocking_timeout为尝试等待获取锁的时间,如果在该时间内没获取锁就报错,None为一直在等待
  • sleep为在等待获取锁的循环间隙时间

接下来就开始更改下我们的MyTask(说明见代码注释):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class MyTask(celery.Task):
def get_lock(self, args, kwargs) -> Lock:
"""通过kwargs来获取redis.lock所需要的参数"""
once_key: Optional[str] = kwargs.pop('celery_once_key', None)
if not once_key:
once_key = self.name + ':'.join(args)
timeout: int = kwargs.pop('celery_once_timeout', None)
blocking_timeout: int = kwargs.pop('celery_once_blocking_timeout', None)
sleep: float = kwargs.pop('celery_once_sleep', 0.1)
redis: Redis = self.backend.client
return redis.lock(once_key, timeout=timeout, blocking_timeout=blocking_timeout, sleep=sleep)

def __call__(self, *args, **kwargs):
lock = self.get_lock(args, kwargs)
# 根据build_tracer逻辑,这里如果执行超时,会抛错并执行Task的失败event
with lock as _lock:
return super().__call__(*args, **kwargs)

这样就完成啦, 不过,如果设置timeout=None, blocking_timeout=None时,收到另外一个相同的任务后会一直卡在执行中,直到上一个任务完成后,他才会继续执行,这样有点不符合我们的逻辑.收到的另外一个任务应该是要被放弃掉,好像在我们上面看的build_tracer代码中,他有会Reject的异常进行处理,如果是Reject异常,那任务会直接完成,且不会把结果记录下来.

1
2
3
4
5
try:
R = retval = fun(*args, **kwargs)
state = SUCCESS
except Reject as exc:
pass

所以经过最后的修改,完整的代码就像下面这样啦

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class MyTask(Task):
def get_lock(self, args, kwargs) -> Lock:
once_key: Optional[str] = kwargs.pop('celery_once_key', None)
if not once_key:
once_key = self.name + ':'.join(args)
timeout: int = kwargs.pop('celery_once_timeout', None)
blocking_timeout: int = kwargs.pop('celery_once_blocking_timeout', None)
sleep: float = kwargs.pop('celery_once_sleep', 0.1)
redis: Redis = self.backend.client
return redis.lock(once_key, timeout=timeout, blocking_timeout=blocking_timeout, sleep=sleep)

def __call__(self, *args, **kwargs):
lock = self.get_lock(args, kwargs)
enable_wait: bool = kwargs.pop('celery_once_enable_wait', False)
if not enable_wait and lock.locker():
# 如果任务不需要等待且获取不到锁,则需要被标记为Reject并放弃执行
logger.info(
f'lock:{lock.name} not release and not eanble retry, task ignore, args {args}, kwargs {kwargs}'
)
raise Reject
else:
with lock as _lock:
return super().__call__(*args, **kwargs)

3.附录

3.1完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import time
from typing import Optional

import Celery
from celery import Task
from celery.exceptions import Reject
from celery.utils.log import get_task_logger
from redis.client import Redis
from redis.lock import Lock

logger = get_task_logger(__name__)


class OnceTask(Task):
"""
OnceTask需要在调用任务时从kwargs传入如下参数(非必选)
celery_once_key: 锁的key,为空时以task_name + args为key
celery_once_timeout: 锁的持有时间,默认为None, 也就是等到任务主动调用release之前,锁一直存在
celery_once_enable_retry: 如果检查到锁存在, 是否需要尝试获取锁
celery_once_blocking_timeout: 尝试获取锁的时间, 默认为None, 也就是一直在尝试
celery_once_sleep: 每次尝试获取锁的睡眠时间, 默认为0.1, 也就是锁的隔多久尝试一次
timeout=None, sleep=0.1, blocking_timeout=None,

举例子:
如果要确保只能只有一个任务运行,其他任务运行会被忽略则:
celery_once_timeout: None, celery_once_enable_retry: False
如果要确保一个任务运行时,其他任务排队,并在上一个任务运行完再运行则:
celery_once_timeout: None, celery_once_enable_retry: True
"""

def get_lock(self, args, kwargs) -> Lock:
once_key: Optional[str] = kwargs.pop('celery_once_key', None)
if not once_key:
once_key = self.name + ':'.join(args)
timeout: int = kwargs.pop('celery_once_timeout', None)
blocking_timeout: int = kwargs.pop('celery_once_blocking_timeout', None)
sleep: float = kwargs.pop('celery_once_sleep', 0.1)
redis: Redis = self.backend.client
return redis.lock(once_key, timeout=timeout, blocking_timeout=blocking_timeout, sleep=sleep)

def __call__(self, *args, **kwargs):
lock = self.get_lock(args, kwargs)
enable_wait: bool = kwargs.pop('celery_once_enable_wait', False)
if not enable_wait and lock.locker():
# 如果任务不需要等待且获取不到锁,则需要被标记为Reject并放弃执行
logger.info(
f'lock:{self.lock.name} not release and not eanble retry, task ignore, args {args}, kwargs {kwargs}'
)
raise Reject
else:
with lock as _lock:
return super().__call__(*args, **kwargs)


app: 'Celery' = Celery()

@celery.task(name="tasks1", base=OnceTask)
def test(*args, **kwargs):
time.sleep(60)

test.apply()
test.apply() # 这个任务会被Reject
test.apply(kwargs={'celery_once_enable_retry': True, 'celery_once_sleep': 1}) # 这个任务会等第一个任务执行完了再执行
查看评论