python—Celery异步分布式

发布时间:2019-09-18 07:23:28编辑:auto阅读(1667)

    一、Celery异步分布式

    Celery  是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向broker传递消息,然后celery的worker从中取消息

    Celery  用于存储消息以及celery执行的一些消息和结果


    对于brokers,官方推荐是rabbitmq和redis

    对于backend,也就是指数据库,为了简单一般使用redis


    clipboard.png


    使用redis连接url格式:

    redis://:password@hostname:port/db_number


    1)定义连接脚本tasks.py


    #!/usr/bin/env python
    from celery import Celery
    broker = "redis://192.168.2.230:6379/1"
    backend = "redis://192.168.2.230:6379/2"
    app = Celery("tasks", broker=broker, backend=backend)
    
    @app.task
    def add(x,y):
        return x+y


    2)安装启动celery

    pip install celery

    pip install redis

    启动方式:celery -A huang tasks -l info  #-l 等同于 --loglevel

    1.png


    3)执行测试 huang.py 

    #!/usr/bin/env python
    from tasks import add
    
    re = add.delay(10,20)
    
    print(re.result)   #任务返回值
    print(re.ready)     #如果任务被执行返回True,其他情况返回False
    
    print(re.get(timeout=1))  #带参数的等待,最后返回结果
    print(re.status)  #任务当前状态

    运行结果:

    30

    <bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>

    30

    SUCCESS


    4)根据成功返回的key或celery界面输出的信息,查看redis存储

    blob.png


    说明:停止celery服务,执行完huang.py之后,再启动celery服务也是有保存数据的



    二、celery多进程

    1.png

    1)配置文件 celeryconfig.py

    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    
    from kombu import Exchange,Queue
    
    BROKER_URL = "redis://192.168.2.230:6379/3"
    CELERY_RESULT_BACKEND = "redis://192.168.2.230:6379/4"
    
    CELERY_QUEUES = (
    Queue("default",Exchange("default"),routing_key="default"),
    Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
    Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
    )
    
    CELERY_ROUTES = {
    'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
    'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
    }


    2)tasks.py

    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    
    from celery import Celery
    
    app = Celery()
    app.config_from_object("celeryconfig")
    
    @app.task
        def taskA(x,y):
        return x+y
        
    @app.task
        def taskB(x,y,z):
        return x+y+z


    3)启动celery

    celery -A tasks worker --loglevel info


    4)执行脚本huang2.py

    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    
    from tasks import taskA,taskB
    
    re = taskA.delay(10,20)
    
    print(re.result)   #任务返回值
    print(re.ready)     #如果任务被执行返回True,其他情况返回False
    print(re.get(timeout=1))  #带参数的等待,最后返回结果
    print(re.status)  #任务当前状态
    
    re2 = taskB.delay(10,20,30)
    print(re2.result)
    print(re2.ready)
    print(re2.get(timeout=1))
    print(re2.status)


    5)运行结果

    None

    <bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>

    30

    SUCCESS

    None

    <bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>

    60

    SUCCESS



关键字

上一篇: H3C DHCP配置

下一篇: eclipse 安装 resin 3 步