celery动态添加任务

发布时间:2019-10-11 09:02:02编辑:auto阅读(2608)

    celery是一个基于Python的分布式调度系统,文档在这 ,最近有个需求,想要动态的添加任务而不用重启celery服务,找了一圈没找到什么好办法(也有可能是文档没看仔细),所以只能自己实现囉

    为celery动态添加任务,首先我想到的是传递一个函数进去,让某个特定任务去执行这个传递过去的函数,就像这样

    @app.task
    def execute(func, *args, **kwargs):
        return func(*args, **kwargs)
    

    很可惜,会出现这样的错误

    kombu.exceptions.EncodeError: Object of type 'function' is not JSON serializable
    

    换一种序列化方式

    @app.task(serializer='pickle')
    def execute(func, *args, **kwargs):
        return func(*args, **kwargs)
    

    结果又出现一大串错误信息

    ERROR/MainProcess] Pool callback raised exception: ContentDisallowed('Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)',)
    Traceback (most recent call last):
      File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
        return obj.__dict__[self.__name__]
    KeyError: 'chord'
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
        return obj.__dict__[self.__name__]
    KeyError: '_payload'
    

    换一种思路

    func = import_string(func)
    

    不知道这样是否可以,结果测试: No

    哎,流年不利.

    最后一直测试,一直测试,终于找到了一种办法,直接上代码

    from importlib import import_module, reload
    
    app.conf.CELERY_IMPORTS = ['task', 'task.all_task']
    
    def import_string(import_name):
        import_name = str(import_name).replace(':', '.')
        modules = import_name.split('.')
        mod = import_module(modules[0])
        for comp in modules[1:]:
            if not hasattr(mod, comp):
                reload(mod)
            mod = getattr(mod, comp)
        return mod
    
    @app.task
    def execute(func, *args, **kwargs):
        func = import_string(func)
        return func(*args, **kwargs)
    

    项目结构是这样的

    ├── celery_app.py
    ├── config.py
    ├── task
    │   ├── all_task.py
    │   ├── __init__.py

    注意: 任务必须大于等于两层目录

    以后每次添加任务都可以先添加到all_task.py里,调用时不用再重启celery服务

    # task/all_task.py
    
    def ee(c, d):
        return c, d, '你好'
    
    # example
    from celery_app import execute
    
    execute.delay('task.all_task.ee', 2, 444)
    

    ok,另外发现celery也支持任务定时调用,就像这样

    execute.apply_async(args=['task.all_task.aa'], eta=datetime(2017, 7, 9, 8, 12, 0))
    

    简单实现一个任务重复调用的功能

    @app.task
    def interval(func, seconds, args=(), task_id=None):
        next_run_time = current_time() + timedelta(seconds=seconds)
        kwargs = dict(args=(func, seconds, args), eta=next_run_time)
        if task_id is not None:
            kwargs.update(task_id=task_id)
        interval.apply_async(**kwargs)
        func = import_string(func)
        return func(*args)
    

    大概意思就是先计算下次运行的时间,然后把任务添加到celery队列里,这里有个task_id有些问题,因为假设添加了每隔3s执行一个任务,
    它的task_id默认会使用uuid生成,如果想要再移除这个任务就不太方便,自定task_id可能会好一些,另外也许需要判断task_id是否存在

    AsyncResult(task_id).state
    

    ok,再献上一个好用的函数

    from inspect import getmembers, isfunction
    
    def get_tasks(module='task'):
        return [{
            'name': 'task:{}'.format(f[1].__name__),
            'doc': f[1].__doc__,
        } for f in getmembers(import_module(module), isfunction)]
    

    就这样.

关键字