前记 最近由于业务原因在接触Celery
,Celery
的理念挺不错的,但是只从文档很难去知道改去怎么订制Celery
,希望我过段时间会有一篇Celery
的源码分析文章.
最近在用Celery
调用任务时,发现无法实现同一时刻,只有一个相同的任务运行,但是光看文档不知道需要怎么去处理,只能通过源码去分析运行流程,并在对应的流程加上自己的逻辑.
Ps: 我那时候的业务场景除了确保任务在同一时刻唯一,还有另外一个需求,就是同样的任务执行时,新的任务需要上个任务执行完成后才能执行.
1.如何确保任务在同一时刻唯一 一般情况下,都是通过一个锁实现在同一时刻内只有一个任务在跑,如下伪代码:
1 2 3 4 5 6 lock = Lock()with lock as l: pass
我们只要在伪代码中执行我们的代码就可以了,如果只是一两个任务需要锁,那直接在任务里更改代码就可以了,但是如果要做到Celery
通用,那就要设计一下.
2.需要做什么 由于要做到通用,那应该做到:
1.结合Celery
的任务事件功能, 做到任务脚本对锁无感知
2.调用方可以通过参数来确定任务需要执行什么锁逻辑
3.由于Celery
的存储结果用到了Redis
,那可以直接使用Redis
锁
2.1.结合Celery
的任务事件功能, 做到任务脚本对锁无感知 在Celery
中,我们可以继承他的Task对象来让每个任务拥有Celery
的任务事件功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class MyTask (celery.Task ): def on_failure (self, exc, task_id, args, kwargs, einfo ): pass def on_success (self, retval, task_id, args, kwargs ): pass def on_retry (self, exc, task_id, args, kwargs, einfo ): pass def def after_return (self, status, retval, task_id, args, kwargs, einfo ): pass
然后在任务设置base参数,使任务应用该对象的事件.
1 celery.task(name='xxx' , base='MyTask' )
从Task对象中我们可以看到有个after_return
的方法,我们可以在任务结束的时候通过该方法,释放锁,但是我们缺少一个入口, Celery
的Task入口在哪呢?
我们知道,Celery
可以直接通过调用Task.apply()
来调用任务,那任务的入口就可以从这里入手,通过查看源码发现,Celery
在调用任务时,是通过Celery.app.trece.py
的build_tracer方法来执行任务的,在build_tracer刚开始就有一句代码,指明调用入口
1 fun = task if task_has_custom(task, '__call__' ) else task.run
以及在下面有一句代码, 执行fun,也就是执行任务,执行完再把他标记为成功
1 2 3 4 5 try : R = retval = fun(*args, **kwargs) state = SUCCESSexcept Reject as exc: pass
看来Celery
的入口就是Task的__call__
或者run
方法了,而且可以发现,我们可以在__call__
中写我们的锁逻辑,只要拿到锁的才可以去调用super().__call__
去执行任务,否则就直接return,所以我们可以把Task对象改造为:
1 2 3 4 5 class MyTask (celery.Task ): def __call__ (self, *args, **kwargs ): with lock as l: super ().__call__(*args, **kwargs)
2.调用方可以通过参数来确定任务需要执行什么锁逻辑 在通过了解Celery.app.trece.py
的build_tracer方法可以发现,调用方的参数是通过*args
,**kwargs
来传给任务的,我们只要对**kwargs
进行改造即可,如果获取到celery_once_key
,那key为对应的值,如果获取不到则为任务名,这里之所以这样设计而不直接使用任务名是可以把key的定义规则交给调用方,调用方可能会根据不同参数来让任务类型更细化的划分.
需要知道的是这里需要用pop而不是用get, 因为不pop的话,对应的值就会传到任务那边了. 后面如果有更多的参数,也可以通过这样拓展.:
1 2 3 4 5 6 7 8 9 10 11 12 class MyTask (celery.Task ): def get_key (self, args, kwargs ) -> key: key: Optional[str ] = kwargs.pop('celery_once_key' , None ) if not once_key: key = self.name + ':' .join(args) return key def __call__ (self, *args, **kwargs ): with lock as l: super ().__call__(*args, **kwargs)
2.3.使用Redis
锁 由于大部分都是使用Redis
来做Celery
的结果存储,所以我们可以直接调用Celery
的redis的lock对象来实现锁,并且通过参数拓展更多的功能.
首先先说明下Redis
中Lock参数的意义
1 lock(key, timeout=None , blocking_timeout=None , sleep=0.1 )
key就是锁的名
timeout为锁的过期时间,None则代表一直持有锁
blocking_timeout为尝试等待获取锁的时间,如果在该时间内没获取锁就报错,None为一直在等待
sleep为在等待获取锁的循环间隙时间
接下来就开始更改下我们的MyTask
(说明见代码注释):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class MyTask (celery.Task ): def get_lock (self, args, kwargs ) -> Lock: """通过kwargs来获取redis.lock所需要的参数""" once_key: Optional[str ] = kwargs.pop('celery_once_key' , None ) if not once_key: once_key = self.name + ':' .join(args) timeout: int = kwargs.pop('celery_once_timeout' , None ) blocking_timeout: int = kwargs.pop('celery_once_blocking_timeout' , None ) sleep: float = kwargs.pop('celery_once_sleep' , 0.1 ) redis: Redis = self.backend.client return redis.lock(once_key, timeout=timeout, blocking_timeout=blocking_timeout, sleep=sleep) def __call__ (self, *args, **kwargs ): lock = self.get_lock(args, kwargs) with lock as _lock: return super ().__call__(*args, **kwargs)
这样就完成啦, 不过,如果设置timeout=None, blocking_timeout=None时,收到另外一个相同的任务后会一直卡在执行中,直到上一个任务完成后,他才会继续执行,这样有点不符合我们的逻辑.收到的另外一个任务应该是要被放弃掉,好像在我们上面看的build_tracer代码中,他有会Reject的异常进行处理,如果是Reject异常,那任务会直接完成,且不会把结果记录下来.
1 2 3 4 5 try : R = retval = fun(*args, **kwargs) state = SUCCESSexcept Reject as exc: pass
所以经过最后的修改,完整的代码就像下面这样啦
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class MyTask (Task ): def get_lock (self, args, kwargs ) -> Lock: once_key: Optional[str ] = kwargs.pop('celery_once_key' , None ) if not once_key: once_key = self.name + ':' .join(args) timeout: int = kwargs.pop('celery_once_timeout' , None ) blocking_timeout: int = kwargs.pop('celery_once_blocking_timeout' , None ) sleep: float = kwargs.pop('celery_once_sleep' , 0.1 ) redis: Redis = self.backend.client return redis.lock(once_key, timeout=timeout, blocking_timeout=blocking_timeout, sleep=sleep) def __call__ (self, *args, **kwargs ): lock = self.get_lock(args, kwargs) enable_wait: bool = kwargs.pop('celery_once_enable_wait' , False ) if not enable_wait and lock.locker(): logger.info( f'lock:{lock.name} not release and not eanble retry, task ignore, args {args} , kwargs {kwargs} ' ) raise Reject else : with lock as _lock: return super ().__call__(*args, **kwargs)
3.附录 3.1完整代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import timefrom typing import Optionalimport Celeryfrom celery import Taskfrom celery.exceptions import Rejectfrom celery.utils.log import get_task_loggerfrom redis.client import Redisfrom redis.lock import Lock logger = get_task_logger(__name__)class OnceTask (Task ): """ OnceTask需要在调用任务时从kwargs传入如下参数(非必选) celery_once_key: 锁的key,为空时以task_name + args为key celery_once_timeout: 锁的持有时间,默认为None, 也就是等到任务主动调用release之前,锁一直存在 celery_once_enable_retry: 如果检查到锁存在, 是否需要尝试获取锁 celery_once_blocking_timeout: 尝试获取锁的时间, 默认为None, 也就是一直在尝试 celery_once_sleep: 每次尝试获取锁的睡眠时间, 默认为0.1, 也就是锁的隔多久尝试一次 timeout=None, sleep=0.1, blocking_timeout=None, 举例子: 如果要确保只能只有一个任务运行,其他任务运行会被忽略则: celery_once_timeout: None, celery_once_enable_retry: False 如果要确保一个任务运行时,其他任务排队,并在上一个任务运行完再运行则: celery_once_timeout: None, celery_once_enable_retry: True """ def get_lock (self, args, kwargs ) -> Lock: once_key: Optional[str ] = kwargs.pop('celery_once_key' , None ) if not once_key: once_key = self.name + ':' .join(args) timeout: int = kwargs.pop('celery_once_timeout' , None ) blocking_timeout: int = kwargs.pop('celery_once_blocking_timeout' , None ) sleep: float = kwargs.pop('celery_once_sleep' , 0.1 ) redis: Redis = self.backend.client return redis.lock(once_key, timeout=timeout, blocking_timeout=blocking_timeout, sleep=sleep) def __call__ (self, *args, **kwargs ): lock = self.get_lock(args, kwargs) enable_wait: bool = kwargs.pop('celery_once_enable_wait' , False ) if not enable_wait and lock.locker(): logger.info( f'lock:{self.lock.name} not release and not eanble retry, task ignore, args {args} , kwargs {kwargs} ' ) raise Reject else : with lock as _lock: return super ().__call__(*args, **kwargs) app: 'Celery' = Celery()@celery.task(name="tasks1" , base=OnceTask ) def test (*args, **kwargs ): time.sleep(60 ) test.apply() test.apply() test.apply(kwargs={'celery_once_enable_retry' : True , 'celery_once_sleep' : 1 })