Python3 多线程

发布时间:2019-10-12 20:08:19编辑:auto阅读(2046)

    两个概念:

    • 并发:假同时,一段时间内同时处理多个任务,单核都可以;
    • 并行:真同时,同时处理多个任务,必须多核。

    主流操作系统上完成并发的手段有进程和线程,主流的编程语言提供了用户空间的调度:协程。Python 也不例外。

    (想自学习编程的小伙伴请搜索圈T社区,更多行业相关资讯更有行业相关免费视频教程。完全免费哦!)

    由于现在的操作系统上的进程越来越轻量,导致进程和线程之间的区别越来越少。事实上,Linux 并没有原生的线程,线程是通过进程实现的。

    python 中每一个进程会启动一个解释器,而线程会共享一个解释器。

    Python 中的线程是通过标准库 threading 实现的。而启动一个线程就是让这个线程执行一些逻辑,这些逻辑就对应一个函数。

    >>> import threading
    >>> def worker(): # 让多个线程来执行它
    ...     print('work')
    ...
    >>> thread = threading.Thread(target=worker) # 创建了一个线程对象,target 参数是一个函数,即线程要执行的逻辑
    >>> thread.start() # start 启动一个线程,执行完毕后,自动退出,Python 没有提供主动退出线程的方法
    work
    

    由于 python 没有提供退出线程的方法,因此我们一定不能在逻辑中定义死循环,不然线程无法退出。当然直接 kill -9 和刻意为之的另说。而像那种监听某个端口提供服务的进程,为了保证不退出,通常都会有一个 while True 的死循环。

    上面只是启动了一个线程,很显然没什么屌用。启动多个线程的方式非常简单,就是在它的外面套一个 for 循环就可以了:

    import time
    import threading
    
    def worker(num):
        time.sleep(1)
        print('work-{}'.format(num))
    
    for i in range(5):
        t = threading.Thread(taret=worker, args=(i, )) # 启动了五个线程,要启动几个就循环几次
        t.start()
    

    通过 args 给函数传递参数,也可以使用 kwargs 通过字典传递。结果是在等待一秒之后,所有线程同时输出了,并且在一个线程的换行符还没有打印出来的时候,下一个线程就输出了,这就涉及到线程安全的问题了。很显然,print 并不是线程安全的。

    线程相比于进程更轻量,上下文切换的代价没有进程那么大,但即使如此,线程数量也不宜过多。

    标识一个线程

    threading.current_thread() 可以返回当前的线程对象。

    >>> threading.Thread(target=lambda: print(threading.current_thread())).start()
    <Thread(Thread-13, started 140007299499776)>
    

    返回的线程对象我们可以通过一个变量进行接收:

    thread = threading.current_thread()
    

    它有很多属性和方法:

    • name:返回线程的名字;
    • ident:返回该线程的唯一标识符;
    • is_alive:告知该线程是否存活;
    • enumerate:可以通过循环它打印出所有的线程;

    我们创建线程对象的时候是可以给它取名字的:

    t = threading.Thread(target=worker, name='thread1')
    

    这个 name 可以通过 logging 的 threadName 获得。

    logging

    前面提到过,print 并不是线程安全的,而 logging 模块线程安全。

    >>> import logging
    >>> logging.warning('hehe')
    WARNING:root:hehe
    >>> logging.info('hehe') # 默认只输出 warning 以上级别
    

    我们可以对其进行一些基础的配置,让其记录 DEBUG 以上的级别,以及记录线程名:

    >>> logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    >>> logging.info('hehe')
    2017-09-23 15:41:36,868 INFO MainThread hehe
    

    知道了它的简单用法之后,我们就可以使用多线程了:

    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(lineno)d %(message)s')
    
    def worker():
        logging.info(logging.info('work'))
    
    for i in range(5):
        t = threading.Thread(target=worker)
        t.start()
    

    使用 logging 就没有问题了,因此我们通常使用它来替代 print。

    logging 还可以将异常的栈追踪信息记录下来,这在排查错误的时候非常方便:

    import logging
    
    try:
        config['DE']['xxx']
    except Exception as e:
        logging.exception(e)
    print('xxx')
    

    daemon 与 non-daemon

    daemon 在 linux 上是守护进程的意思,它始终在后台运行。而在 Python 中的 daemon 线程会在主线程退出之后退出。也就是说,如果不是 daemon 线程,主线程退出之后,非 daemon 线程还会继续执行,直到结束退出。

    线程默认不是 daemon,如果想要设置为 daemon,那就在创建线程对象的时候,给它传递 daemon=True 即可。

    >>> t = threading.Thread(target=worker, daemon=True)
    >>> t.daemon
    Out[20]: True
    

    通过下面的例子证明之前的说法:

    import time
    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    def worker():
        logging.info('start')
        time.sleep(2)
        logging.info('end')
    
    if __name__ == '__main__':
        logging.info('start')
        t1 = threading.Thread(target=worker, name='non-daemon')
        t1.start()
        t2 = threading.Thread(target=worker, name='daemon', daemon=True)
        t2.start()
        logging.info('end')
    
    # 执行结果
    2017-09-24 04:08:49,027 INFO MainThread start
    2017-09-24 04:08:49,028 INFO non-daemon start
    2017-09-24 04:08:49,028 INFO daemon start
    2017-09-24 04:08:49,028 INFO MainThread end
    2017-09-24 04:08:51,031 INFO non-daemon end
    

    执行上面的代码你会发现有的时候主线程退出了,但是 daemon 线程还会执行完成。这是因为虽然从日志中看到主线程退出,但是事实上主线程是没有退出的,它会等待非 daemon 线程执行完毕后才会退出,这样就给了 daemon 线程的执行时间了。当我们将 t1 给注释掉之后,就不可能出现主线程退出后,daemon 线程仍然执行的情况了。

    如果我们在 t2.start() 之后增加一行 t2.join(),那即使它是 daemon 线程,主线程依然会等待它执行完毕后再退出。因为 join 会阻塞直到线程执行完毕。join 支持一个参数,那就是阻塞的秒数。t2.join(1) 表示只阻塞一秒,这个时候即使 t2 没有执行完成,主线程依然会退出。join 用的比较多,它并不占用 CPU 时间。

    创建线程的另一种方法

    上面创建线程的方法是通过实例化 Thread,我们还可以通过下面这种方式:

    import logging
    import threading
    
    class Mythread(threading.Thread):
        def run(self):
            logging.warning('worker')
    
    t = Mythread()
    t.start()
    

    通过继承 + 重写 run 方法来到达启动多线程的效果,run 等同于之前 target 指定的函数。但是 Python 中这种方法使用的很少。

    当我们创建一个线程对象的时候,除了可以使用 start 启动它之外,还可以通过 run 来启动。如果不是以继承的方式创建线程,一个线程对象的 run 和 start 只能执行其中一个。

    thread local

    定义一个 thread local 对象。

    ctx = threading.local()
    

    这时的 ctx 没有任何属性,我们可以给它增加属性:

    >>> ctx.data = 5
    >>> ctx.data
    Out[25]: 5
    

    继续:

    >>> data = 'abc' # 定义一个变量
    >>> def worker():
    ...     logging.warning(data)
    ...     logging.warning(ctx.data)
    ...
    >>> worker() # 执行没什么问题
    WARNING:root:abc
    WARNING:root:5
    >>> threading.Thread(target=worker).start() # 但是通过线程执行就不行了
    WARNING:root:abc # data 可以直接打出来
    Exception in thread Thread-9:
    Traceback (most recent call last):
      File "/usr/local/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
        self.run()
      File "/usr/local/python3/lib/python3.6/threading.py", line 864, in run
        self._target(*self._args, **self._kwargs)
      File "<ipython-input-32-2e99199c517b>", line 3, in worker
        logging.warning(ctx.data)
    AttributeError: '_thread._local' object has no attribute 'data' # 但是 ctx.data 提示没有
    

    这是因为 ctx.data 是一个 thread local 的变量,我们可以给它赋值任意属性,但是只对当前线程可见。线程独享!

    使用 run 方法,它会将 target 放在主线程中;start 则会将其放到子线程中,二者只能执行一个。

    定时器

    也可以称为延时执行。Python 中存在一种特殊的线程,可用于延迟执行。它继承自 Thread 类,因此它也是 Thread 对象。

    >>> def worker():
    ...     logging.warning('worker')
    ...
    >>> t = threading.Timer(interval=5, function=worker)
    >>> t.start()
    
    
    • interval:延时多少秒执行,默认为 30;
    • function:等同于 target。

    可以看到执行 start 方法后,五秒后才有输出。在等待的过程中,它可以通过 cancel() 终止。

    它也可以设置线程名,只不过要这样:

    >>> t = threading.Timer(interval=5, function=worker)
    >>> t.name = 'Timer'
    >>> t.deamon = True # 设置是否为 daemon
    

    当 function 指定的函数开始执行的时候,无法通过 cancel() 终止。

    Timer 的定时执行功能很弱,如果真的有这方面的需要,可以使用 APSchedule。

    event

    第一种线程同步的方式。同步意味着阻塞,如果线程之间没有联系,完全没有必要使用同步。有这么一种需求:worker 线程做一些事情,当它完成之后,通知 boss 线程,由 boss 完成处理后续工作。这可能并不难实现,但是 boss 线程要统计 worker 线程的执行时间呢?

    这就要用到线程间通信的机制了,最简单的是 event:

    >>> event = threading.Event()
    >>> event.set()
    >>> event.wait()
    Out[8]: True
    

    它是一个 threading.Event 的对象,有 set 和 wait 这两个方法。wait 会阻塞线程直到 set 方法被调用。
    有了这两种方法之后,我们就可以完成上面的需求了:

    import time
    import random
    import logging
    import datetime
    import threading
    
    def worker(event: threading.Event):
        time.sleep(random.randint(1, 5))
        event.set()
    
    def boss(event: threading.Event):
        start = datetime.datetime.now()
        event.wait()
        logging.warning('worker exit {}'.format(datetime.datetime.now() - start))
    
    def start():
        event = threading.Event()
        b = threading.Thread(target=boss, args=(event,), name='boss')
        b.start()
        for x in range(5):
            threading.Thread(target=worker, args=(event,), name='worker').start()
    
    start()
    

    五个 worker 线程,谁先执行完成就谁执行 event.set(),一旦 event.set 被执行,boss 线程也就会继续执行并输出日志了。但是会有一个问题,由于是随机 sleep 时间,也就是说最快 boss 线程可以一秒就退出,但是还有四个 worker 线程还在执行,这四个线程拉长了整个脚本的执行时间。

    再做修改:

    import time
    import random
    import logging
    import datetime
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    def worker(event: threading.Event):
        s = random.randint(1, 5)
        event.wait(s) # 先阻塞
        event.set() # 一下全放开了
        logging.info('sleep {}'.format(s))
    
    def boss(event: threading.Event):
        start = datetime.datetime.now()
        event.wait()
        logging.info('worker exit {}'.format(datetime.datetime.now() - start))
    
    def start():
        event = threading.Event()
        threading.Thread(target=boss, args=(event,), name='boss').start()
        for x in range(5):
            threading.Thread(target=worker, args=(event,), name='worker-{}'.format(x)).start()
    
    start()
    
    # 执行结果
    2017-09-25 06:15:42,114 INFO worker-0 sleep 2
    2017-09-25 06:15:42,115 INFO boss worker exit 0:00:02.004014
    2017-09-25 06:15:42,116 INFO worker-1 sleep 5
    2017-09-25 06:15:42,116 INFO worker-2 sleep 4
    2017-09-25 06:15:42,116 INFO worker-3 sleep 3
    2017-09-25 06:15:42,117 INFO worker-4 sleep 2
    

    可以看到都在同一秒退出了,这是因为 wait 可以指定超时时间,时间一到它就不再阻塞。这样阻塞时间最短的那个线程就会执行 set,这样一来所有阻塞的线程同时放开了,于是同一时间都执行完成了。因此,wait 会阻塞线程直到 set 方法被调用,或者超时时间到。

    event 可以被多个线程所持有,多个线程可以同时被阻塞,一旦其中一个线程执行了 set,那么所有的线程都不再阻塞。event 可以在线程之间发送信号,通常用于某个线程需要其他线程处理某些动作之后才能启动。

    event 还有一个特性,如果先 set 然后 wait,不管有没有指定超时,它都瞬间返回 True(因为阻塞被放开,所以无法再阻塞);而如果直接 wait,且给它一个超时时间,那么超时完成之后,它会返回 False。我们可以根据这个特点来完成定时的操作。

    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    def worker(event: threading.Event):
        while not event.wait(3):
            logging.info('running')
    
    event = threading.Event()
    threading.Thread(target=worker, args=(event,)).start()
    

    每三秒会输出一次日志,会无限输出下去。但是如果执行 event.set() 就会终止死循环。

    event 还有一些方法:

    • is_set:用来判断有没有 set 过;
    • clean:清除 set 标志,通常用来做线程退出的条件。

      def worker(event):

      while not event.is_set():
          pass
      

    wait 会主动让出 CPU 时间片,time.sleep 却不会。假如它们分到了 10ms 的 CPU 时间,都使用了 5ms,那么剩余的 5ms wait 会让给别人,而 sleep 会自己用完。因此我们会使用 wait 而不是 sleep。

    实现定时器

    延时执行。

    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    class Timer:
        def __init__(self, interval, function, *args, **kwargs):
            self.interval = interval
            self.function = function
            self.args = args
            self.kwargs = kwargs
            self.event = threading.Event()
            self.thread = threading.Thread(target=self.__thread)
    
        def __thread(self):
            if not self.event.wait(self.interval):
                self.function(*self.args, **self.kwargs)
    
        def start(self):
            self.thread.start()
    
        def cancel(self):
            self.event.set()
    
    def worker():
        logging.info('running')
    
    t = Timer(interval=2, function=worker)
    t.start()
    

    Lock

    第二种线程同步的方式。lock 用来保护共享资源,其余几种线程同步的方式都是用了它。

    import random
    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    class Counter:
        def __init__(self):
            self.__val = 0
    
        @property
        def value(self):
            return self.__val
    
        def inc(self):
            self.__val += 1
    
        def dec(self):
            self.__val -= 1
    
    counter = Counter()
    
    def fn():
        if random.choice([-1, 1]) > 0:
            logging.info('inc')
            counter.inc()
        else:
            logging.info('dec')
            counter.dec()
    
    for x in range(10):
        threading.Thread(target=fn).start()
    
    print(counter.value)
    

    上面的代码即使你知道它加了多少次减了多少次,但你不能肯定它的结果,这是因为资源的争用。Lock 对象可用于解决这种问题:

    >>> lock = threading.Lock()
    >>> lock.acquire()
    Out[4]: True
    

    对于 lock 实例,只能调用一次 acquire 方法,再次调用会被阻塞,直到 release 方法被调用。根据它的这种特性,可用来改造之前的 Counter。

    class Counter:
        def __init__(self):
            self.__val = 0
            self._lock = threading.Lock()
    
        @property
        def value(self):
            return self.__val
    
        def inc(self):
            self._lock.acquire()
            self.__val += 1
            self._lock.release()
    
        def dec(self):
            self._lock.acquire()
            self.__val -= 1
            self._lock.release()
    

    这样一来,不管有多少个线程,同一时间只会有一个线程能够修改 __val。但是这样会有一个问题,如果执行加减的时候发生了异常(虽然这里不会),那么 release 永远就不会执行,那么就会形成死锁,因此我们要使用 try finally。

      def inc(self):
            try:
                self._lock.acquire()
                self.__val += 1
            finally:
                self._lock.release()
    

    从上面这种结构中我们可以联想到 with,事实上它是支持 with 的,因此我们可以定义的更为简单:

    def inc(self):

    with self._lock:
        self.__val += 1
    

    凡是用锁的地方,一定要在 finally 中使用 release,否则就会有锁死的可能性。

    而对于读来说,如果不加锁,就会存在脏读的可能性,就看能不能忍受了。通过加锁之后,Counter 类就变成线程安全了,我们可以放心的使用。

    锁是并发的难点,它会将并发变为串行,掌握了锁,并发就没有丝毫问题了。那么何时需要加锁?凡是有共享资源的地方都要加锁。

    lock 对象可以接收两个参数:

    • blocking:当再次加锁时,如果它为 False,那么不会阻塞,而是返回 False;
    • timeout:如果 blocking 为 True,timeout 大于等于 0 会阻塞到超时,并返回 False。

    预先启动 10 个线程处理一些任务,当其中一个线程在处理其中一个任务时,其他线程可以处理其他任务,这时候就可以用到非阻塞锁。第一个线程对该任务加非阻塞锁,由于之前没有加过锁,因此可以加上。第二个线程再加的时候就加不上了,并且返回 False,这时就可以让它跳过这个任务去执行下一个任务了。

    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    def worker(tasks):
        for task in tasks:
            # 第一个执行加锁的线程可以锁,它的值为 True。由于锁住了,剩下的九个线程执行的时候它的值都为 False
            # 因此 loggi.info 语句只会执行 10 次
            if task.lock.acquire(False):
                logging.info(task.name)
    
    class Task:
        def __init__(self, name):
            self.name = name
            self.lock = threading.Lock()
    
    tasks = [Task(x) for x in range(10)]
    
    for i in range(5):
        threading.Thread(target=worker, args=(tasks,), name='work-{}'.format(i)).start()
    

    如果任务有先后顺序的话,就只能串行了。

    RLock

    可重入锁在同一个线程内可多次加锁,但是只能有一个线程成功,并且 acquire 几次,就需要 release 几次。

    >>> rlock = threading.RLock()
    >>> rlock.acquire()
    Out[13]: True
    >>> rlock.acquire()
    Out[14]: True
    >>> rlock.release()
    >>> rlock.release()
    

    condition

    第三种线程同步的方式。通常用于生产者消费者模式,生产者生产消息之后,使用 notify 和 notify_all 通知消费者进行消费。而消费者使用 wait 方法阻塞等待生产者的通知。

    import random
    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    class Dispatcher:
        def __init__(self):
            self.data = None
            self.event = threading.Event()
            self.cond = threading.Condition()
    
        def consumer(self):
            while not self.event.wait(1):
                with self.cond:
                    self.cond.wait() # 会阻塞,直到 notifyAll 被执行
                    logging.info(self.data)
    
        def producer(self):
            for _ in range(10):
                data = random.randint(0, 100)
                logging.info(data)
                self.data = data
                with self.cond:
                    self.cond.notify_all()
                self.event.wait(1)
            self.event.set()
    
    d = Dispatcher()
    p = threading.Thread(target=d.producer, name='producer')
    c = threading.Thread(target=d.consumer, name='consumer')
    p.start()
    c.start()
    

    有生产者修改共享资源,然后通知消费者进行消费。

    • wait:会阻塞,直到被 notify 唤醒;
    • notifyAll:老版的驼峰写法,现已改为下面的,但为了兼容仍然存在;
    • notify_all:用于通知所有 wait 的线程,可以理解为广播;
    • notify:接收一个数字,表示唤醒多少个 wait 线程,默认为 1。可以理解为单播。

    比如下面的示例中,虽然启动了四个消费者进程,但是只允许两个同时消费,至于是哪两个就不得而知了。

    import random
    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    class Dispatcher:
        def __init__(self):
            self.data = None
            self.event = threading.Event()
            self.cond = threading.Condition()
    
        def consumer(self):
            while not self.event.is_set():
                with self.cond:
                    self.cond.wait()
                    logging.info(self.data)
    
        def producer(self):
            for _ in range(10):
                data = random.randint(0, 100)
                logging.info(data)
                self.data = data
                with self.cond:
                    self.cond.notify(2)
                self.event.wait(1)
            self.event.set()
    
    d = Dispatcher()
    p = threading.Thread(target=d.producer, name='producer')
    for i in range(4):
        threading.Thread(target=d.consumer, name='consumer-{}'.format(i)).start()
    p.start()
    

    按理来说,因为有锁的存在,所以只有在消费者的 with 代码块执行完毕,锁释放之后,生产者才能进入自己的 with 代码块。这样就能够保证,消费者只有在消费完毕之后生产者才能继续生产。但是我在运行过程中生产者根本不会等待消费者消费,它自己一个劲的跑。

    无论 notify、notify_all 还是 wait,都必须先 acquire,完成之后必须确保 release,因此通常使用 with 语法。

    barrier

    第四种线程同步的方式,栅栏的意思,只有凑齐一拨人之后才往下走。从下面这段代码中就能理解它的作用:

    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    def worker(barrier: threading.Barrier):
        logging.info('waiting for {} threads'.format(barrier.n_waiting))
        try:
            # 上面的代码各个线程什么时候执行,怎么执行都无所谓
            # 但是所有线程都会在这里同时等待,只有所有线程都执行到这了,才同时执行下面的代码
            worker_id = barrier.wait()
        except threading.BrokenBarrierError:
            logging.warning('aborting')
        else:
            logging.info('after barrier {}'.format(worker_id))
    
    # 实例化的时候指定拦多少个线程,如果启动了四个线程,只要三个到齐了就可以同时往下走了
    barrier = threading.Barrier(3)
    for i in range(3):
        threading.Thread(target=worker, args=(barrier,), name='worker-{}'.format(i)).start()
    logging.info('start')
    

    barrier 对象的一些属性和方法:

    • wait:阻塞线程,它可以指定超时时间,超时时间一到抛出 BrokenBarrierError 异常。如果执行过 abort 方法,那么再执行 wait 也会抛出 BrokenBarrierError 异常;
    • reset:清除对象执行 abort 的痕迹。执行 abort 后执行 rest,接着执行 wait 就不会抛异常了;
    • n_waiting:当前有多少个线程在等待;
    • abort:通知已经在等待的线程不必再等了,不能因为它一个而让其他线程在那傻等。而一旦执行了这个方法, wait 就会抛出 BrokenBarrierError 异常,因此不处于 wait 状态的线程是不会抛出这个异常的。

    适用场景:比如有十种工作,每个线程负责一种,只有这十个线程都初始化完成后才能工作。

    semaphore

    最后五种线程同步的方式。信号量和锁很像,锁是为 1 的信号量。

    # 创建一个为 3 的信号量
    >>> s = threading.Semaphore(3)
    >>> s.acquire()
    Out[84]: True
    >>> s.acquire(False)
    Out[85]: True
    >>> s.acquire(False)
    Out[86]: True
    >>> s.acquire(False)
    Out[87]: False
    

    它可以锁多次,上面锁了三次都没有问题,等到第四次的时候就不行了。由于锁只能锁一次,所以它是为 1 的信号量。RLock 也能锁多次,它是它只能用在同一个线程上,信号量却可以在多个线程中使用。

    创建一个连接池的时候可以用到它:

    import time
    import random
    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    class Pool:
        def __init__(self, num):
            self.num = num # 指定池子的连接数
            self.conns = [self._make_connect(x) for x in range(num)]
            self.s = threading.Semaphore(num)
    
        # 这个函数是拿到连接之后做的操作
        def _make_connect(self, name):
            return name
    
        # 从池子中取出一个连接
        def get(self):
            self.s.acquire()
            return self.conns.pop()
    
        def return_resource(self, conn):
            # 执行完毕后,将连接放回池子中
            self.conns.insert(0, conn)
            self.s.release()
    
    def worker(pool):
        logging.info('started')
        name = pool.get()
        logging.info('get connect {}'.format(name))
        time.sleep(random.randint(1, 3))
        pool.return_resource(name)
        logging.info('return resource {}'.format(name))
    
    pool = Pool(3)
    for i in range(5):
    threading.Thread(target=worker, args=(pool,), name='worker-{}'.format(i)).start()

    如果不使用信号量的话,我们还需要对池子是否为空进行判断。为什么将连接放回连接池中的 insert 操作不需要加锁呢?这是因为 GIL 的影响。

    信号量也是对资源的保护,但是和锁不一样的地方在于,锁限制只有一个线程可以访问共享资源,而信号量限制指定个线程可以访问共享资源。事实上我们只需要使用信号量就可以了,因为锁本身就是信号量的一种。

    queue

    队列,它是进程间通信的一种方式,队列有三种:

    • FIFO:Queue.Queue(maxsize=0),先进先出,线程安全;
    • LIFO:Queue.LifoQueue(maxsize=0),后进先出;
    • Priority:Queue.PriorityQueue(maxsize=0),优先队列。

    创建一个先进先出队列:

    >>> import queue
    >>> q = queue.Queue() # 队列长度无限
    

    对象的属性和方法:

    • empty():判断队列是否为空(不可靠)。因为等你获取队列的长度时,可能已经有人往里面放入了数据;
    • full():队列是否满了(不可靠);
    • maxsize:查看队列的最大长度;
    • qsize():看到队列当前长度(不可靠);
    • clear():清空队列;
    • join():等到队列为空的时候,才进行操作;
    • put():往队列里面添加内容,可以为任意数据结构。put(self, item, block=True, timeout=None),block 表示是否为队列是否为阻塞状态。队列满了,再往里面加内容,队列会阻塞。如果不阻塞会返回一个异常,默认为阻塞状态;timeout 是阻塞的时间,如果队列满了,再往队列里面添加数据时,timeout 时间后会抛出异常。如果为 None(默认),它会一直阻塞,直到有线程从队列中取出数据;
    • get():从队列中取内容。如果是先进先出队列,它会取出最先存进去的数据。get(self, block=True, timeout=None),如果队列是空的,并且 timeout 为 None,它会一直阻塞,直到有线程往队列里面存入数据;
    • put_nowait(item):等效于 put(item, block=False);
    • get_nowait():等效 get(item, block=False)。

    我们可以通过它来重写生产者消费者模型:

    #!/usr/local/python3/bin/python3
    
    import queue
    import random
    import logging
    import threading
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
    
    def producer(queue: queue.Queue, event: threading.Event):
        while not event.wait(3):
            data = random.randint(0, 100)
            logging.info(data)
            queue.put(data)
    
    def consumer(queue: queue.Queue, event: threading.Event):
        while not event.is_set():
            logging.info(queue.get())
    
    q = queue.Queue()
    e = threading.Event()
    threading.Thread(target=consumer, args=(q, e), name='consumer').start()
    threading.Thread(target=producer, args=(q, e), name='producer').start()
    

    通过 e.set() 就能停止它。相对 condition 实现的生产者消费者模型,它的优势在于可以暂存数据,这在生产者和消费者速率不一致的时候很好用;而它的缺陷在于无法广播,无法通知多个线程同时消费一条消息。因为我们通常可以将它们结合起来使用。

    取出队列中所有数据:

    while not q.enpty():
        q.get()
    

    GIL

    全局解释器锁,这是 Python 争议很大的一个点。正是由于它的存在,在操作内置容器时,解释器会在解释器级别增加一个锁,因此 Python 所有内置容器(字典、列表等)都是线程安全的,多线程环境下使用没有丝毫问题。而导致的后果就是 Python 的并发性能很差。

    Python 中 collection, logging 等标准库都是线程安全的。

    concurrent.futures

    官网地址,Python3.2 引入的异步模块。
    创建一个线程池:

    from concurrent import futures
    pool = futures.ThreadPoolExecutor(max_workers=5)
    

    pool 对象有三个方法。submit 用于执行一个函数:

    >>> fut = pool.submit(lambda: 1+1) # 执行一段逻辑,也就是一个函数
    >>> fut.result() # 获取执行结果
    Out[116]: 2
    >>> fut.done() # 查看函数是否执行完成
    Out[117]: True
    >>> fut.running() # 是否处于运行状态
    Out[118]: False
    >>> fut.cancel() # 一个已经开始运行的线程是无法结束的,没开始的(比如 pool 满了在阻塞)可以
    Out[119]: False
    >>> fut.exception() # 如果函数中产生了异常,可以通过它来获取异常的实例
    

    传递参数:

    pool.submit(self.create_vm, vm_attributes, extra_attributes, conns)

    通过这种方式使用线程,不需要将数据发送到队列中。

    进程池由 ProcessPoolExecutor 实现,它们简化了进程和线程的操作,并且对返回值和异常进行了处理。

    建议使用 futures,虽然它无法设置线程名(3.6 之后可以)、daemon 等属性,但是问题不大。

关键字