python数据结构和GIL及多进程

发布时间:2019-09-23 17:01:06编辑:auto阅读(1565)

    一 数据结构和GIL

    1 queue

    标准库queue模块,提供FIFO的queue、LIFO的队列,优先队列
    Queue 类是线程安全的,适用于多线程间安全的交换数据,内部使用了Lock和Condition


    为什么说容器的大小不准确,其原因是如果不加锁,是不可能获取到准确的大小的,因为你刚读取了一个大小,还没取走,有可能被就被其他线程修改了,queue类的size虽然加了锁,但是依然不能保证立即get,put就能成功,因为读取大小和get,put方法是分来的。

    2 GIL

    1 简介

    全局解释器锁,进程级别的锁GIL
    Cpython在解释器进程中有一把锁,叫做GIL全局解释器锁。

    GIL 保证Cpython进程中,当前时刻只有一个线程执行代码,甚至在多核情况下,也是如此。

    2 IO 密集型和CPU密集型

    Cpython中
    IO 密集型,由于线程阻塞,就会调度其他线程
    CPU密集型,当前线程可能连续获取GIL,导致其他线程几乎无法使用CPU,若要唤醒其他线程,则需要准备数据,其代价是高昂的。


    IO 密集型,多线程解决,CPU密集型,多进程解决,绕开GIL。

    python中绝大多数内置数据结构的读写操作都是原子操作


    由于GIL 的存在,python的内置数据类型在多线程编程的时候就变得安全了,但是实际上他们本身不是线程安全类型的

    3 保留GIL 原因

    Guido坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全,简单的使用python。
    而移除GIL。会降低Cpython单线程的执行效率。

    4 验证其是否是单线程

    相关实例

    import  logging
    import datetime
    logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
    start=datetime.datetime.now()
    
    def calc():
        sum=0
        for _ in range(1000000000):
            sum+=1
    
    calc()
    calc()
    calc()
    calc()
    calc()
    delta=(datetime.datetime.now()-start).total_seconds()
    logging.info(delta)

    python数据结构和GIL及多进程

    多线程模式下的计算结果

    import  logging
    import datetime
    import threading
    logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
    start=datetime.datetime.now()
    
    def calc():
        sum=0
        for _ in range(1000000000):
            sum+=1
    lst=[]
    for _ in range(5):
        t=threading.Thread(target=calc)
        t.start()
        lst.append(t)
    
    for t in lst:
        t.join()
    
    delta=(datetime.datetime.now()-start).total_seconds()
    
    print (delta)

    结果如下

    python数据结构和GIL及多进程

    从这两个程序来看,Cpython中多线程根本没有优势,和一个线程执行的时间相当,因为存在GIL

    二 多进程

    1 概念

    1 多进程描述

    由于python中的GIL ,多线程不是CPU密集型程序的最好选择

    多进程可以在完全独立的进程中运行程序,可以充分利用多处理器

    但是进程本身的隔离带来数据不共享也是一个问题,且线程比进程轻量的多

    多进程也是解决并发的一种手段

    2 进程和线程的异同

    相同点:

    进程是可以终止的,线程是不能通过命令终止的,线程的终止要么抛出异常,要么程序本身执行完成。

    进程间同步提供了和线程同步一样的类,使用方式也是一样的,使用效果也是类似,不过,进程间同步的代价要高于线程,而且底层实现不同。

    multiprocessing 还提供了共享内存,服务器进程来共享数据,还提供了queue队列,匹配管道用于进程间通信


    不同点

    通信方式不同
    1 多进程就是启用多个解释器进程,进程间通信必须序列化,反序列化
    2 数据的安全性问题

    多进程最好是在main中执行
    多线程已经将数据进行处理了,其不需要再次进行序列化了

    多进程传递必须序列化和反序列化。

    3 进程应用

    远程调用,RPC,跨网络

    2 参数介绍

    multiprocessing中的process类

    process 类遵循了Thread类的API,减少了学习难度
    不同进程可以完全调度到不同的CPU上执行

    IO 密集型最好使用多线程
    CPU 密集型最好使用多进程

    进程提供的相关属性

    名称 含义
    pid 进程ID
    exitcode 进程退出的状态码
    terminate() 终止指定进程

    3 实例

    import  logging
    import datetime
    import multiprocessing
    logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
    start=datetime.datetime.now()
    
    def calc(i):
        sum=0
        for _ in range(1000000000):
            sum+=1
    lst=[]
    for  i in range(5):
        p=multiprocessing.Process(target=calc,args=(i,),name="P-{}".format(i))
        p.start()
        lst.append(p)
    for p in  lst:
        p.join()
    
    delta=(datetime.datetime.now()-start).total_seconds()
    print (delta)

    结果如下

    python数据结构和GIL及多进程

    多进程本身避开了进程和进程之间调度需要的时间,多核心都使用了,此处存在CPU的调度问题
    多进程对CPU的提升是显而易见的。
    单线程,多线程都跑了很长时间,而多进程只是用了1分半,是真正的并行

    4 进程池相关

    import  logging
    import datetime
    import multiprocessing
    logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
    start=datetime.datetime.now()
    
    def calc(i):
        sum=0
        for _ in range(1000000000):
            sum+=1
        print (i,sum)
    if  __name__=='__main__':
        start=datetime.datetime.now()
        p=multiprocessing.Pool(5)  # 此处用于初始化进程池,其池中的资源是可以复用的
        for i in range(5):
            p.apply_async(calc,args=(i,))
        p.close()  # 下面要执行join,上面必须先close
        p.join()
        delta=(datetime.datetime.now()-start).total_seconds()
        print (delta)

    结果如下

    python数据结构和GIL及多进程

    进程创建的多,使用进程池进行处理还是一种比较好的处理方式

    5 多进程和多线程的选择

    1 选择

    1 CPU 密集型
    Cpython 中使用了GIL,多线程的时候互相竞争,且多核优势不能发挥,python使用多进程效率更高

    2 IO密集型

    适合使用多线程,减少IO序列化开销,且在IO等待时,切换到其他线程继续执行,效率不错,当然多进程也适用于IO密集型

    2 应用

    请求/应答模型: WEB应用中常见的处理模型

    master启动多个worker工作进程,一般和CPU数目相同
    worker工作进程中启动多个线程,提高并发处理能力,worker处理用户的请求,往往需要等待数据
    这就是nginx的工作模式

    工作进程一般都和CPU核数相同,CPU的亲原性,进程在CPU的迁移成本比较高。

    三 concurrent包

    1 概念

    concurrent.futures
    3.2 版本引入的模块
    异步并行任务编程模块,提供一个高级的异步可执行的便利接口

    提供了2个池执行器

    ThreadPoolExecutor 异步调用的线程池的Executor
    ProcessPoolExecutor 异步调用进程池的Executor

    2 参数详解

    方法 含义
    ThreadPoolExecutor(max_workers=1) 池中至多创建max_workers个线程的池来同时异步执行,返回Executor实例
    submit(fn,*args,**kwagrs) 提交执行的函数及参数,返回Future实例
    shutdown(wait=True) 清理池

    Future 类

    方法 含义
    result() 可以查看调用的返回结果
    done() 如果调用被成功的取消或者执行完成,则返回为True
    cancelled() 如果调用被成功取消,返回True
    running() 如果正在运行且不能被取消,则返回True
    cancel() 尝试取消调用,如果已经执行且不能取消则返回False,否则返回True
    result(timeout=None) 取返回的结果,超时时为None,一直等待返回,超时设置到期,抛出concurrent.futures.TimeoutError异常
    execption(timeout=None) 取返回的异常,超时为None,一直等待返回,超时设置到期,抛出concurrent.futures.TimeoutError异常

    3 线程池相关实例

    import  logging
    import threading
    from   concurrent  import  futures
    import logging
    import  time
    
    logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s")
    
    def worker(n):  # 定义未来执行的任务
        logging.info("begin to work{}".format(n))
        time.sleep(5)
        logging.info("finished{}".format(n))
    # 创建一个线程池,池容量为3
    executor=futures.ThreadPoolExecutor(max_workers=3)
    
    fs=[]
    for i in range(3):
        f=executor.submit(worker,i)  # 传入参数,返回Future对象
        fs.append(f)
    
    for  i in range(3,6):
        f=executor.submit(worker,i)  # 传入参数,返回Future对象
        fs.append(f)
    while True:
        time.sleep(2)
        logging.info(threading.enumerate())  #返回存活线程列表
        flag=True
        for  f  in fs:
            logging.info(f.done()) # 如果被成功调用或取消完成,此处返回为True
            flag=flag  and f.done()  # 若都调用成功,则返回为True,否则则返回为False
        if flag:
            executor.shutdown()  # 如果全部调用成功,则需要清理池
            logging.info(threading.enumerate())
            break
    

    结果如下

    python数据结构和GIL及多进程

    其线程池中的线程是持续使用的,一旦创建好的线程,其不会变化,唯一不好的就是线程名未发生变化,但其最多影响了打印效果

    4 进程池相关实例

    import  logging
    import threading
    from   concurrent  import  futures
    import logging
    import  time
    
    logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s")
    
    def worker(n):  # 定义未来执行的任务
        logging.info("begin to work{}".format(n))
        time.sleep(5)
        logging.info("finished{}".format(n))
    # 创建一个进程池,池容量为3
    executor=futures.ProcessPoolExecutor(max_workers=3)
    
    fs=[]
    for i in range(3):
        f=executor.submit(worker,i)  # 传入参数,返回Future对象
        fs.append(f)
    
    for  i in range(3,6):
        f=executor.submit(worker,i)  # 传入参数,返回Future对象
        fs.append(f)
    while True:
        time.sleep(2)
        flag=True
        for  f  in fs:
            logging.info(f.done()) # 如果被成功调用或取消完成,此处返回为True
            flag=flag  and f.done()  # 若都调用成功,则返回为True,否则则返回为False
        if flag:
            executor.shutdown()  # 如果全部调用成功,则需要清理池
            break

    结果如下

    python数据结构和GIL及多进程

    5 支持上下文管理

    concurrent.futures.ProcessPoolExecutor 继承自concurrent.futures.base.Executor,而父类有enter,_exit方法,其是支持上下文管理的,可以使用with语句

    import  logging
    import threading
    from   concurrent  import  futures
    import logging
    import  time
    
    logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s")
    
    def worker(n):  # 定义未来执行的任务
        logging.info("begin to work{}".format(n))
        time.sleep(5)
        logging.info("finished{}".format(n))
    fs=[]
    with   futures.ProcessPoolExecutor(max_workers=3) as executor:
        for  i in range(6):
            futures=executor.submit(worker,i)
            fs.append(futures)
    while True:
        time.sleep(2)
        flag=True
        for  f  in fs:
            logging.info(f.done()) # 如果被成功调用或取消完成,此处返回为True
            flag=flag  and f.done()  # 若都调用成功,则返回为True,否则则返回为False
        if flag:
            executor.shutdown()  # 如果全部调用成功,则需要清理池
            break

    结果如下

    python数据结构和GIL及多进程

    6 总结

    统一了线程池,进程池的调用,简化了编程,是python简单的思想哲学的提现
    唯一缺点: 无法设置线程名称

关键字