Celery调用不同项目的任务

本文总阅读量

前记

PS: 水文笔记…

Celery的示例或者常用使用方式中, Celery的Woker以及任务调用经常都是在同一个项目里面.
但是随着项目的拓展,很多时候Celery任务调用端与Woker的代码并不是在同一个项目里(但是共享同一个Mq, Redis等配置和中间件),而且Celery是基于RabbitMQ进行任务信息传输的,非常适合不同的项目调用解耦.
Celery有一个非常好的概念–Signature,我们只要在远程端封装一个伪任务的Signature,就可以通过调用该Signature去让另一个项目的Celery执行对应的任务.

1.send_task

Celery的文档中,有提到如果是调用不同项目的Woker可以使用send_task方法,先看看Woker端的代码:

1
2
3
4
5
6
7
8
9
import time
from celery import Celery

celery: Celery = Celery() # 省略配置

# 一个模拟任务,会跑60秒
@celery.task(name="test.block")
def test(sleep_seconds=60):
time.sleep(sleep_seconds)

作为调用端,如果需要调用的话需要如下代码去调用:

1
2
3
4
from celery import Celery

celery: Celery = Celery() # 省略配置
celery.send_task("test.block")

可以发现只要通过send_task传送对应的任务名即可,事实上Celery最底层的远程调用逻辑也是通过send_task.不过如果要通过send_task远程调用并使用Celery的其他特性却比较困难,要自己去了解源码,并经过一定的封装,好再Celery中有一个叫签名–Signature的概念.

2.Signature

Signature非常好用,通过他我们可以很容易的使用Celery的高级方法,而且在不同项目中,我们的调用方法也是跟相同项目的调用方法是一致的.先看看通过Signature改进后的调用端代码:

1
2
3
4
5
6
7
8
9
from celery import Celery, signature

celery: Celery = Celery() # 省略配置
block_signature = signature(
'test.block', # 任务名
immutable=True, # chain时变量不可变
app=celery
)
block_signature.delay()

看起来参数比较多一点,但是可以看到我们可以像平常同个项目下调用task一样使用delay()方法.可以看做上面的block_signature等于woker项目的test().si,所以Signature也支持其他高级调用,如Chain:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from celery import Celery, chain, signature

celery: Celery = Celery() # 省略配置
block_signature_1 = signature(
# 任务名
'test.block',
# chain时变量不可变
immutable=True,
kwargs={'sleep_seconds': 10}
app=celery
)
block_signature_2 = signature(
# 任务名
'test.block',
# chain时变量不可变
immutable=True,
kwargs={'sleep_seconds': 20}
app=celery
)
chain(block_signature_1, block_signature_2).delay()

3.附录

3.1检查任务是否运行结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from celery import Celery, chain, signature

celery: Celery = Celery() # 省略配置
# 其他任务类型,如group的task前缀并不是celery-task-meta-,需要查询文档
task_key_prefix: str = 'celery-task-meta-'

def celery_task_id_is_run(task_id: str) -> bool:
redis: Redis = celery.backend.client
if not task_id.startswith(task_key_prefix):
task_id += task_id
return redis.exists(task_id)

task_id: str = celery.send_task("test.block").task_id
celery_task_id_is_run(task_id)
查看评论