Celery pool选用prefork时多进程+协程的坑
坑一:节点和Broker断联
背景:
Celery默认使用多进程提高并发,因为task中多是发送Http请求,所以我采用gevent来提高并发。
代码片段如下:1
2
3
4
5
6
7
8from gevent import monkey
monkey.patch_all()
@app.task
def poc_call(jsondata):
...
业务代码
...
现象:
Broker选的是Rabbitmq,打了monkey补丁后,运行一段时间后,在RabbitMQ Management管理页面中查看consumer,发现已经没有Celery节点信息。
原因:
猜测原因gevent monkey补丁改变socket库,进而导致基于socket的kombu库异常
验证:
- 注释掉monkey补丁,没有观察到broker断联的情况
1
2
3
4
5
6
7
8from gevent import monkey
# monkey.patch_all()
@app.task
def poc_call(jsondata):
...
业务代码
...
解决方案:
- 在task中创建子进程,在子进程中使用gevent
1
2
3
4
5
6
7from billiard.context import Process
@app.task
def poc_call(json_data):
p = Process(target=_poc_call, args=(json_data,))
p.start()
p.join()
需要注意的是开启新进程时,如果使用multiprocess
库,会报错
1
2
3
4
5File "/data/poc-worker/tasks.py", line 336, in poc_call
p.start()
File "/usr/lib/python2.7/multiprocessing/process.py", line 124, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
这个问题的解决办法:
可以使用billiard
库,from billiard.context import Process