python 实现后台cron_tab

发布时间:2019-08-28 09:06:04编辑:auto阅读(1434)


    后台cron_table管理
    PHP安装扩展也可以真正的多线程,fork新的进程,php ,python 2种方式性能上差别多少不知道.
    基于python 强大,简洁选用python 做核心功能.

    使用mysql表存储cron任务,python守护进程每分钟读取MYSQL, 多线程分发fork子进程方式执行PHP代码.

    具体流程:

    1.cron 每分钟执行cron_table.py .

    2.cron_table.py 读取cron 任务表,把符合当前时间执行的cron 记录状态更新为runing

    3.cron_table_log 插入一条记录,cron_id,start_time

    4.采用多线程方式fork守护子进程,等待子进程执行完,

    5.子进程执行完子线程修改cron 任务的状态为no runing,更新log记录表的完成时间和子进程的输出信息.


    cron_table功能:
    1.web后台能添加修改cron 任务,可视化的操作.脱离需要操作服务器才能控制crontab
    2.及时的记录每一条cron当前的执行状态,时间,以及历史执行状态记录..对cron任务
    何时执行,执行时长,执行返回信息,等全面信息监控.对CRON 脚本优化,排查异常CRON等有直接的帮助.
    快速找到死循环CRON,性能低下的CRON程序.
    3.允许CRON 任务独占,许多任务是不能同时跑多次,做的独占机制能有效的避免上个周期跑的CRON 还没结束,
    这个周期到了,继续跑..
    4.可以设置cron最大运行时间
    5.完全脱离cron管理和cron任务业务代码的联系.
    有需要还允许后台控制kill cron 任务,可以不用shell 命令终止,发邮件,短信通知

    相对比现在项目里的cron记录方式,在构造函数,析构函数做file_put_contents低效操作.
    这种方式重大缺陷:CRON死循环不做析构函数写日志,CRON业务代码出错,整个CRON 不执行无法记录等..



    #!/usr/bin/python
    #coding=utf-8
    #引入 MySQL 模組
    import MySQLdb, re, time, sys, subprocess, threading
    
    def get_struct_time(time_stamp_int):
        """
        按整型时间戳获取格式化时间 分 时 日 月 周
        Args:
            time_stamp_int 为传入的值为时间戳(×××),如:1332888820
            经过localtime转换后变成
            time.struct_time(tm_year=2012, tm_mon=3, tm_mday=28, tm_hour=6, tm_min=53, tm_sec=40, tm_wday=2, tm_yday=88, tm_isdst=0)
        Return:
            list____返回 分 时 日 月 周
        """
        st_time = time.localtime(time_stamp_int)
        return [st_time.tm_min, st_time.tm_hour, st_time.tm_mday, st_time.tm_mon, st_time.tm_wday]
    def get_strptime(time_str, str_format):
        """从字符串获取 整型时间戳
        Args:
            time_str 字符串类型的时间戳 如 '31/Jul/2013:17:46:01'
            str_format 指定 time_str 的格式 如 '%d/%b/%Y:%H:%M:%S'
        Return:
            返回10位整型(int)时间戳,如 1375146861
        """
        return int(time.mktime(time.strptime(time_str, str_format)))
    def get_str_time(time_stamp, str_format='%Y%m%d%H%M'):
        """
        获取时间戳,
        Args:
            time_stamp 10位整型(int)时间戳,如 1375146861
            str_format 指定返回格式,值类型为 字符串 str
        Rturn:
            返回格式 默认为 年月日时分,如2013年7月9日1时3分 :201207090103
        """
        return time.strftime("%s" % str_format, time.localtime(time_stamp))
    def match_cont(patten, cont):
        """
        正则匹配(精确符合的匹配)
        Args:
            patten 正则表达式
            cont____ 匹配内容
        Return:
            True or False
        """
        res = re.match(patten, cont)
        if res:
            return True
        else:
            return False
    def handle_num(val, ranges=(0, 100), res=list()):
        """处理纯数字"""
        val = int(val)
        if val >= ranges[0] and val <= ranges[1]:
            res.append(val)
        return res
    def handle_nlist(val, ranges=(0, 100), res=list()):
        """处理数字列表 如 1,2,3,6"""
        val_list = val.split(',')
        for tmp_val in val_list:
            tmp_val = int(tmp_val)
            if tmp_val >= ranges[0] and tmp_val <= ranges[1]:
                res.append(tmp_val)
        return res
    def handle_star(val, ranges=(0, 100), res=list()):
        """处理星号"""
        if val == '*':
            tmp_val = ranges[0]
            while tmp_val <= ranges[1]:
                res.append(tmp_val)
                tmp_val = tmp_val + 1
        return res
    def handle_starnum(val, ranges=(0, 100), res=list()):
        """星号/数字 组合 如 */3"""
        tmp = val.split('/')
        val_step = int(tmp[1])
        if val_step < 1:
            return res
        val_tmp = int(tmp[1])
        while val_tmp <= ranges[1]:
            res.append(val_tmp)
            val_tmp = val_tmp + val_step
        return res
    def handle_range(val, ranges=(0, 100), res=list()):
        """处理区间 如 8-20"""
        tmp = val.split('-')
        range1 = int(tmp[0])
        range2 = int(tmp[1])
        tmp_val = range1
        if range1 < 0:
            return res
        while tmp_val <= range2 and tmp_val <= ranges[1]:
            res.append(tmp_val)
            tmp_val = tmp_val + 1
        return res
    def handle_rangedv(val, ranges=(0, 100), res=list()):
        """处理区间/步长 组合 如 8-20/3 """
        tmp = val.split('/')
        range2 = tmp[0].split('-')
        val_start = int(range2[0])
        val_end = int(range2[1])
        val_step = int(tmp[1])
        if (val_step < 1) or (val_start < 0):
            return res
        val_tmp = val_start
        while val_tmp <= val_end and val_tmp <= ranges[1]:
            res.append(val_tmp)
            val_tmp = val_tmp + val_step
        return res
    def parse_conf(conf, ranges=(0, 100), res=list()):
        """解析crontab 五个时间参数中的任意一个"""
        #去除空格,再拆分
        conf = conf.strip(' ').strip(' ')
        conf_list = conf.split(',')
        other_conf = []
        number_conf = []
        for conf_val in conf_list:
            if match_cont(PATTEN['number'], conf_val):
                #记录拆分后的纯数字参数
                number_conf.append(conf_val)
            else:
                #记录拆分后纯数字以外的参数,如通配符 * , 区间 0-8, 及 0-8/3 之类
                other_conf.append(conf_val)
        if other_conf:
            #处理纯数字外各种参数
            for conf_val in other_conf:
                for key, ptn in PATTEN.items():
                    if match_cont(ptn, conf_val):
                        res = PATTEN_HANDLER[key](val=conf_val, ranges=ranges, res=res)
        if number_conf:
            if len(number_conf) > 1 or other_conf:
                #纯数字多于1,或纯数字与其它参数共存,则数字作为时间列表
                res = handle_nlist(val=','.join(number_conf), ranges=ranges, res=res)
            else:
                #只有一个纯数字存在,则数字为时间 间隔
                res = handle_num(val=number_conf[0], ranges=ranges, res=res)
        return res
    def parse_crontab_time(conf_string):
        """
        解析crontab时间配置参数
        Args:
            conf_string   配置内容(共五个值:分 时 日 月 周)
                          取值范围 分钟:0-59 小时:1-23 日期:1-31 月份:1-12 星期:0-6(0表示周日)
        Return:
        crontab_range      list格式,分 时 日 月 周 五个传入参数分别对应的取值范围
        """
        time_limit    = ((0, 59), (1, 23), (1, 31), (1, 12), (0, 6))
        crontab_range = []
        clist = []
        conf_length = 5
        tmp_list = conf_string.split(' ')
        for val in tmp_list:
            if len(clist) == conf_length:
                break
            if val:
                clist.append(val)
        if len(clist) != conf_length:
            return -1, 'config error whith [%s]' % conf_string
        cindex = 0
        for conf in clist:
            res_conf = []
            res_conf = parse_conf(conf, ranges=time_limit[cindex], res=res_conf)
            if not res_conf:
                return -1, 'config error whith [%s]' % conf_string
            crontab_range.append(res_conf)
            cindex = cindex + 1
        return 0, crontab_range
    def time_match_crontab(crontab_time, time_struct):
        """
        将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
        Args:
            crontab_time____crontab配置中的五个时间(分 时 日 月 周)参数对应时间取值范围
            time_struct____ 某个整型时间戳,如:1375027200 对应的 分 时 日 月 周
        Return:
        tuple 状态码, 状态描述
        """
        cindex = 0
        for val in time_struct:
            if val not in crontab_time[cindex]:
                return 0, False
            cindex = cindex + 1
        return 0, True
    
    #crontab时间参数各种写法 的 正则匹配
    PATTEN = {
        #纯数字
        'number':'^[0-9]+$',
        #数字列表,如 1,2,3,6
        'num_list':'^[0-9]+([,][0-9]+)+$',
        #星号 *
        'star':'^\*$',
        #星号/数字 组合,如 */3
        'star_num':'^\*\/[0-9]+$',
        #区间 如 8-20
        'range':'^[0-9]+[\-][0-9]+$',
        #区间/步长 组合 如 8-20/3
        'range_div':'^[0-9]+[\-][0-9]+[\/][0-9]+$'
        #区间/步长 列表 组合,如 8-20/3,21,22,34
        #'range_div_list':'^([0-9]+[\-][0-9]+[\/][0-9]+)([,][0-9]+)+$'
        }
    #各正则对应的处理方法
    PATTEN_HANDLER = {
        'number':handle_num,
        'num_list':handle_nlist,
        'star':handle_star,
        'star_num':handle_starnum,
        'range':handle_range,
        'range_div':handle_rangedv
    }
    
    def mysqlConnect():
        return MySQLdb.connect(host="192.168.51.129", user="root", passwd="123456", db="model")
    
    def runCronTable(ns,control,method,id,logid):
    
        child = subprocess.Popen('/usr/local/php-5.4.20/bin/php -c /usr/local/php-5.4.20/php.ini index.php --ns=%s --control=%s --method=%s --id=%s' % (ns,control,method,id),shell=True,stdout = subprocess.PIPE)
        info,returnCode = child.communicate()
        db = mysqlConnect()
        cursor = db.cursor(MySQLdb.cursors.DictCursor)
        cursor.execute('update cron_table set done_time=%s,run_status=0 where id=%s' , (time.time(),id))
        cursor.execute('update cron_table_log set done_time=%s,info=%s where id=%s' , (time.time(),info,logid))
        cursor.close()
        db.close()
        return True
    
    if __name__ == '__main__':
    
        #連接到 MySQL
        db = mysqlConnect()
        cursor = db.cursor(MySQLdb.cursors.DictCursor)
        #執行 SQL 語句
        cursor.execute("SELECT * FROM cron_table where status=0 and run_status=0")
        result = cursor.fetchall()
    
        threads = []
        now = time.strftime('%M %H %d %m %w',time.localtime(time.time()))
        for r in result:
            res, desc = parse_crontab_time(r["cycle"])
            if res == 0:
                cron_time = desc
            else:
                cron_time = 0
    
            time_stamp = int(time.time())
            #解析 时间戳对应的 分 时 日 月 周
            time_struct = get_struct_time(time_stamp)
            match_res,isRun = time_match_crontab(cron_time, time_struct)
            if isRun:
                now = time.time()
                print "%s.." % r["id"]
                # 锁住cron 任务
                cursor.execute("update cron_table set run_status=%s where id=%s" , (now,r["id"]))
                # 添加日志
                cursor.execute("insert cron_table_log (cron_id,start_time,done_time,info) VALUES (%s,%s,0,'')" , (r["id"],now))
                thd = threading.Thread(target=runCronTable,args=(r["ns"],r["control"],r["method"],r["id"],int(db.insert_id())))
                thd.setDaemon(True)
                thd.start()
                threads.append(thd)
    
        db.close()
        cursor.close()
    
        for t in threads:
            t.join()
    
        print "\nExiting Main Thread\n"


    MYSQL 表结构


    CREATE TABLE `cron_table` (
      `id` int(10) unsigned NOT NULL auto_increment,
      `ns` varchar(100) NOT NULL default '' COMMENT '命名空间',
      `control` varchar(50) NOT NULL default '' COMMENT '控制器',
      `method` varchar(50) NOT NULL default '' COMMENT '方法',
      `cycle` varchar(50) NOT NULL COMMENT '运行周期',
      `run_status` int(10) unsigned NOT NULL default '0' COMMENT '运行状态',
      `done_time` int(10) unsigned NOT NULL default '0' COMMENT '完成时间',
      `status` tinyint(4) unsigned NOT NULL default '1' COMMENT '开启关闭',
      `desc` varchar(500) NOT NULL default '' COMMENT 'CRON描述',
      PRIMARY KEY  (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;



    CREATE TABLE `cron_table_log` (
      `id` int(10) unsigned NOT NULL auto_increment,
      `cron_id` int(10) unsigned NOT NULL default '0' COMMENT 'cron_id',
      `start_time` int(10) unsigned NOT NULL default '0' COMMENT '开始执行时间',
      `done_time` int(10) unsigned NOT NULL default '0' COMMENT '脚本执行完成时间',
      `info` varchar(3000) NOT NULL default '' COMMENT '脚本输出',
      PRIMARY KEY  (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;


关键字