python 多进程和协程配合使用

发布时间:2020-02-23 12:35:48编辑:admin阅读(2587)

    一、需求分析

    有一批key已经写入到3个txt文件中,每一个txt文件有30万行记录。
    现在需要读取这些txt文件,判断key是否在数据仓库中。(redis或者mysql)

    为空的记录,需要写入到日志文件中!

     

    任务分工

    1. 使用多进程技术,每一个进程读取一个txt文件

    2. 使用协程技术,批量读取txt文件记录。比如一次性读取 2000条记录

     

    注意:打开文件操作,最好在一个进程中,重复打开文件,会造成系统资源浪费!

     

    二、完整代码

    #!/usr/bin/env python3
    # coding: utf-8
    """
    多线程和协程配合使用示例
    """
    
    import os
    import time
    from gevent import monkey;monkey.patch_all()
    from gevent.pool import Pool
    from functools import partial
    from multiprocessing import Process
    
    COROUTINE_NUMBER = 2000  # 协程池数量
    pool = Pool(COROUTINE_NUMBER)  # 使用协程池
    
    # 模拟数据仓库,测试数据
    data_dict = {"1":"x1","3":"x3","5":"x5","7":"x7","9":"x9"}
    
    class TestProgram(object):  # 测试程序
        def __init__(self):
            self.BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # 项目根目录
    
        def write_log(self,number, content, colour='white', skip=False):
            """
            写入日志文件
            :param content: 写入内容
            :param colour: 颜色
            :param skip: 是否跳过打印时间
            :return:
            """
            # 颜色代码
            colour_dict = {
                'red': 31,  # 红色
                'green': 32,  # 绿色
                'yellow': 33,  # 黄色
                'blue': 34,  # 蓝色
                'purple_red': 35,  # 紫红色
                'bluish_blue': 36,  # 浅蓝色
                'white': 37,  # 白色
            }
            choice = colour_dict.get(colour)  # 选择颜色
    
            path = os.path.join(self.BASE_DIR, "output_%s.log" % number)  # 日志文件
            with open(path, mode='a+', encoding='utf-8') as f:
                if skip is False:  # 不跳过打印时间时
                    content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content
    
                info = "\033[1;{};1m{}\033[0m".format(choice, content)
                print(info)
                f.write(content + "\n")
    
        def has_null(self, key, number):
            """
            输出key
            :param key: 键值
            :param number: 文件标记
            :return: bool
            """
            key = key.strip()
            if not data_dict.get(key):
                self.write_log(number,"错误,{} 记录为空".format(key),"red")
                return False
    
            print(key)
            return True
    
        def read_file(self, number):
            """
            读取文件
            :param number: 文件标记
            :return:
            """
            file_name = os.path.join(self.BASE_DIR, "data", "%s.txt" % number)
            # print(file_name)
            self.write_log(number, "开始读取文件 {}".format(file_name),"green")
            with open(file_name, encoding='utf-8') as f:
                # 使用协程池,执行任务。语法: pool.map(func,iterator)
                # partial使用偏函数传递参数
                # 注意:has_null第一个参数,必须是迭代器遍历的值
                pool.map(partial(self.has_null, number=number), f)
    
            self.write_log(number, "结束文件读取 {} 完成".format(file_name),"green")
            return True
    
        def run(self, number):
            """
            读取指定的文件,判断每一个key是否为空
            :param number:
            :return:
            """
            startime = time.time()  # 开始时间
    
            # 清空日志
            path = os.path.join(self.BASE_DIR, "output_%s.log" % number)  # 日志文件
            with open(path, mode='w') as f:
                pass
    
            self.read_file(number)
    
            endtime = time.time()
            take_time = endtime - startime
    
            if take_time < 1:  # 判断不足1秒时
                take_time = 1  # 设置为1秒
            # 计算花费时间
            m, s = divmod(take_time, 60)
            h, m = divmod(m, 60)
    
            self.write_log(number, "%s.txt 花费时间 %02d:%02d:%02d" % (number,h, m, s),"green")
    
        def main(self):
            """
            使用多线程执行程序
            :return:
            """
            # 文件标记列表
            file_list = ["7001", "7002", "7003"]
    
            p_lst = []  # 线程列表
            for i in file_list:
                # self.run(i)
                p = Process(target=self.run, args=(i,))  # 子进程调用函数
                p.start()  # 启动子进程
                p_lst.append(p)  # 将所有进程写入列表中
    
            for p in p_lst: p.join()  # 检测p是否结束,如果没有结束就阻塞直到结束,否则不阻塞
    
    
    TestProgram().main()  # 启动主程序,它会开启3个进程。

    执行输出:

    1.png


关键字