一丶锁
线程安全:
线程安全能够保证多个线程同时执行时程序依旧运行正确, 而且要保证对于共享的数据,可以由多个线程存取,但是同一时刻只能有一个线程进行存取.
import threading v = [] def func(arg): v.append(arg) # 线程安全 print(v) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
1.GIL锁
GIL锁中文名称为"全局解释器锁",主要体现在多线程中,每个线程在执行的过程中都需要先获取GIL,保证同一时刻只有一个线程可以执行代码.而Python语言和GIL没有半毛钱关系,仅仅是由于历史原因在Cpython虚拟机(解释器),难以移除GIL
补充:
(线程释放GIL锁的情况)在IO操作等可能会引起阻塞的system call之前,可以暂时释放GIL,但在执行完毕后,必须重新获取GIL. Python 3.x使用计时器(执行时间达到阈值后,当前线程释放GIL)或Python 2.x,tickets计数达到100,多线程爬取比单线程性能有所提升,因为遇到IO阻塞会自动释放GIL锁.
2.Lock锁(一次放一个)
import threading import time v = [] lock = threading.Lock() def func(arg): lock.acquire() #加锁 # ++++++++++++++++++被锁的功能 v.append(arg) time.sleep(0.01) m = v[-1] print(arg,m) #+++++++++++++++++++ lock.release() #解锁 for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() # 不加锁: #加锁后 # 2 9 0 0 # 3 9 1 1 # 0 9 2 2 # 1 9 3 3 # 7 9 4 4 # 5 9 5 5
3.RLock锁(一次放一个)
import threading import time v = [] lock = threading.RLock() def func(arg): lock.acquire() lock.acquire() v.append(arg) time.sleep(0.01) m = v[-1] print(arg,m) lock.release() lock.release() for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() # 结果: # 0 0 # 1 1 # 2 2 # 3 3 # 4 4 # 5 5
Lock和RLock 的区别:
Lock:Lock(指令锁)是可用的最低级别的同步指令.Lock处于锁定状态时,不被特定的线程拥有.Lock包含两种状态--锁定和非锁定,以及两个基本方法.可以认为Lock有一个锁定值池,当线程请求锁定时,将线程至于池中,知道获得锁定后出池.池中的线程处于状态图中的同步阻塞状态.
RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令.RLock使用了"拥有的线程"和"递归等级"的概念,处于锁定状态时,RLock被某个线程拥有.拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数.可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态
简言之: Lock属于全局,Rlock属于线程
4.BoundedSemaphore(一次放指定个数)
import time import threading lock = threading.BoundedSemaphore(4) #每次允许通过的个数 def func(arg): lock.acquire() #加锁 print(arg) time.sleep(2) lock.release() #解锁 for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
5.Condition(一次放多个)
import time import threading lock = threading.Condition() # ############## 方式一:输入几个取出来几个 ############## def func(arg): print('线程进来了') lock.acquire() lock.wait() # 加锁 print(arg) time.sleep(1) lock.release() #解锁 for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() while True: inp = int(input('>>>')) lock.acquire() lock.notify(inp) lock.release() #结果: # 线程进来了 # .... # 线程进来了 # >>>3 # >>>0 # 1 # 2 # 2 # >>>3 # 4 # 4 # >>>5 # 8 # 6 # 7 # 3 # >>>9 # ############## 方式二(输入一次放一个)############## # def xxxx(): print('来执行函数了') input(">>>") # ct = threading.current_thread() # 获取当前线程 # ct.getName() return True def func(arg): print('线程进来了') lock.wait_for(xxxx) print(arg) time.sleep(1) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() #结果 # >> > 线程进来了 # 来执行函数了 # >> > 线程进来了 # 来执行函数了 # >> > 1 # 0 # 2 # 1 # 3 # 2 # 6
6.Event(一次放所有)
import threading lock = threading.Event() def func(arg): print('线程来了') lock.wait() # 加锁:红灯 print(arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() input(">>>>") lock.set() # 绿灯 获取 lock.clear() # 再次变红灯 for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() input(">>>>") lock.set() #绿灯 获取
二丶threading.local
作用:内部自动为每一个线程维护一个空间(字典),用于当前存取属于自己的值.保证线程之间的数据隔离
{
线程ID: {.....}
线程ID: {.....}
线程ID: {.....}
}
import time import threading DATA_DICT = {} def func(arg): ident = threading.get_ident() #获取线程ID DATA_DICT[ident] = arg #{1756: 0, 2636: 1, 8892: 2, 8448: 3, 2344: 4, 7196: 5, 8572: 6, 2268: 7, 2480: 8, 7644: 9} time.sleep(1) print(DATA_DICT[ident],arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
import time import threading INFO = {} class Local(object): def __getattr__(self, item): ident = threading.get_ident() return INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in INFO: INFO[ident][key] = value else: INFO[ident] = {key:value} obj = Local() def func(arg): obj.phone = arg # 调用对象的 __setattr__方法(“phone”,1) time.sleep(2) print(obj.phone,arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() print(INFO)#{7688: {'phone': 0}, 8972: {'phone': 1}, 5280: {'phone': 2}, 4724: {'phone': 3}, # 8384: {'phone': 4}, 8680: {'phone': 5}, 8220: {'phone': 6}, 9032: {'phone': 7}, 4660: {'phone': 8}, # 528: {'phone': 9}}
三丶线程池 threadpool模块
可以模拟一个场景,假如我们要去领签名照,在工作室签名的明星只有两个在哪儿,而要领签名照的人很多很多,我们可以创建一个大纸箱子,把那些想要领签名照的人的信息记录下来,然后等明星按顺序来签名.这个大纸箱子就是我们所谓的线程池,存放一个个的需求等待CPU来调度
from concurrent.futures import ThreadPoolExecutor import time def task(a1,a2): time.sleep(2) print(a1,a2) # 创建了一个线程池(最多5个线程) pool = ThreadPoolExecutor(5) for i in range(40): # 去线程池中申请一个线程,让线程执行task函数。 pool.submit(task,i,8)
import time import threading def task(arg): time.sleep(50) while True: num = input('>>>') t = threading.Thread(target=task,args=(num,)) t.start()
import time from concurrent.futures import ThreadPoolExecutor def task(arg): time.sleep(10) print("========") pool = ThreadPoolExecutor(3) while True: num = input('>>>') pool.submit(task,num)
四丶生产者消费者模型
所谓生产者消费者模型就是生产者跟消费者的关系而已,就像厨师跟顾客一样,只有厨师做出来饭,顾客才能吃,如果厨师做不出来饭,顾客想吃也吃不到,只能排队等
import time import queue import threading q = queue.Queue() # 线程安全 def producer(id): """ 生产者 :return: """ while True: time.sleep(2) q.put('包子') print('厨师%s 生产了一个包子' %id ) for i in range(1,4): t = threading.Thread(target=producer,args=(i,)) t.start() def consumer(id): """ 消费者 :return: """ while True: time.sleep(1) v1 = q.get() print('顾客 %s 吃了一个包子' % id) for i in range(1,3): t = threading.Thread(target=consumer,args=(i,)) t.start()