发布时间:2019-09-18 07:23:28编辑:auto阅读(1667)
一、Celery异步分布式
Celery 是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向broker传递消息,然后celery的worker从中取消息
Celery 用于存储消息以及celery执行的一些消息和结果
对于brokers,官方推荐是rabbitmq和redis
对于backend,也就是指数据库,为了简单一般使用redis
使用redis连接url格式:
redis://:password@hostname:port/db_number
1)定义连接脚本tasks.py
#!/usr/bin/env python from celery import Celery broker = "redis://192.168.2.230:6379/1" backend = "redis://192.168.2.230:6379/2" app = Celery("tasks", broker=broker, backend=backend) @app.task def add(x,y): return x+y
2)安装启动celery
pip install celery
pip install redis
启动方式:celery -A huang tasks -l info #-l 等同于 --loglevel
3)执行测试 huang.py
#!/usr/bin/env python from tasks import add re = add.delay(10,20) print(re.result) #任务返回值 print(re.ready) #如果任务被执行返回True,其他情况返回False print(re.get(timeout=1)) #带参数的等待,最后返回结果 print(re.status) #任务当前状态
运行结果:
30
<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>
30
SUCCESS
4)根据成功返回的key或celery界面输出的信息,查看redis存储
说明:停止celery服务,执行完huang.py之后,再启动celery服务也是有保存数据的
二、celery多进程
1)配置文件 celeryconfig.py
#!/usr/bin/env python #-*- coding:utf-8 -*- from kombu import Exchange,Queue BROKER_URL = "redis://192.168.2.230:6379/3" CELERY_RESULT_BACKEND = "redis://192.168.2.230:6379/4" CELERY_QUEUES = ( Queue("default",Exchange("default"),routing_key="default"), Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"), Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B") ) CELERY_ROUTES = { 'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"}, 'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"} }
2)tasks.py
#!/usr/bin/env python #-*- coding:utf-8 -*- from celery import Celery app = Celery() app.config_from_object("celeryconfig") @app.task def taskA(x,y): return x+y @app.task def taskB(x,y,z): return x+y+z
3)启动celery
celery -A tasks worker --loglevel info
4)执行脚本huang2.py
#!/usr/bin/env python #-*- coding:utf-8 -*- from tasks import taskA,taskB re = taskA.delay(10,20) print(re.result) #任务返回值 print(re.ready) #如果任务被执行返回True,其他情况返回False print(re.get(timeout=1)) #带参数的等待,最后返回结果 print(re.status) #任务当前状态 re2 = taskB.delay(10,20,30) print(re2.result) print(re2.ready) print(re2.get(timeout=1)) print(re2.status)
5)运行结果
None
<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>
30
SUCCESS
None
<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>
60
SUCCESS
上一篇: H3C DHCP配置
下一篇: eclipse 安装 resin 3 步
47840
46387
37279
34731
29312
25970
24909
19949
19541
18026
5790°
6411°
5926°
5960°
7063°
5911°
5942°
6437°
6404°
7776°