Python任务调度模块APSched

发布时间:2019-09-26 07:19:28编辑:auto阅读(2078)

    介绍

    官网文档:http://apscheduler.readthedoc...
    API:http://apscheduler.readthedoc...

    APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是:

    • triggers: 任务触发器组件,提供任务触发方式

    • job stores: 任务商店组件,提供任务保存方式

    • executors: 任务调度组件,提供任务调度方式

    • schedulers: 任务调度组件,提供任务工作方式

    安装

    pip 安装

    $ pip install apscheduler

    源码安装

    $ python setup.py install

    简单的实例

    from apscheduler.schedulers.blocking import BlockingScheduler
    import time
    
    # 实例化一个调度器
    scheduler = BlockingScheduler()
     
    def job1():
        print "%s: 执行任务"  % time.asctime()
    
    # 添加任务并设置触发方式为3s一次
    scheduler.add_job(job1, 'interval', seconds=3)
    
    # 开始运行调度器
    scheduler.start()

    输出:

    $ python first.py
    Fri Sep  8 20:41:55 2017: 执行任务
    Fri Sep  8 20:41:58 2017: 执行任务
    ...

    各组件功能

    trigger组件

    trigger提供任务的触发方式,共三种方式:

    • date:只在某个时间点执行一次run_date(datetime|str)

    scheduler.add_job(my_job, 'date', run_date=date(2017, 9, 8), args=[])
    scheduler.add_job(my_job, 'date', run_date=datetime(2017, 9, 8, 21, 30, 5), args=[])
    scheduler.add_job(my_job, 'date', run_date='2017-9-08 21:30:05', args=[])
    # The 'date' trigger and datetime.now() as run_date are implicit
    sched.add_job(my_job, args=[[])
    • interval: 每隔一段时间执行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

    scheduler.add_job(my_job, 'interval', hours=2)
    scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00', end_date='2018-06-15 21:30:00)
    
    @scheduler.scheduled_job('interval', id='my_job_id', hours=2)
    def my_job():
        print("Hello World")
    • cron: 使用同linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

    sched.add_job(my_job, 'cron', hour=3, minute=30)
    sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-30')
    
    @sched.scheduled_job('cron', id='my_job_id', day='last sun')
    def some_decorated_task():
        print("I am printed at 00:00:00 on the last Sunday of every month!")

    scheduler组件

    scheduler组件提供执行的方式,在不同的运用环境中选择合适的方式

    • BlockingScheduler: 进程中只运行调度器时的方式

    from apscheduler.schedulers.blocking import BlockingScheduler
    import time
    
    scheduler = BlockingScheduler()
     
    def job1():
        print "%s: 执行任务"  % time.asctime()
    
    scheduler.add_job(job1, 'interval', seconds=3)
    scheduler.start()
    • BackgroundScheduler: 不想使用任何框架时的方式

    from apscheduler.schedulers.background import BackgroundScheduler
    import time
    
    scheduler = BackgroundScheduler()
     
    def job1():
        print "%s: 执行任务"  % time.asctime()
    
    scheduler.add_job(job1, 'interval', seconds=3)
    scheduler.start()
    
    while True:
        pass
    • AsyncIOScheduler: asyncio module的方式(Python3)

    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    try:
        import asyncio
    except ImportError:
        import trollius as asyncio
    ...
    ...
    # while True:pass 
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
    • GeventScheduler: gevent方式

    from apscheduler.schedulers.gevent import GeventScheduler
    
    ...
    ...
    
    g = scheduler.start()
    # while True:pass
    try:
        g.join()
    except (KeyboardInterrupt, SystemExit):
        pass
    • TornadoScheduler: Tornado方式

    from tornado.ioloop import IOLoop
    from apscheduler.schedulers.tornado import TornadoScheduler
    
    ...
    ...
    
    # while True:pass
    try:
        IOLoop.instance().start()
    except (KeyboardInterrupt, SystemExit):
        pass
    • TwistedScheduler: Twisted方式

    from twisted.internet import reactor
    from apscheduler.schedulers.twisted import TwistedScheduler
    
    ...
    ...
    
    # while True:pass
    try:
        reactor.run()
    except (KeyboardInterrupt, SystemExit):
        pass
    • QtScheduler: Qt方式

    executors组件

    executors组件提供任务的调度方式

    • base

    • debug

    • gevent

    • pool(max_workers=10)

    • twisted

    jobstore组件

    jobstore提供任务的各种持久化方式

    • base

    • memory

    • mongodb
      scheduler.add_jobstore('mongodb', collection='example_jobs')

    • redis
      scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')

    • rethinkdb
      scheduler.add_jobstore('rethinkdb', database='apscheduler_example')

    • sqlalchemy
      scheduler.add_jobstore('sqlalchemy', url=url)

    • zookeeper
      scheduler.add_jobstore('zookeeper', path='/example_jobs')

    任务操作

    添加任务add_job(如上)

    如果使用了任务的存储,开启时最好添加replace_existing=True,否则每次开启都会创建任务的副本
    开启后任务不会马上启动,可修改trigger参数

    删除任务remove_job

    # 根据任务实例删除
    job = scheduler.add_job(myfunc, 'interval', minutes=2)
    job.remove()
    
    # 根据任务id删除
    scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
    scheduler.remove_job('my_job_id')

    任务的暂停pause_job和继续resume_job

    job = scheduler.add_job(myfunc, 'interval', minutes=2)
    # 根据任务实例
    job.pause()
    job.resume()
    
    # 根据任务id暂停
    scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
    scheduler.pause_job('my_job_id')
    scheduler.resume_job('my_job_id')

    任务的修饰modify和重设reschedule_job

    修饰:job.modify(max_instances=6, name='Alternate name')
    重设:scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

    调度器操作

    • 开启 scheduler.start()

    • 关闭 scheduler.shotdown(wait=True | False)

    • 暂停 scheduler.pause()

    • 继续 scheduler.resume()

    • 监听 http://apscheduler.readthedoc...

    def my_listener(event):
        if event.exception:
            print('The job crashed :(')
        else:
            print('The job worked :)')
    
    scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

    官方实例

    from pytz import utc
    
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.mongodb import MongoDBJobStore
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
    
    
    jobstores = {
        'mongo': MongoDBJobStore(),
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
    executors = {
        'default': ThreadPoolExecutor(20),
        'processpool': ProcessPoolExecutor(5)
    }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }
    scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

关键字