线程、进程、协程和GIL(三)

发布时间:2019-05-05 21:15:30编辑:auto阅读(2204)

    上一篇文章介绍了:创建线程的两种方式、Event对象判断线程是否启动、利用信号量控制线程并发。

    博客链接:线程、进程、协程和GIL(二)

    这一篇来说说线程间通信的那些事儿: 

      一个线程向另一个线程发送数据最安全的方式就是使用queue库中的队列了,通过创建一个供多个线程共享的Queue对象,这些线程使用put()和get()操作来向队列中添加数据或者从队列中取出数据,以达到线程间通信的效果。

      queue队列基本方法:

        queue.Queue(maxsize = num):  FIFO  先进先出队列,如果maxsize小于或等于0 则代表队列长度无线。

        queue.LifoQueue(maxsize = num): LIFO 后进先出队列(类似于栈),如果maxsize小于或等于0 则代表队列长度无线。

        Queue.qsize(): 返回当前队列中元素的个数

             Queue.empty()   如果队列为空,返回True,反之False 

             Queue.full()   如果队列满了,返回True,反之False

             Queue.get([block[, timeout]])   读队列,timeout等待时间 

             Queue.put(item, [block[, timeout]])   写队列,timeout等待时间 

             Queue.queue.clear()   清空队列

      使用Queue构造生产者消费者模型来实现线程间的通信:

    import time
    from queue import Queue,LifoQueue
    from threading import Thread
    
    def producer(in_q):
        while True:
            time.sleep(1)
            data = '包子'
            if in_q.full() == True:
                print('蒸笼满了,放不下了')
            in_q.put(data)  # 向队列中塞东西
            print('小明蒸%s!' %(data))
    
    def customer(out_q):
        while True:
            time.sleep(3)
            if out_q.empty() == True:
                    print('小红没取到包子,饿死了!')
            data = out_q.get() # 从队列中取东西
            print('小红取 %s ' % (data))
    
    if __name__ == '__main__':
        q = Queue(maxsize=3)
        t1 = Thread(target=producer, args=(q,))
        t2 = Thread(target=customer, args=(q,))
        t1.start()
        t2.start()

      上面的代码实现了一个简单的生产者消费者,小明负责蒸包子,小红负责吃包子。当队列被包子塞满时,小明就再也放不进去了,此时生产者这个线程就会阻塞。当小红将队列中的包子吃完时,消费者这个线程就会阻塞。因为Queue对象已经封装了必要的锁,所以我们可以在多个线程之间安全的功能共享数据。但是在生产者消费者的关闭问题会有一些麻烦,通用的解决方式就是在队列中放置一个特殊值,当消费者读到这个值时,就终止执行。

      不过有个问题需要注意:向队列中添加数据项时,并不会复制此数据项,线程间的通信实际上是在线程间传递对象引用。如果你单线对象的共享状态,那么最好只传递不可修改的数据结构(如:整型、字符串、或者元组)或者一个对象的深拷贝。

      给关键部分加锁

      线程的不安全:同一进程里线程是共享数据的,当各个线程访问同一个数据资源时会出现竞争状态,即数据可能会同时被多个线程占用,造成数据混乱,这就是线程的不安全。

       为了保证线程安全,所以引进了互斥锁,确保某段关键代码、共享数据只能由一个线程从头到尾完整地执行:

      显式的加锁:

    from threading import Thread, Lock
    
    num = 0
    lock = Lock()  # 定义一个锁
    
    def run():
        global num, lock  # 获取全局变量
        lock.acquire()  # 加锁
        num += 1
        print(num)
        lock.release()  # 释放锁
    
    if __name__ == '__main__':
        Thread_list = []
        for i in range(1000):
            t = Thread(target=run)
            Thread_list.append(t)
        for i in Thread_list:
            i.start()

      死锁:但是加入互斥锁之后有可能会产生一个问题:死锁:若干子线程在系统资源竞争时,都在等待对方对某部分资源解除占用状态,结果谁也不愿意先解锁,互相等着,程序无法执行下去,这就是死锁。

      比如:有两个线程一、二,两个共享资源A、B,线程一给资源A加锁,线程二给资源B加锁,然后资源A需要访问资源B,资源B需要调用资源A,线程一二双方都在等待对方释放锁,所以就会造成死锁。

       But、当程序员在加锁之后忘记调用release()方法,或者加锁之后程序抛异常导致不能正常释放锁,有可能会造成死锁,为了避免这种情况,我们不需要显式的手动加锁和释放锁,而是使用with语句来进行自动控制:

      

    from threading import Thread, Lock
    
    num = 0
    lock = Lock()  # 定义一个锁
    
    def run():
        global num, lock
        with lock:  # 自动的控制加锁和释放锁
            num += 1
            print(num)
    
    if __name__ == '__main__':
        Thread_list = []
        for i in range(1000):
            t = Thread(target=run)
            Thread_list.append(t)
        for i in Thread_list:
            i.start()

      创建一个线程池: 

      concurrent.futures 函数库有一个 ThreadPoolExecutor 类可以被用来完成这个任务

    from concurrent.futures import ThreadPoolExecutor
    
    def run():
        print('我是子线程')
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(max_workers=3)  # 创建一个容量为3的线程池
        for i in range(3):
            t = pool.submit(run,)  #在线程池中生成三个线程,他们都来调用run方法
        print('我是主线程')

     

    想了解更多Python关于爬虫、数据分析的内容,欢迎大家关注我的微信公众号:悟道Python

      

关键字