什么是线程
线程,有时被称为轻量进程(Lightweight Process,LWP),是程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。另外,线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。由于线程之间的相互制约,致使线程在运行中呈现出间断性。线程也有就绪、阻塞和运行三种基本状态。就绪状态是指线程具备运行的所有条件,逻辑上可以运行,在等待处理机;运行状态是指线程占有处理机正在运行;阻塞状态是指线程在等待一个事件(如某个信号量),逻辑上不可执行。每一个程序都至少有一个线程,若程序只有一个线程,那就是程序本身。
线程是程序中一个单一的顺序控制流程。进程内有一个相对独立的、可调度的执行单元,是系统独立调度和分派CPU的基本单位指令运行时的程序的调度单位。在单个程序中同时运行多个线程完成不同的工作,称为多线程。
什么是进程
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。
线程与进程的关系
线程和进程的关系:线程是属于进程的,线程运行在进程空间内,同一进程所产生的的线程共享同一用户内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。所以,线程不能独立地执行,它必须依附在一个运行的应用程序上(即进程上),而一个进程至少需要一个线程作为它的指令执行,进程管理着资源(比如CPU、内存、文件等等)。而将线程分配到某个CPU上执行。
python中的线程,threading模块
1.建立线程
import threading def func(msg): print(msg) print("这是一个线程") t=threading.Thread(target=func,args=("hello world",)) t.start()
通过继承类的方式创建线程
class Mythread(threading.Thread): def __init__(self,name,age): threading.Thread.__init__(self) self.name=name self.age=age def run(self): #这里是将threading.Thread中的run方法进行了重载 print("%s is %d"%(self.name,self.age)) t=Mythread("sfencs",19) t.start()
2.线程的并发
单个线程的创建基本没有意义,只是与主线程并发,现在我们看一下多个线程的并发
import threading import time class Mythread(threading.Thread): def __init__(self,name,age,second): threading.Thread.__init__(self) self.name=name self.second=second self.age=age def run(self): print(self.name) time.sleep(self.second) print(self.age) t1=Mythread("sfencs",19,2) t2=Mythread("Tom",25,5) t1.start() t2.start()
这里先同时打印sfencs和Tom,过了两秒打印19,又过3秒打印25.这说明这两个线程是并发的,如果是串行的那么会使用7秒完成
我们可以使用time模块计算时间
import threading import time class Mythread(threading.Thread): def __init__(self,name,age,second): threading.Thread.__init__(self) self.name=name self.second=second self.age=age def run(self): print(self.name) time.sleep(self.second) print(self.age) time_begin=time.time() t1=Mythread("sfencs",19,2) t2=Mythread("Tom",25,5) t1.start() t2.start() time_end=time.time() print(time_end-time_begin)'''
sfencs
Tom
0.0010306835174560547
19
25
'''
这里出现一个问题,输出的时间是0.0010306835174560547,而且在年龄之前输出的
原因是计算时间的代码属于主线程,它与两个自己创建的线程并发,所以它提前完成了计算,为了解决这个办法,我们使用join()方法
3.join()
一个线程使用join()方法后,必须等该线程结束后才执行join()之后的代码
time_begin=time.time() t1=Mythread("sfencs",19,2) t2=Mythread("Tom",25,5) t1.start() t2.start() t1.join() t2.join() time_end=time.time() print(time_end-time_begin) ''' sfencs Tom 19 25 5.001618146896362 '''
这样就显然的看出程序并发节约了约2秒钟
除此之外join()方法还有一个参数为阻塞的时间,默认为一直阻塞
4.IO密集型任务和计算密集型任务
IO密集型任务就如上述的例子一样,有阻塞的状态,如sleep()或者等待相关信息,信号时会停用cpu的任务。IO密集型的任务在python中使用多线程能够很好的节约时间完成并发。
计算密集型任务没有等待状态,从上到下执行,没有任何等待
一个线程
import threading import time class Mythread(threading.Thread): def __init__(self,): threading.Thread.__init__(self) def run(self): i=0 while i<100000000: i+=1 time_begin=time.time() t1=Mythread() t1.start() t1.join() time_end=time.time() print(time_end-time_begin)#6.194466590881348
两个线程
import threading import time class Mythread(threading.Thread): def __init__(self,): threading.Thread.__init__(self) def run(self): i=0 while i<100000000: i+=1 time_begin=time.time() t1=Mythread() t2=Mythread() t1.start() t2.start() t1.join() t2.join() time_end=time.time() print(time_end-time_begin)#11.998910427093506
可见计算密集型任务在python中并发并不能很好的节约时间,和串行差不多(在python以前版本中时间还会比串行多)
可是又有一个问题,我们的电脑不是有多核cpu吗,为什么不能同时两个cpu每个运行一个线程,那样时间就只有串行的一半啊?原因就是接下来讲的GIL
5.GIL
首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
那么CPython实现中的GIL又是什么呢?GIL全称Global Interpreter Lock为了避免误导,我们还是来看一下官方给出的解释:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
GIL就像是一个防止多线程并发的全局锁,GIL的存在导致多线程无法很好的立即多核CPU的并发处理能力。python的多线程在多核CPU上,只对于IO密集型计算产生正面效果;而当有至少有一个CPU密集型线程存在,那么多线程效率会由于GIL而大幅下降。为了避免GIL的影响,可以使用多进程。
这里具体参考https://www.cnblogs.com/SuKiWX/p/8804974.html
6.守护线程setDaemon
当主线程完成时不需要某个子线程完全运行完就要退出程序,那么就可以将这个子线程设置为守护线程,setDaemon(True).
import threading import time class Mythread(threading.Thread): def __init__(self,name,age,second): threading.Thread.__init__(self) self.name=name self.second=second self.age=age def run(self): print(self.name) time.sleep(self.second) print(self.age) time_begin=time.time() t1=Mythread("sfencs",19,2) t2=Mythread("Tom",25,5) t1.setDaemon(True) t2.setDaemon(True) t1.start() t2.start() time_end=time.time() print(time_end-time_begin) ''' sfencs Tom 0.003045797348022461 '''
不会显示年龄的输出,因为主线程已经结束。
7.Lock锁
多线程中对同一资源进行处理,有可能会导致数据不安全
import time import threading def addNum(): global num #lock.acquire() temp=num time.sleep(0.001) num =temp-1 #lock.release() num = 100 thread_list = [] lock=threading.Lock() for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: t.join() print('final num:', num )#final num: 92
这里运行结果并不是0,原因是多个线程在time.sleep()的时候同时拿到了num,所以num是同一个数,解决方法就是加锁
8.死锁与递归锁
import threading import time mutexA=threading.Lock() mutexB=threading.Lock() class MyThread(threading.Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('%s 拿到A锁' %self.name) mutexB.acquire() print('%s 拿到B锁' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('%s 拿到B锁' %self.name) time.sleep(2) mutexA.acquire() print('%s 拿到A锁' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(5): t=MyThread() t.start()
'''
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
'''
这里开了5个线程,可是却阻塞住了,原因是在Thread1拿到B锁,Thread2拿到A锁时,func2中在等待获得A锁,func1中在等待获得B锁,两者都在等待对方释放锁,造成了死锁,使得线程互相阻塞解决方法是使用递归锁Rlock
import threading import time lock=threading.RLock() class MyThread(threading.Thread): def run(self): self.func1() self.func2() def func1(self): lock.acquire() print('%s 拿到A锁' %self.name) lock.acquire() print('%s 拿到B锁' %self.name) lock.release() lock.release() def func2(self): lock.acquire() print('%s 拿到B锁' %self.name) time.sleep(2) lock.acquire() print('%s 拿到A锁' %self.name) lock.release() lock.release() if __name__ == '__main__': for i in range(5): t=MyThread() t.start() ''' Thread-1 拿到A锁 Thread-1 拿到B锁 Thread-1 拿到B锁 Thread-1 拿到A锁 Thread-2 拿到A锁 Thread-2 拿到B锁 Thread-2 拿到B锁 Thread-2 拿到A锁 Thread-4 拿到A锁 Thread-4 拿到B锁 Thread-4 拿到B锁 Thread-4 拿到A锁 Thread-3 拿到A锁 Thread-3 拿到B锁 Thread-3 拿到B锁 Thread-3 拿到A锁 Thread-5 拿到A锁 Thread-5 拿到B锁 Thread-5 拿到B锁 Thread-5 拿到A锁 '''
在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源
9.信号量
信号量就相当于一个计数器,控制相同线程最大允许同时并发运行的数量
import threading import time s=threading.Semaphore(3) class Mythread(threading.Thread): def __init__(self,name,age,second): threading.Thread.__init__(self) self.name=name self.second=second self.age=age def run(self): s.acquire() print(self.name) time.sleep(self.second) print(self.age) s.release() t=[] for i in range(6): t.append(Mythread("sfencs",19,2)) time_begin=time.time() for i in range(6): t[i].start() for i in range(6): t[i].join() time_end=time.time() print(time_end-time_begin) ''' sfencs sfencs sfencs 19 sfencs 19 sfencs 19 sfencs 19 1919 4.003796339035034 '''
如果没有信号量来限制,那么程序完成的时间应该为2秒左右
10.条件变量同步
wait等待notify的通知,当接到通知后,会重新从if accquire()执行
import threading import time c=threading.Condition() count=0 class producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): global count while True: if c.acquire(): if count>5: c.wait() else: count+=1 print(self.name+":生产了一件商品") c.notify() c.release() time.sleep(0.5) class consumer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): global count while True: if c.acquire(): if count<5: print(self.name+":我再等一会儿") c.wait() else: count-=1 print(self.name+":使用了一件商品") c.notify() c.release() time.sleep(0.5) for i in range(2): producer().start() for i in range(5): consumer().start()
11.event
事件(event)用于线程间同步和通信。比如线程A要完成某一任务(event)线程B才能执行后面的代码
set() 开始一个事件
wait() 如果未设置set状态会一直等待,否则过
clear() 清除set状态
isSet() 是否设置set状态
import threading import time class interviewer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): print("我能问你一个问题吗?") event1.set() event2.wait() print("我的问题刚才已经问完了") event2.clear() event1.set() class interviewee1(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): event1.wait() print("你问吧") event1.clear() event2.set() event1.wait() print("行吧。。。") event1.clear() event1=threading.Event() event2=threading.Event() t1 = interviewee1() t2 = interviewer() t1.start() t2.start()
'''
我能问你一个问题吗?
你问吧
我的问题刚才已经问完了
行吧。。。
'''
12.队列queue
说道多线程就不得不提到队列,python中的队列用到了Queue模块,该模块提供了同步的,安全的对序列,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue.这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的通信
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]])获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作
这个程序将之前的一个用队列改写的
import threading import queue import time c=threading.Condition() count=queue.Queue() class producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): global count while True: if c.acquire(): if count.full(): c.wait() else: count.put(1) print(self.name+":生产了一件商品") c.notify() c.release() time.sleep(0.5) class consumer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): global count while True: if c.acquire(): if not count.full(): print(self.name+":我再等一会儿") c.wait() else: count.get() print(self.name+":使用了一件商品") c.notify() c.release() time.sleep(0.5) for i in range(2): producer().start() for i in range(5): consumer().start()