发布时间:2019-09-07 08:12:51编辑:auto阅读(1908)
接触python一段时间了,最近要用py做个 监控功能,需要解析crontab中的配置信息, 本想偷懒一下,直接 百度/谷哥出来,无奈半天没找着,只好自己写一个,实现代码及使用 实例如下,望各位路过的大虾大神不吝赐教,能指点得到更优的处理办法:
#/usr/bin/env python
#_*_coding:utf-8_*_
# Copyright (c) 2013 stephen <zhangxmgogo@gmail.com>
# All rights reserved
"""
1.解析 crontab 配置文件中的五个数间参数(分 时 日 月 周),获取他们对应的取值范围
2.将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
"""
#$Id $
import re, time, sys
def get_struct_time(time_stamp_int):
____"""
____按整型时间戳获取格式化时间 分 时 日 月 周
____Args:
________time_stamp_int 为传入的值为时间戳(×××),如:1332888820
________经过localtime转换后变成
________time.struct_time(tm_year=2012, tm_mon=3, tm_mday=28, tm_hour=6, tm_min=53, tm_sec=40, tm_wday=2, tm_yday=88, tm_isdst=0)
____Return:
________list____返回 分 时 日 月 周
____"""
____st_time = time.localtime(time_stamp_int)
____return [st_time.tm_min, st_time.tm_hour, st_time.tm_mday, st_time.tm_mon, st_time.tm_wday]
def get_strptime(time_str, str_format):
____"""从字符串获取 整型时间戳
____Args:
________time_str 字符串类型的时间戳 如 '31/Jul/2013:17:46:01'
________str_format 指定 time_str 的格式 如 '%d/%b/%Y:%H:%M:%S'
____Return:
________返回10位整型(int)时间戳,如 1375146861
____"""
____return int(time.mktime(time.strptime(time_str, str_format)))
def get_str_time(time_stamp, str_format='%Y%m%d%H%M'):
____"""
____获取时间戳,
____Args:
________time_stamp 10位整型(int)时间戳,如 1375146861
________str_format 指定返回格式,值类型为 字符串 str
____Rturn:
________返回格式 默认为 年月日时分,如2013年7月9日1时3分 :201207090103
____"""
____return time.strftime("%s" % str_format, time.localtime(time_stamp))
def match_cont(patten, cont):
____"""
____正则匹配(精确符合的匹配)
____Args:
________patten 正则表达式
________cont____ 匹配内容
____Return:
________True or False
____"""
____res = re.match(patten, cont)
____if res:
________return True
____else:
________return False
def handle_num(val, ranges=(0, 100), res=list()):
____"""处理纯数字"""
____val = int(val)
____if val >= ranges[0] and val <= ranges[1]:
________res.append(val)
____return res
def handle_nlist(val, ranges=(0, 100), res=list()):
____"""处理数字列表 如 1,2,3,6"""
____val_list = val.split(',')
____for tmp_val in val_list:
________tmp_val = int(tmp_val)
________if tmp_val >= ranges[0] and tmp_val <= ranges[1]:
____________res.append(tmp_val)
____return res
def handle_star(val, ranges=(0, 100), res=list()):
____"""处理星号"""
____if val == '*':
________tmp_val = ranges[0]
________while tmp_val <= ranges[1]:
____________res.append(tmp_val)
____________tmp_val = tmp_val + 1
____return res
def handle_starnum(val, ranges=(0, 100), res=list()):
____"""星号/数字 组合 如 */3"""
____tmp = val.split('/')
____val_step = int(tmp[1])
____if val_step < 1:
________return res
____val_tmp = int(tmp[1])
____while val_tmp <= ranges[1]:
________res.append(val_tmp)
________val_tmp = val_tmp + val_step
____return res
def handle_range(val, ranges=(0, 100), res=list()):
____"""处理区间 如 8-20"""
____tmp = val.split('-')
____range1 = int(tmp[0])
____range2 = int(tmp[1])
____tmp_val = range1
____if range1 < 0:
________return res
____while tmp_val <= range2 and tmp_val <= ranges[1]:
________res.append(tmp_val)
________tmp_val = tmp_val + 1
____return res
def handle_rangedv(val, ranges=(0, 100), res=list()):
____"""处理区间/步长 组合 如 8-20/3 """
____tmp = val.split('/')
____range2 = tmp[0].split('-')
____val_start = int(range2[0])
____val_end = int(range2[1])
____val_step = int(tmp[1])
____if (val_step < 1) or (val_start < 0):
________return res
____val_tmp = val_start
____while val_tmp <= val_end and val_tmp <= ranges[1]:
________res.append(val_tmp)
________val_tmp = val_tmp + val_step
____return res
def parse_conf(conf, ranges=(0, 100), res=list()):
____"""解析crontab 五个时间参数中的任意一个"""
____#去除空格,再拆分
____conf = conf.strip(' ').strip(' ')
____conf_list = conf.split(',')
____other_conf = []
____number_conf = []
____for conf_val in conf_list:
________if match_cont(PATTEN['number'], conf_val):
____________#记录拆分后的纯数字参数
____________number_conf.append(conf_val)
________else:
____________#记录拆分后纯数字以外的参数,如通配符 * , 区间 0-8, 及 0-8/3 之类
____________other_conf.append(conf_val)
____if other_conf:
________#处理纯数字外各种参数
________for conf_val in other_conf:
____________for key, ptn in PATTEN.items():
________________if match_cont(ptn, conf_val):
____________________res = PATTEN_HANDLER[key](val=conf_val, ranges=ranges, res=res)
____if number_conf:
________if len(number_conf) > 1 or other_conf:
____________#纯数字多于1,或纯数字与其它参数共存,则数字作为时间列表
____________res = handle_nlist(val=','.join(number_conf), ranges=ranges, res=res)
________else:
____________#只有一个纯数字存在,则数字为时间 间隔
____________res = handle_num(val=number_conf[0], ranges=ranges, res=res)
____return res
def parse_crontab_time(conf_string):
____"""
____解析crontab时间配置参数
____Args:
________conf_string____ 配置内容(共五个值:分 时 日 月 周)
________________________ 取值范围 分钟:0-59 小时:1-23 日期:1-31 月份:1-12 星期:0-6(0表示周日)
____Return:
________crontab_range____list格式,分 时 日 月 周 五个传入参数分别对应的取值范围
____"""
____time_limit____= ((0, 59), (1, 23), (1, 31), (1, 12), (0, 6))
____crontab_range = []
____clist________ = []
____conf_length = 5
____tmp_list____ = conf_string.split(' ')
____for val in tmp_list:
________if len(clist) == conf_length:
____________break
________if val:
____________clist.append(val)
____
____if len(clist) != conf_length:
________return -1, 'config error whith [%s]' % conf_string
____cindex = 0
____for conf in clist:
________res_conf = []
________res_conf = parse_conf(conf, ranges=time_limit[cindex], res=res_conf)
________if not res_conf:
____________return -1, 'config error whith [%s]' % conf_string
________crontab_range.append(res_conf)
________cindex = cindex + 1
____return 0, crontab_range
def time_match_crontab(crontab_time, time_struct):
____"""
____将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
____Args:
________crontab_time____crontab配置中的五个时间(分 时 日 月 周)参数对应时间取值范围
________time_struct____ 某个整型时间戳,如:1375027200 对应的 分 时 日 月 周
____Return:
________tuple 状态码, 状态描述
____"""
____cindex = 0
____for val in time_struct:
________if val not in crontab_time[cindex]:
____________return 0, False
________cindex = cindex + 1
____return 0, True
def close_to_cron(crontab_time, time_struct):
____"""coron的指定范围(crontab_time)中 最接近 指定时间 time_struct 的值"""
____close_time = time_struct
____cindex = 0
____for val_struct in time_struct:
________offset_min = val_struct
________val_close = val_struct
________for val_cron in crontab_time[cindex]:
____________offset_tmp = val_struct - val_cron
____________if offset_tmp > 0 and offset_tmp < offset_min:
________________val_close = val_struct
________________offset_min = offset_tmp
________close_time[cindex] = val_close
________cindex = cindex + 1
____return close_time
def cron_time_list(
________cron_time,
________year_num=int(get_str_time(time.time(), "%Y")),
________limit_start=get_str_time(time.time(), "%Y%m%d%H%M"),
________limit_end=get_str_time(time.time() + 86400, "%Y%m%d%H%M")
____):
____#print "\nfrom ", limit_start , ' to ' ,limit_end
____"""
____获取crontab时间配置参数取值范围内的所有时间点 的 时间戳
____Args:
________cron_time 符合crontab配置指定的所有时间点
________year_num____指定在哪一年内 获取
________limit_start 开始时间
____Rturn:
________List________所有时间点组成的列表(年月日时分 组成的时间,如2013年7月29日18时56分:201307291856)
____"""
____#按小时 和 分钟组装
____hour_minute = []
____for minute in cron_time[0]:
________minute = str(minute)
________if len(minute) < 2:
____________minute = '0%s' % minute
________for hour in cron_time[1]:
____________hour = str(hour)
____________if len(hour) < 2:
________________hour = '0%s' % hour
____________hour_minute.append('%s%s' % (hour, minute))
____#按天 和 小时组装
____day_hm = []
____for day in cron_time[2]:
________day = str(day)
________if len(day) < 2:
____________day = '0%s' % day
________for hour_mnt in hour_minute:
____________day_hm.append('%s%s' % (day, hour_mnt))
____#按月 和 天组装
____month_dhm = []
____#只有30天的月份
____month_short = ['02', '04', '06', '09', '11']
____for month in cron_time[3]:
________month = str(month)
________if len(month) < 2:
____________month = '0%s' % month
________for day_hm_s in day_hm:
____________if month == '02':
________________if (((not year_num % 4 ) and (year_num % 100)) or (not year_num % 400)):
____________________#闰年2月份有29天
____________________if int(day_hm_s[:2]) > 29:
________________________continue
________________else:
____________________#其它2月份有28天
____________________if int(day_hm_s[:2]) > 28:
________________________continue
____________if month in month_short:
________________if int(day_hm_s[:2]) > 30:
____________________continue
____________month_dhm.append('%s%s' % (month, day_hm_s))
____#按年 和 月组装
____len_start = len(limit_start)
____len_end = len(limit_end)
____month_dhm_limit = []
____for month_dhm_s in month_dhm:
________time_ymdhm = '%s%s' % (str(year_num), month_dhm_s)
________#开始时间\结束时间以外的排除
________if (int(time_ymdhm[:len_start]) < int(limit_start)) or \
________ (int(time_ymdhm[:len_end]) > int(limit_end)):
____________continue
________month_dhm_limit.append(time_ymdhm)
____if len(cron_time[4]) < 7:
________#按不在每周指定时间的排除
________month_dhm_week = []
________for time_minute in month_dhm_limit:
____________str_time = time.strptime(time_minute, '%Y%m%d%H%M%S')
____________if str_time.tm_wday in cron_time[4]:
________________month_dhm_week.append(time_minute)
________return month_dhm_week
____return month_dhm_limit
#crontab时间参数各种写法 的 正则匹配
PATTEN = {
____#纯数字
____'number':'^[0-9]+$',
____#数字列表,如 1,2,3,6
____'num_list':'^[0-9]+([,][0-9]+)+$',
____#星号 *
____'star':'^\*$',
____#星号/数字 组合,如 */3
____'star_num':'^\*\/[0-9]+$',
____#区间 如 8-20
____'range':'^[0-9]+[\-][0-9]+$',
____#区间/步长 组合 如 8-20/3
____'range_div':'^[0-9]+[\-][0-9]+[\/][0-9]+$'
____#区间/步长 列表 组合,如 8-20/3,21,22,34
____#'range_div_list':'^([0-9]+[\-][0-9]+[\/][0-9]+)([,][0-9]+)+$'
____}
#各正则对应的处理方法
PATTEN_HANDLER = {
____'number':handle_num,
____'num_list':handle_nlist,
____'star':handle_star,
____'star_num':handle_starnum,
____'range':handle_range,
____'range_div':handle_rangedv
}
def main():
____"""测试用实例"""
____#crontab配置中一行时间参数
____conf_string = '*/10 * * * * (cd /opt/pythonpm/devpapps;' \
________________ ' /usr/local/bin/python2.5 data_test.py>>output_error.txt)'
____#时间戳
____time_stamp = int(time.time())
____#解析crontab时间配置参数 分 时 日 月 周 各个取值范围
____res, desc = parse_crontab_time(conf_string)
____if res == 0:
________cron_time = desc
____else:
________print desc
________sys, exit(-1)
____print "\nconfig:", conf_string
____print "\nparse result(range for crontab):"
____
____print " minute:", cron_time[0]
____print " hour: ", cron_time[1]
____print " day: ", cron_time[2]
____print " month: ", cron_time[3]
____print " week day:", cron_time[4]
____#解析 时间戳对应的 分 时 日 月 周
____time_struct = get_struct_time(time_stamp)
____print "\nstruct time(minute hour day month week) for %d :" % \
________ time_stamp, time_struct
____#将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
____match_res = time_match_crontab(cron_time, time_struct)
____print "\nmatching result:", match_res
____#crontab配置设定范围中最近接近时指定间戳的一组时间
____most_close = close_to_cron(cron_time, time_struct)
____print "\nin range of crontab time which is most colse to struct ", most_close
____time_list = cron_time_list(cron_time)
____print "\n\n %d times need to tart-up:\n" % len(time_list)
____print time_list[:10], '...'
if __name__ == '__main__':
____#请看 使用实例
____
____main()
上一篇: Python 多线程threading模
下一篇: Python 中 的 json 模块
47857
46423
37309
34755
29327
25986
24937
19964
19559
18045
5804°
6429°
5944°
5973°
7077°
5924°
5959°
6452°
6415°
7796°