Celery的坑

Celery pool选用prefork时多进程+协程的坑

坑一:节点和Broker断联

背景:
Celery默认使用多进程提高并发,因为task中多是发送Http请求,所以我采用gevent来提高并发。

代码片段如下:

1
2
3
4
5
6
7
8
from gevent import monkey
monkey.patch_all()

@app.task
def poc_call(jsondata):
...
业务代码
...

现象:
Broker选的是Rabbitmq,打了monkey补丁后,运行一段时间后,在RabbitMQ Management管理页面中查看consumer,发现已经没有Celery节点信息。

原因:
猜测原因gevent monkey补丁改变socket库,进而导致基于socket的kombu库异常

验证:

  1. 注释掉monkey补丁,没有观察到broker断联的情况
    1
    2
    3
    4
    5
    6
    7
    8
    from gevent import monkey
    # monkey.patch_all()

    @app.task
    def poc_call(jsondata):
    ...
    业务代码
    ...

解决方案:

  1. 在task中创建子进程,在子进程中使用gevent
    1
    2
    3
    4
    5
    6
    7
    from billiard.context import Process

    @app.task
    def poc_call(json_data):
    p = Process(target=_poc_call, args=(json_data,))
    p.start()
    p.join()

需要注意的是开启新进程时,如果使用multiprocess库,会报错

1
2
3
4
5
File "/data/poc-worker/tasks.py", line 336, in poc_call
p.start()
File "/usr/lib/python2.7/multiprocessing/process.py", line 124, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

这个问题的解决办法:
可以使用billiard库,from billiard.context import Process