Celery 3 版本 定时执行与 异

发布时间:2019-09-19 08:00:14编辑:auto阅读(1820)

    Celery介绍

    Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery。

    软件架构

    Celery 3 版本  定时执行与 异步执行 |  Django 案例

    Django案例

    环境

    * python3.6.4
    * django 2.0 
    * django-celery==3.2.1
    * django-kombu==0.9.4
    * celery-with-redis==3.0
    * celery==3.1.25

    目录结构

    autoops/
        autoops/settings
        tasks/tasks.py

    settings

    import djcelery
    
    INSTALLED_APPS = [
        'djcelery',
        'kombu', 
    ]
    
    djcelery.setup_loader()
    BROKER_URL = 'redis://127.0.0.1:6379/0'  #消息存储数据存储在仓库0
    
    CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' # 指定 Backend
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    
    CELERY_TIMEZONE = 'Asia/Shanghai'
    
    #CELERY_ALWAYS_EAGER = True   # 如果开启,Celery便以eager模式运行, 则task便不需要加delay运行
    
    CELERY_IMPORTS = ('tasks.tasks',)
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  #这是使用了django-celery默认的数据库调度模型,任务执行周期都被存在你指定的orm数据库中

    tasks.py

    from celery import Celery, platforms
    
    platforms.C_FORCE_ROOT = True
    
    app = Celery('my_task')
    app.config_from_object('django.conf:settings',)
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    @app.task()
    def  ansbile():   ##如果想异步调用 ansible api,请在任务前面添加如下
    
        from multiprocessing import current_process
        # try:
        #     current_process()._config
        # except AttributeError:
        current_process()._config = {'semprefix': '/mp'}
    
    @app.task()
    def  cmd_job(host,cmd):   ## 执行命令
        i = asset.objects.get(network_ip=host)
        ret = ssh(ip=i.network_ip, port=i.port, username=i.username, password=i.password, cmd=cmd)
        return  ret['data']
    
    def test():  ##  下面是异步调用 celery 的例子
    
        from tasks.tasks import cmd_job
    
        aa = cmd_job.apply_async(args=('43.241.238.109', 'pwd'))
        print("id",aa.task_id,"返回值",aa.get() ,aa.result, "状态",aa.state)
    
        from  djcelery.models import TaskMeta
        b = TaskMeta.objects.get(task_id=aa).result
        print("返回值",b)
    

    Django 后台

    以下是定时执行,直接后台操作即可。很简单。

    Celery 3 版本  定时执行与 异步执行 |  Django 案例

    数据库结构

    * | celery_taskmeta                   ##异步任务,会将结果写入到这个表内
    * | celery_tasksetmeta              
    * | djcelery_crontabschedule    
    * | djcelery_intervalschedule    
    * | djcelery_periodictask           
    * | djcelery_periodictasks        
    * | djcelery_taskstate               ##django后台执行的定时任务,会将结果写到这个表里
    * | djcelery_workerstate           
     from  djcelery.models import TaskMeta,TaskState          ##这样获取表

    Celery 3 版本  定时执行与 异步执行 |  Django 案例

    在数据库里看 result 内容是乱码,但是 通过orm获取的时候,显示是正常的。请知悉。

    启动命令

     #实际执行任务的程序
    /usr/bin/python   /opt/autoops/manage.py   celery worker  -c  4        --loglevel=info
    
    #任务调度, 根据配置文件发布定时任务
    /usr/bin/python   /opt/autoops/manage.py celery beat --schedule=/tmp/celerybeat-schedule --pidfile=/tmp/django_celerybeat.pid --loglevel=INFO
    
    # Django 检查  workers  是否在线
    /usr/bin/python   /opt/autoops/manage.py   celerycam --frequency=10.0

关键字