Python:线程、进程与协程(3)——

发布时间:2019-09-23 17:04:04编辑:auto阅读(1450)

        Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式。该模块提供了三种队列:

    Queue.Queue(maxsize):先进先出,maxsize是队列的大小,其值为非正数时为无线循环队列

    Queue.LifoQueue(maxsize):后进先出,相当于栈

    Queue.PriorityQueue(maxsize):优先级队列。

    其中LifoQueue,PriorityQueue是Queue的子类。三者拥有以下共同的方法:

    qsize():返回近似的队列大小。为什么要加“近似”二字呢?因为当该值大于0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效。

    empty():返回布尔值,队列为空时,返回True,反之返回False。

    full():当设定了队列大小的时候,如果队列满了,则返回True,否则返回False。

    put(item[,block[,timeout]]):向队列里添加元素item,block设置为False的时候,如果队列满了则抛出Full异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Full异常。

    put_nowwait(item):等价与put(item,False)。block设置为False的时候,如果队列为空,则抛出Empty异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Empty异常。

    get([block[,timeout]]):从队列中删除元素并返回该元素的值,如果timeout是一个正数,它会阻塞最多超时秒数,并且如果在该时间内没有可用的项目,则引发Empty异常。

    get_nowwait():等价于get(False)

    task_done():发送信号表明入列任务已完成,经常在消费者线程中用到。

    join():阻塞直至队列所有元素处理完毕,然后再处理其它操作。

    (一)源码分析

        Queue模块用起来很简单很简单,但我觉得有必要把该模块的相关源代码贴出来分析下,会学到不少东西,看看大神们写的代码多么美观,多么结构化模块化,再想想自己写的代码,都是泪呀,来学习学习。为了缩减篇幅,源码的注释部分被删减掉。

    from time import time as _time
    try:
        import threading as _threading
    except ImportError:
        import dummy_threading as _threading
    from collections import deque
    import heapq
    
    __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
    
    class Empty(Exception):
        "Exception raised by Queue.get(block=0)/get_nowait()."
        pass
    
    class Full(Exception):
        "Exception raised by Queue.put(block=0)/put_nowait()."
        pass
    
    class Queue:
        def __init__(self, maxsize=0):
            self.maxsize = maxsize
            self._init(maxsize)
            self.mutex = _threading.Lock()
            self.not_empty = _threading.Condition(self.mutex)
            self.not_full = _threading.Condition(self.mutex)
            self.all_tasks_done = _threading.Condition(self.mutex)
            self.unfinished_tasks = 
          
        def get_nowait(self):
            return self.get(False)
        def _init(self, maxsize):
            self.queue = deque()
        def _qsize(self, len=len):
            return len(self.queue)
        def _put(self, item):
            self.queue.append(item)
        def _get(self):
            return self.queue.popleft()

    通过后面的几个函数分析知道,Queue对象是在collections模块的queue基础上(关于collections模块参考 Python:使用Counter进行计数统计及collections模块),加上threading模块互斥锁和条件变量封装的。

    deque是一个双端队列,很适用于队列和栈。上面的Queue对象就是一个先进先出的队列,所以首先_init()函数定义了一个双端队列,然后它的定义了_put()和_get()函数,它们分别是从双端队列右边添加元素、左边删除元素,这就构成了一个先进先出队列,同理很容易想到LifoQueue(后进先出队列)的实现了,保证队列右边添加右边删除就可以。可以贴出源代码看看。

    class LifoQueue(Queue):
        '''Variant of Queue that retrieves most recently added entries first.'''
    
        def _init(self, maxsize):
            self.queue = []
    
        def _qsize(self, len=len):
            return len(self.queue)
    
        def _put(self, item):
            self.queue.append(item)
    
        def _get(self):
            return self.queue.pop()

    虽然它的"queue"没有用queue(),用列表也是一样的,因为列表append()和pop()操作是在最右边添加元素和删除最右边元素。

    再来看看PriorityQueue,他是个优先级队列,这里用到了heapq模块的heappush()和heappop()两个函数。heapq模块对堆这种数据结构进行了模块化,可以建立这种数据结构,同时heapq模块也提供了相应的方法来对堆做操作。其中_init()函数里self.queue=[]可以看作是建立了一个空堆。heappush() 往堆中插入一条新的值 ,heappop() 从堆中弹出最小值 ,这就可以实现优先级(关于heapq模块这里这是简单的介绍)。源代码如下:

    class PriorityQueue(Queue):
        '''Variant of Queue that retrieves open entries in priority order (lowest first).
    
        Entries are typically tuples of the form:  (priority number, data).
        '''
    
        def _init(self, maxsize):
            self.queue = []
    
        def _qsize(self, len=len):
            return len(self.queue)
    
        def _put(self, item, heappush=heapq.heappush):
            heappush(self.queue, item)
    
        def _get(self, heappop=heapq.heappop):
            return heappop(self.queue)

    基本的数据结构分析完了,接着分析其它的部分。

    mutex 是个threading.Lock()对象,是互斥锁;not_empty、 not_full 、all_tasks_done这三个都是threading.Condition()对象,条件变量,而且维护的是同一把锁对象mutex(关于threading模块中Lock对象和Condition对象可参考上篇博文Python:线程、进程与协程(2)——threading模块)。

    其中:

    self.mutex互斥锁:任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有该互斥锁。acquire()获取锁,release()释放锁。同时该互斥锁被三个条件变量共同维护。

     self.not_empty条件变量:线程添加数据到队列中后,会调用self.not_empty.notify()通知其它线程,然后唤醒一个移除元素的线程。

    self.not_full条件变量:当一个元素被移除出队列时,会唤醒一个添加元素的线程。

    self.all_tasks_done条件变量 :在未完成任务的数量被删除至0时,通知所有任务完成

     self.unfinished_tasks  : 定义未完成任务数量


    再来看看主要方法

    (1)put()

    源代码如下:

    def put(self, item, block=True, timeout=None):
            self.not_full.acquire()                  #not_full获得锁
            try:
                if self.maxsize > 0:                 #如果队列长度有限制
                    if not block:                    #如果没阻塞
                        if self._qsize() == self.maxsize:   #如果队列满了抛异常
                            raise Full
                    elif timeout is None:           #有阻塞且超时为空,等待
                        while self._qsize() == self.maxsize:
                            self.not_full.wait()
                    elif timeout < 0:
                        raise ValueError("'timeout' must be a non-negative number")
                    else:        #如果有阻塞,且超时非负时,结束时间=当前时间+超时时间
                        endtime = _time() + timeout
                        while self._qsize() == self.maxsize:
                            remaining = endtime - _time()
                            if remaining <= 0.0:       #到时后,抛异常
                                raise Full
                                #如果没到时,队列是满的就会一直被挂起,直到有“位置”腾出
                            self.not_full.wait(remaining)
                self._put(item)                    #调用_put方法,添加元素
                self.unfinished_tasks += 1         #未完成任务+1
                self.not_empty.notify()             #通知非空,唤醒非空挂起的任务
            finally:
                self.not_full.release()            #not_full释放锁


        默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。

        如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。

    (2)get()

    源码如下:

    def get(self, block=True, timeout=None):
            
            self.not_empty.acquire()                #not_empty获得锁
            try:
                if not block:                       #不阻塞时
                    if not self._qsize():           #队列为空时抛异常
                        raise Empty
                elif timeout is None:               #不限时时,队列为空则会等待
                    while not self._qsize():
                        self.not_empty.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = _time() + timeout
                    while not self._qsize():
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Empty
                        self.not_empty.wait(remaining)
                item = self._get()                  #调用_get方法,移除并获得项目
                self.not_full.notify()              #通知非满
                return item                        #返回项目
            finally:
                self.not_empty.release()            #释放锁

        逻辑跟put()函数一样,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。

            不过我觉得put()与get()两个函数结合起来理解比较好。not_full与not_empty代表的是两种不同操作类型的线程,not_full可以理解成is-not-full,即队列是否满了,默认是没有满,没有满时not_full这个条件变量才能获取锁,并做一些条件判断,只有符合条件才能向队列里加元素,添加成功后就会通知not_empty条件变量队列里不是空的,“我”刚刚添加进了一个元素,满足可以执行删除动作的基本条件了(队列不是空的,想想如果是空的执行删除动作就没有意义了),同时唤醒一些被挂起的执行移除动作的线程,让这些线程重新判断条件,如果条件准许就会执行删除动作,然后又通知not_full条件变量,告诉“它”队列不是满的,因为“我”刚才删除了一个元素(想想如果队列满了添加元素就添加不进呀,就没意义了),满足了添加元素的基本条件(队列不是满的),同时唤醒一些被挂起的执行添加动作的线程,这些线程又会进行条件判断,符合条件就会添加元素,否则继续挂起,依次类推,同时这样也保证了线程的安全。正与前面所说,当一个元素被移除出队列时,会唤醒一个添加元素的线程;当添加一个元素时会唤醒一个删除元素的线程。

         这是我想了一段时间得出的一种我个人理解的解释,不知道对不对或者说合不合理,如果有大神对这部分知识很熟悉了解,欢迎留言批评指正。


    (3)task_done()

    源码如下:

    def task_done(self):
       
            self.all_tasks_done.acquire()       #获得锁
            try:
                unfinished = self.unfinished_tasks - 1  #判断队列中一个线程的任务是否全部完成
                if unfinished <= 0:                     #是则进行通知,或在过量调用时报异常
                    if unfinished < 0:
                        raise ValueError('task_done() called too many times')
                    self.all_tasks_done.notify_all()
                self.unfinished_tasks = unfinished      #否则未完成任务数量-1
            finally:
                self.all_tasks_done.release()           #最后释放锁

        这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。


    (4)join()

    源码如下:

    def join(self):
    
        self.all_tasks_done.acquire()
        try:
            while self.unfinished_tasks:        #如果有未完成的任务,将调用wait()方法等待
                self.all_tasks_done.wait()
        finally:
            self.all_tasks_done.release()

    阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。


    其它的方法都比较简单,也比较好理解,有兴趣可以去看看Queue.py里的源码,要注意的是任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有互斥锁mutex。

    (二)简单例子

    (1)一个简单例子

    实现一个线程不断生成一个随机数到一个队列中

    实现一个线程从上面的队列里面不断的取出奇数

    实现另外一个线程从上面的队列里面不断取出偶数

    import random,threading,time
    from Queue import Queue
    is_product = True
    class Producer(threading.Thread):
        """生产数据"""
        def __init__(self, t_name, queue):
           threading.Thread.__init__(self,name=t_name)
           self.data=queue
        def run(self):
            while 1:
    
                if self.data.full():
                    global is_product
                    is_product = False
                else:
                    if self.data.qsize() <= 7:#队列长度小于等于7时添加元素
                        is_product = True
                        for i in range(2): #每次向队列里添加两个元素
    
                            randomnum=random.randint(1,99)
                            print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
                            self.data.put(randomnum,False) #将数据依次存入队列
                            time.sleep(1)
                            print "deque length is %s"%self.data.qsize()
                    else:
                        if is_product:
                            for i in range(2):  #
    
                                randomnum = random.randint(1, 99)
                                print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
                                self.data.put(randomnum,False)  # 将数据依次存入队列
                                time.sleep(1)
                                print "deque length is %s" % self.data.qsize()
                        else:
                            pass
    
            print "%s: %s finished!" %(time.ctime(), self.getName())
    
    #Consumer thread
    class Consumer_even(threading.Thread):
        def __init__(self,t_name,queue):
            threading.Thread.__init__(self,name=t_name)
            self.data=queue
        def run(self):
            while 1:
                if self.data.qsize() > 7:#队列长度大于7时开始取元素
                    val_even = self.data.get(False)
                    if val_even%2==0:
                        print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)
                        time.sleep(2)
                    else:
                        self.data.put(val_even)
                        time.sleep(2)
                    print "deque length is %s" % self.data.qsize()
                else:
                    pass
    
    
    class Consumer_odd(threading.Thread):
        def __init__(self,t_name,queue):
            threading.Thread.__init__(self, name=t_name)
            self.data=queue
        def run(self):
            while 1:
                if self.data.qsize() > 7:
                    val_odd = self.data.get(False)
                    if val_odd%2!=0:
                        print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)
                        time.sleep(2)
                    else:
                        self.data.put(val_odd)
                        time.sleep(2)
                    print "deque length is %s" % self.data.qsize()
                else:
                    pass
    
    #Main thread
    def main():
        queue = Queue(20)
        producer = Producer('Pro.', queue)
        consumer_even = Consumer_even('Con_even.', queue)
        consumer_odd = Consumer_odd('Con_odd.',queue)
        producer.start()
        consumer_even.start()
        consumer_odd.start()
        producer.join()
        consumer_even.join()
        consumer_odd.join()
    
    if __name__ == '__main__':
        main()


    这个例子跟上篇博文Python:线程、进程与协程(2)——threading模块中介绍Condition的例子很像,就是构造了一个长度为20的队列,当队列1元素个数小于8时就忘队列中添加元素,当队列满后,就不再添加,当队列元素大于7个时,才会取元素,否则不取元素。有兴趣的可以动手试试,仔细体会下。


    (2)线程池

            在使用多线程处理任务时也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成cpu的大量开销。为解决这个问题,线程池的概念被提出来了。预先创建好一个较为优化的数量的线程,让过来的任务立刻能够使用,就形成了线程池。在python中,没有内置的较好的线程池模块,需要自己实现或使用第三方模块。

    #coding=utf-8
    import queue
    import threading
    import contextlib
    import time
     
    StopEvent = object() # 创建空对象
     
    class ThreadPool(object):
     
     def __init__(self, max_num, max_task_num = None):
      if max_task_num:
       self.q = queue.Queue(max_task_num)
      else:
       self.q = queue.Queue()
      self.max_num = max_num
      self.cancel = False
      self.terminal = False
      self.generate_list = []
      self.free_list = []
     
     def run(self, func, args, callback=None):
      """
      线程池执行一个任务
      :param func: 任务函数
      :param args: 任务函数所需参数
      :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
      :return: 如果线程池已经终止,则返回True否则None
      """
      if self.cancel:
       return
      if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
       self.generate_thread()
      w = (func, args, callback,)
      self.q.put(w)
     
     def generate_thread(self):
      """
      创建一个线程
      """
      t = threading.Thread(target=self.call)
      t.start()
     
     def call(self):
      """
      循环去获取任务函数并执行任务函数
      """
      current_thread = threading.currentThread
      self.generate_list.append(current_thread)
     
      event = self.q.get()
      while event != StopEvent:
     
       func, arguments, callback = event
       try:
        result = func(*arguments)
        success = True
       except Exception as e:
        success = False
        result = None
     
       if callback is not None:
        try:
         callback(success, result)
        except Exception as e:
         pass
     
       with self.worker_state(self.free_list, current_thread):
        if self.terminal:
         event = StopEvent
        else:
         event = self.q.get()
      else:
     
       self.generate_list.remove(current_thread)
     
     def close(self):
      """
      执行完所有的任务后,所有线程停止
      """
      self.cancel = True
      full_size = len(self.generate_list)
      while full_size:
       self.q.put(StopEvent)
       full_size -= 1
     
     def terminate(self):
      """
      无论是否还有任务,终止线程
      """
      self.terminal = True
     
      while self.generate_list:
       self.q.put(StopEvent)
     
      self.q.empty()
     
     @contextlib.contextmanager
     def worker_state(self, state_list, worker_thread):
      """
      用于记录线程中正在等待的线程数
      """
      state_list.append(worker_thread)
      try:
       yield
      finally:
       state_list.remove(worker_thread)
     
    # How to use
    pool = ThreadPool(5)
     
    def callback(status, result):
     # status, execute action status
     # result, execute action return value
     pass
     
    def action(i):
     print(i)
     
    for i in range(30):
     ret = pool.run(action, (i,), callback)
     
    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    print(len(pool.generate_list), len(pool.free_list))
    # pool.close()
    # pool.terminate()


关键字