发布时间:2018-05-17 20:55:01编辑:Run阅读(5210)
线程
什么是线程?
线程是cpu调度的最小单位
进程是资源分配的最小单位
进程和线程是什么关系?
线程是在进程中的 一个执行单位
多进程 本质上开启的这个进程里就有一个线程
多线程 单纯的在当前进程中开启了多个线程
线程和进程的区别:
线程的开启 销毁 任务切换的时间开销小
在同一个进程中数据共享
能实现并发,但不能脱离进程
进程负责管理分配资源 线程负责执行代码
GIL锁 -- 全局解释器锁
同一时刻只能有一个线程访问CPU -- 线程锁
Cpython会受到GIL影响
python程序效率下降
高计算型 -- 多线程会导致程序的效率下降
高IO型的 -- 可以使用多线程
守护线程
守护线程和守护进程的区别
守护进程是等待主进程代码结束之后就结束
守护线程是等待主线程都结束之后才结束
主线程等待其他线程结束,才结束
Thread类的其他方法
Thread实例对象的方法 isAlive(): 返回线程是否活动的。 getName(): 返回线程名。 setName(): 设置线程名。 threading模块提供的一些方法: threading.currentThread(): 返回当前的线程变量。 threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止 后的线程。 threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
代码示例
from threading import Thread import time def func(): # 需要延迟一下,不然会先执行print,那么结果会不一样 time.sleep(0.1) print(123) t = Thread(target=func) t.start() print(t.is_alive()) # 线程是否运行
执行结果
True
123
示例2
from threading import Thread, currentThread, enumerate import time def func(): print('-->', currentThread()) # 打印线程名,线程号 # 需要延迟一下,不然会先执行print,那么结果会不一样 time.sleep(0.1) print(123) t = Thread(target=func) t.start() print(t.is_alive()) # 线程是否运行 print(t.getName()) # 打印线程名 t.setName('t1') # 设置线程名 print(t.getName()) # 打印线程名 print(currentThread()) # 打印线程名,线程号 print(enumerate()) # 打印线程列表
执行结果
-->
-->
True
Thread-1
t1
[
123
同步锁
既然有了GIL锁,为什么还有其他锁?
示例代码:
import time from threading import Thread def func(): global n temp = n time.sleep(0.1) n = temp - 1 n = 100 t_lst = [] for i in range(100): t = Thread(target=func) t.start() t_lst.append(t) for t in t_lst: t.join() print(n)
执行结果
99
GIL —— 全局解释器锁
锁线程 :在计算的时候 同一时刻只能有一个线程访问CPU
线程锁限制了你对CPU的使用,但是不影响web类或者爬虫类代码的效率
我们可以通过启动多进程的形式来弥补这个问题
为什么是99呢?
每个线程,从进程中取值100,那么CPU计算100-1,结果是99
这个99又赋值给n,进程变量就是99,所以每次都是赋值操作,赋值了100次,最终结果99,这样还是出现数据不安全的情况
如何解决?加锁
错误 : 示例代码如下
import time from threading import Thread,Lock def func(lock): global n lock.acquire() # 加锁 temp = n time.sleep(0.1) n = temp - 1 lock.release() # 释放锁(解锁) n = 100 t_lst = [] for i in range(100): # 写在for循环里面,等同于创建了100把锁 lock = Lock() # 创建对象锁 t = Thread(target=func,args=(lock,)) t.start() t_lst.append(t) for t in t_lst: t.join() print(n)
执行结果
99
正确代码示例:
import time from threading import Thread,Lock def func(lock): global n lock.acquire() temp = n time.sleep(0.1) n = temp - 1 lock.release() n = 100 t_lst = [] lock = Lock() # 创建一个对象锁 for i in range(100): t = Thread(target=func,args=(lock,)) t.start() t_lst.append(t) for t in t_lst: t.join() print(n)
执行结果
0
正确写法示例2
import time from threading import Thread def func(): global n n = n -1 n = 100 t_lst = [] for i in range(100): t = Thread(target=func) t.start() t_lst.append(t) for t in t_lst: t.join() print(n)
执行结果,如果把计算和赋值两个步骤拆开,就会出现数据不安全的情况
0
总结:线程也需要锁,针对上面这张情况,需要加锁,这种锁,叫做同步锁
互斥锁
在同一个线程中,能够被一个锁的多个acquire阻塞住了,这种锁就叫互斥锁
from threading import Lock lock = Lock() # 在同一个线程中 # 能够被一个锁的多个acquire阻塞住了 # 这种锁就叫互斥锁 lock.acquire() lock.acquire()
执行结果
阻塞住了......
死锁
进程也有死锁与递归锁
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
示例代码:
from threading import Lock import time mutexA = Lock() mutexA.acquire() mutexA.acquire() time.sleep(0.1) print(123) mutexA.acquire() mutexA.acquire()
执行结果
阻塞住......
解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
示例代码:
from threading import RLock import time mutexA = RLock() mutexA.acquire() mutexA.acquire() time.sleep(0.1) print(123) mutexA.acquire() mutexA.acquire()
执行结果
123
递归锁 -- RLock
典型问题:科学家吃面
形成死锁原因--即需要同时满足两个必要因素,想要吃面--需要面和叉,造成资源的互相抢占问题
死锁代码示例:
import time from threading import Thread,Lock def eat1(noodle_lock,fork_lock,name): noodle_lock.acquire() print('%s抢到了面'%name) fork_lock.acquire() print('%s抢到了叉子'%name) print('%s正在吃面'%name) fork_lock.release() print('%s归还了叉子' % name) noodle_lock.release() print('%s归还了面' % name) def eat2(noodle_lock,fork_lock,name): fork_lock.acquire() print('%s抢到了叉子' % name) time.sleep(0.5) noodle_lock.acquire() print('%s抢到了面'%name) print('%s正在吃面'%name) noodle_lock.release() print('%s归还了面' % name) fork_lock.release() print('%s归还了叉子' % name) if __name__ == '__main__': noodle_lock = Lock() fork_lock = Lock() Thread(target=eat1,args=(noodle_lock,fork_lock,'zhangsan')).start() Thread(target=eat2,args=(noodle_lock,fork_lock,'lisi')).start() Thread(target=eat1,args=(noodle_lock,fork_lock,'wangwu')).start() Thread(target=eat2,args=(noodle_lock,fork_lock,'zhuliu')).start()
执行结果
只有一个锁,不会出现死锁,那么有多个锁的情况下,就会出现死锁
如何解决这个问题呢?使用递归锁
同一个线程中对同一个锁多次acquire不会产生阻塞
递归锁 -- 错误示例
from threading import Thread,RLock def func(rlock,flag): rlock.acquire() print(flag*10) rlock.acquire() print(flag * 10) rlock.acquire() print(flag * 10) rlock.acquire() print(flag * 10) rlock = RLock() Thread(target=func,args=(rlock,'*')).start() Thread(target=func,args=(rlock,'-')).start()
执行结果
出现死锁的情况
两个线程(进程),一个拿着外层钥匙,一个拿着内层钥匙,谁也不给对方,谁都进不去,就出现了死锁
注释:
第一个线程过来,拿走了一串钥匙
每次acquire,就进入一个房间,最后在房间的最里面,因为没有release,所以出不来,阻塞在里面
如图
它每走出一个房间,需要release一次,将钥匙放到最外面门上,让下个进程进去,所以有几次acquire,就有几次release,跟函数的递归类似,怎么解决上面卡住的问题?加上4次release就可以了,如下
from threading import Thread,RLock def func(rlock,flag): rlock.acquire() print(flag*10) rlock.acquire() print(flag * 10) rlock.acquire() print(flag * 10) rlock.acquire() print(flag * 10) rlock.release() rlock.release() rlock.release() rlock.release() rlock = RLock() Thread(target=func,args=(rlock,'*')).start() Thread(target=func,args=(rlock,'-')).start()
执行结果
**********
**********
**********
**********
----------
----------
----------
----------
使用递归锁,解决科学家吃面的问题,代码如下
import time from threading import Thread,RLock def eat1(noodle_lock,fork_lock,name): noodle_lock.acquire() print('%s抢到了面'%name) fork_lock.acquire() print('%s抢到了叉子'%name) print('%s正在吃面'%name) fork_lock.release() print('%s归还了叉子' % name) noodle_lock.release() print('%s归还了面' % name) def eat2(noodle_lock,fork_lock,name): fork_lock.acquire() print('%s抢到了叉子' % name) time.sleep(0.5) noodle_lock.acquire() print('%s抢到了面'%name) print('%s正在吃面'%name) noodle_lock.release() print('%s归还了面' % name) fork_lock.release() print('%s归还了叉子' % name) if __name__ == '__main__': fork_lock = noodle_lock = RLock() # 表示同一串钥匙 Thread(target=eat1,args=(noodle_lock,fork_lock,'zhangsan')).start() Thread(target=eat2,args=(noodle_lock,fork_lock,'lisi')).start() Thread(target=eat1,args=(noodle_lock,fork_lock,'wangwu')).start() Thread(target=eat2,args=(noodle_lock,fork_lock,'zhuliu')).start()
执行结果
什么情况下,需要用到递归锁呢?
有超过一个资源需要锁的时候 -- 递归锁
信号量
同进程一样
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
实例:(同时只有2个线程可以获得semaphore,即可以限制最大连接数为2):
from threading import Thread,Semaphore,current_thread import time def func(): sm.acquire() print('{} get sm'.format(current_thread().getName())) time.sleep(3) sm.release() if __name__ == '__main__': sm = Semaphore(2) for i in range(5): t = Thread(target=func) t.start()
执行结果
Thread-1 get sm
Thread-2 get sm
Thread-3 get sm
Thread-4 get sm
Thread-5 get sm
与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程
事件
同进程一样
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作
实例:
from threading import Thread,Event,current_thread import time import random def conn_mysql(event): count = 1 while count < 4: print('第{}次尝试连接'.format(current_thread().getName(),count)) event.wait(0.5) # 如果不传参数会一直等到事件为True为止 # 如果传参数,传一个时间参数,等到多少秒,类似time.sleep() count += 1 if event.is_set(): print('连接成功'.format(current_thread().getName())) break else: print('连接失败') def check_mysql(event): print('{}正在检查mysql'.format(current_thread().getName())) time.sleep(random.randint(1,2)) event.set() if __name__ == '__main__': event = Event() conn1 = Thread(target=conn_mysql,args=(event,)) check = Thread(target=check_mysql,args=(event,)) conn1.start() check.start()
执行结果
Thread-1第1次尝试连接
Thread-2正在检查mysql
Thread-1第2次尝试连接
Thread-1第3次尝试连接
Thread-1连接成功
条件
使得线程等待,只有满足某条件时,才释放n个线程
详细说明
Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire 和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则 wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重 新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
代码说明:
import threading def run(n): con.acquire() con.wait() print("run the thread: {}".format(n)) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run,args=(i,)) t.start() while True: inp = input('>>>').strip() if inp == 'q': break con.acquire() con.notify(int(inp)) con.release() print('******')
执行结果
定时器
定时器,指定n秒后执行某个操作
from threading import Timer def hello(): print('hello, world') t = Timer(2, hello) # 2秒后执行hello t.start()
执行结果
hello, world
线程队列
queue队列 :使用import queue,用法与进程Queue一样
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
class queue.Queue(maxsize=0) #先进先出
import queue q = queue.Queue() # 先进先出,先打印1 q.put('1') q.put('2') q.put('3') print(q.get()) print(q.get()) print(q.get())
执行结果
1
2
3
class queue.LifoQueue(maxsize=0) #后进先出
import queue q = queue.LifoQueue() # 后进先出,先打印3 q.put('1') q.put('2') q.put('3') print(q.get()) print(q.get()) print(q.get())
执行结果
3
2
1
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
优先级队列
import queue q = queue.PriorityQueue() # 优先级队列,数字越小优先级越高 # 字符串,则按照asicc码顺序排列 q.put((1,'a')) q.put((10,'b')) q.put((5,'c')) print(q.get()) print(q.get()) print(q.get())
执行结果
(1, 'a')
(5, 'c')
(10, 'b')
Python标准模块--concurrent.futures
线程池
能够在多线程的基础上进一步节省内存和时间开销
#1 介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 异步提交任务 #map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 #shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 #result(timeout=None) 取得结果 #add_done_callback(fn) 回调函数
示例代码
import time from concurrent.futures import ThreadPoolExecutor def func(i): print(i*'*') time.sleep(1) thread_pool = ThreadPoolExecutor(5) # ThreadPoolExecutor(5)设置线程数为5 for i in range(10): thread_pool.submit(func, i) # submit 相当于apply_async 异步 thread_pool.shutdown() # shutdown 相当于 close+join print('1111') # 如果想最后打印 1111,需要执行shutdown()
执行结果
*
**
***
****
*****
******
*******
********
*********
1111
上面代码 return一个值 异步代码示例:
import time from concurrent.futures import ThreadPoolExecutor def func(i): print(i*'*') time.sleep(1) return i ** 2 thread_pool = ThreadPoolExecutor(5) # 设置线程池数量为5 ret_lst = [] for i in range(10): ret = thread_pool.submit(func, i) # submit 相当于apply_async 异步 ret_lst.append(ret) thread_pool.shutdown() # shutdown 相当于 close + join for ret in ret_lst: print(ret.result()) # result 返回值 print('111111')
执行结果
map的用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('{} is runing'.format(os.getpid())) time.sleep(random.randint(1, 3)) return n**2 if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=3) executor.map(task, range(1,12)) # map取代了for + submit
执行结果
回调函数
import time from concurrent.futures import ThreadPoolExecutor def func(i): print(i*'*') time.sleep(1) return i**2 def callb(arg): print(arg.result()*'-') if __name__ == '__main__': thread_pool = ThreadPoolExecutor(5) for i in range(10): # 相当于apply_async add_done_callback 回调函数 thread_pool.submit(func,i).add_done_callback(callb) #将callb函数的返回值作为参数 thread_pool.shutdown() # 相当于 close+join print('wahaha')
执行结果
当内存不需要共享,且高计算的时候 用进程
当内存需要共享,且高IO的时候 用线程
当并发很大的时候
设计模式
多进程 : 多个任务 —— 进程池 :cpu个数、cpu个数+1
多线程 :多个任务 —— 线程池 :cpu个数*5
4核 : 4个、5个进程 —— 20条线程/进程 : 80-100个任务
47743
46233
37107
34625
29227
25882
24743
19861
19414
17906
5713°
6312°
5832°
5885°
6981°
5827°
5842°
6358°
6313°
7670°