44. Python Celery多实

发布时间:2019-08-31 09:44:53编辑:auto阅读(1429)

    celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢?

    celery可以支持多台不同的计算机执行不同的任务或者相同的任务。

    如果要说celery的分布式应用的话,就要提到celery的消息路由机制,提到AMQP协议。

    具体可以查看AMQP文档详细了解。

    简单理解:

    可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue,

    而这是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定routiing_key,Exchange通过routing_key来吧消息路由(routes)到不同的"消息队列"中去。

    如图:

    clipboard.png

    exchange 对应 一个消息队列(queue),即:通过"消息路由"的机制使exchange对应queue,每个queue对应每个worker

    写个例子:

    vim demon3.py

    from celery import Celery
    app = Celery()
    app.config_from_object("celeryconfig")
    @app.task
    def taskA(x, y):
        return x * y
    @app.task
    def taskB(x, y, z):
        return x + y + z
    @app.task
    def add(x, y):
        return x + y

    vim celeryconfig.py

    from kombu import Queue
    BORKER_URL = "redis://192.168.48.131:6379/1"				#1库
    CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"	#2库
    CELERY_QUEUES = {
        Queue("default", Exchange("default"), routing_key = "default"),
        Queue("for_task_A", Exchange("for_task_A"), routing_key = "for_task_A"),
        Queue("for_task_B", Exchange("for_task_B"), routing_key = "for_task_B")
    }
    #路由
    CELERY_ROUTES = {
        "demon3.taskA":{"queue": "for_task_A",  "routing_key": "for_task_A"},
        "demon3.taskB":{"queue": "for_task_B",  "routing_key": "for_task_B"}
    }

    下面把两个脚本导入服务器:

    指定taskA启动一个worker:

    # celery -A demon3 worker -l info -n workerA.%h -Q for_task_A

    同理:

    # celery -A demon3 worker -l info -n workerB.%h -Q for_task_B

    下面远程客户端调用:新文件

    vim remote.py

    from demon3 import *
    r1 = taskA.delay(10, 20)
    print (r1.result)
    print (r1.status)
    r2 = taskB.delay(10, 20, 30)
    time.sleep(1)
    prnit (r2.result)
    print (r2.status)
    #print (dir(r2))
    r3 = add.delay(100, 200)
    print (r3.result)
    print (r3.status)	#PENDING

    看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。

    下面,我们来启动一个worker来执行celery队列中的任务

    # celery -A tasks worker -l info -n worker.%h -Q celery 		##默认的

    可以看到这行的结果为success

    print(re3.status)    #SUCCESS


    定时任务:

    Celery 与 定时任务

    在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。

    下面我们接着在配置文件:celeryconfig.py,添加关于 CELERYBEAT_SCHEDULE 变量到脚本中去:

    CELERY_TIMEZONE = 'UTC'
    CELERYBEAT_SCHEDULE = {
        'taskA_schedule' : {
            'task':'tasks.taskA',
            'schedule':20,
            'args':(5,6)
        },
    'taskB_scheduler' : {
        'task':"tasks.taskB",
        "schedule":200,
        "args":(10,20,30)
        },
    'add_schedule': {
        "task":"tasks.add",
        "schedule":10,
        "args":(1,2)
        }
    }

    注意格式,否则会有问题

    启动:

    celery -A demon3 worker -l info -n workerA.%h -Q for_task_A

    celery -A demon3 worker -l info -n workerB.%h -Q for_task_B

    celery -A tasks worker -l info -n worker.%h -Q celery

    celery -A demon3 beat


关键字