celery+django+redis使

发布时间:2019-03-19 21:10:53编辑:auto阅读(2652)

    版本:

    • celery==3.1.25
    • django==1.8.6
    • redis==2.10.6

    安装:

    • 进入虚拟环境(虚拟环境创建不同,进入方式不同)
    • pip install celery==3.1.25(以celery安装为例,其他安装方式相同)

    运行环境:

    • window10(celery4.0以后不支持windows)
    • linux

    目录结构:

    • 最外面的test1为项目名称
    • 里面的test1与app同级目录,里面都是一些配置(其中celery.py放在其中)。
    • app2为项目中的一个app
    • tasks.py在app2中,这个tasks.py只针对app2使用。若其他app也有celery任务。建立同样的文件名即可。

    运行celery,需要的几个文件:

    • test1下的celery.py,代码如下(里面有介绍):
     1 from __future__ import absolute_import, unicode_literals
     2 import os
     3 from django.conf import settings
     4 from celery import Celery
     5 
     6 
     7 #设置 Django 的配置文件
     8 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test.settings')
     9 
    10 # 创建 celery 实例
    11 app = Celery('test1')
    12 
    13 # Using a string here means the worker will not have to
    14 # pickle the object when using Windows.
    15 app.config_from_object('django.conf:settings')
    16 
    17 # 搜索所有 app 中的 tasks
    18 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    19  
    20 @app.task(bind=True)
    21 def debug_task(self):
    22 print('Request: {0!r}'.format(self.request))
    View Code
    • test1下的settings.py,代码如下:
     1 """原来的django的settings内容"""
     2 
     3 #celery config
     4 #消息中间件(使用redis),消息代理,用于发布者传递消息给消费者
     5 BROKER_URL = 'redis://127.0.0.1:6379'
     6 #消息结果返回中间件(使用redis),用于存储任务执行结果
     7 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
     8 #允许的内容类型,
     9 CELERY_ACCEPT_CONTENT = ['json']
    10 #任务的序列化方式
    11 CELERY_TASK_SERIALIZER = 'json'
    12 #任务结果的序列化方式
    13 CELERY_RESULT_SERIALIZER = 'json'
    14 #celery时区,定时任务使用
    15 CELERY_TIMEZONE = 'Asia/Shanghai'
    16 from datetime import timedelta
    17 #定时任务处理,使用的schedule,里面的task,写上任务的“路径”,schedule设置时间,args设置参数。
    18 CELERYBEAT_SCHEDULE = {
    19 'add_every_10_seconds': {
    20 'task': 'app2.tasks.add',
    21 'schedule': timedelta(seconds=10),
    22 'args': (4,4)
    23 },
    24 }
    View Code
    • test1下的__init__.py,代码如下:

     

    1 from __future__ import absolute_import
    2 from .celery import app as celery_app
    3 # 这是为了确保在django启动时启动 celery
    View Code
    • app2下的tasks.py,这个tasks文件只针对该app2使用,其他app可以根据实际情况添加tasks.py文件,里面写相应的任务代码。代码如下:
    1 # -*- coding:utf-8 -*-
    2 from __future__ import absolute_import
    3 from auto_model_platform.celery import app
    4 
    5 @app.task
    6 def add(x, y):
    7     time.sleep(30)
    8     print("running...", x, y)
    9     return x + y
    View Code

     

    调用tasks.py内的函数:

    • 在app2.views.py中的某个视图函数中直接调用,比如调用tasks.py中的add(x,y)函数,这样调用。代码如下:
     1 from app2.tasks import add
     2 
     3 class TestView(View):
     4      def get(self, request):
     5 
     6     """其它逻辑"""
     7 
     8     #celery处理的其它任务(异步处理),下面这个代码,celery会去处理,django直接执行下面的其它逻辑
     9     r = add.delay(x,y)
    10     task_id = r.id
    11 
    12      """其它逻辑"""
    13 
    14     return JsonResponse({"data":"123"})
    View Code
    • 通过task_id可以查询celery异步处理完的结果
     1 from test1.celery import app
     2 status = app.AsyncResult(task_id).status
     3 result = app.AsyncResult(task_id).result
     4 
     5 #状态有这几种情况
     6 CELERY_STATUS = {
     7     'PENDING': '等待开始',
     8     'STARTED': '任务开始',
     9     'SUCCESS': '成功',
    10     'FAILURE': '失败',
    11     'RETRY': '重试',
    12     'REVOKED': '任务取消',
    13 }
    View Code

     

    服务器启动celery worker(消费者)任务,和定时任务:

    • 打开项目的虚拟环境,进入项目名称目录下,即跟app同一级目录下运行:
    • 定时任务:celery -A auto_model_platform worker -l info --beat
    • worker任务:celery -A auto_model_platform worker -l info

      

关键字