一. 线程初识
-
什么是线程和进程
进程指的是一个程序执行的过程,是一个资源单位。它包含了操作系统开辟内存空间,将应用程序加载到内存中以及执行应用程序代码的整个过程。就像是一个车间内的一个小工厂一样,整个的生产过程被称之为一个进程。
线程是操作系统真正的执行单元。它仅仅代表的是进程中的最后一步,也就是执行应用程序代码的过程。就像是车间里的一条条流水线一样,是真正的用来执行代码的一个过程。
线程和进程的区别
- 进程占用开销较大,这是因为进程要重新开辟一段内存空间,线程开销较小。
-
进程之间内存是物理隔离的,但是同一进程内的线程之间内存是共享的。注意不同进程之间的线程通信还是需要通过队列进行通信的。
from threading import Thread
import time# 在主线程中设置一个全局变量x x = 100 def task(): global x x = 0 t = Thread(target=task) t.start() t.join() # 此时打印出来的结果是0,说明同一个进程之间线程数据是共享的 print(x)
开启线程的两种方式
-
通过类Thread
因为线程开销较小的原因,我们会发现结果是先打印task里面的内容,然后打印主线程三个字,这个和进程正好相反。
from threading import Thread
import timedef task():
print('thread is running ....')
time.sleep(3)Thread(target=task).start()
print('主线程') -
通过继承类Thread
我们创建一个线程并且执行的过程本质上就是创建内存之后执行Thread类的run函数,因此我们可以通过继承类的方式去创建。
from threading import Thread
import timeclass MyThread(Thread):
def run(self):
print('thread is running ....')
time.sleep(3)MyThread().start()
print('主线程') -
线程相关的属性方法
# current_thread() 当前线程对象,.name是里面的一个属性,可以得到线程名称,主线程名称为MainTread
# active_count() 当前活跃的线程数,记得还有一个主线程
# enumerate 返回一个列表,里面都是当前活跃的线程数目
from threading import Thread, current_thread, active_count, enumerate
import timedef task():
print('%s is running' % current_thread().name)
time.sleep(3)#: name表示自定义线程名称
t = Thread(target=task, name='第一个线程')
t.start()
print(current_thread().name)
print(active_count())
print(enumerate()) 守护进程与守护线程
-
守护进程
对于进程而言,如果代码中有守护进程,也有非守护进程,等主进程代码执行完毕之后守护进程也就结束了,并不会等待非守护进程的执行。
from multiprocessing import Processdef task1(): print(123) time.sleep(1) print('end123') def task2(): print(456) time.sleep(3) print('end456') if __name__ == '__main__': p1 = Process(target=task1) p2 = Process(target=task2) p1.daemon = True p1.start() p2.start() # 当执行完print之后就代表主进程代码已经执行完毕,此时就会终止守护进程 # 所以不会打印task1里面的内容 # 但是还是要等待非守护进程结束之后主进程才会真正的结束 # 因此我们看到了task2里面的内容 print('主进程over...') # 执行结果; # 主进程over... # 456 # end456
-
守护线程
对于线程而言,如果代码中有守护线程,也有非守护线程,等主线程代码执行完毕之后并不会终止守护线程的执行,只有等到所有的非守护线程执行完毕之后才意味着主线程的结束,此时才会总之守护线程。
将上面的代码的进程改成线程,就会出现不一样的效果。
from threading import Thread
import time
二.锁def task1(): print(123) time.sleep(5) print('end123') def task2(): print(456) time.sleep(3) print('end456') t1 = Thread(target=task1) t2 = Thread(target=task2) t1.daemon = True t1.start() t2.start() # 当print代码执行完毕之后,就代表这主线程代码执行完毕了,但是并不是主线程执行完毕了,这个是和进程的一个区别 # 因此要等待非守护线程t2执行完毕之后才代表主线程真的结束了,此时task1作为守护进程也就被终止了 # 因此我们会看到能够打印全部的task2内容,但是不会打印task1的内容 print('主线程..') # 运行结果 # 123 # 456 # 主线程.. # end456
-
互斥锁
线程中的互斥锁和进程中的互斥锁都是为了解决多个进程或者线程同时访问同一个系统资源时出现数据混乱的问题,不同之处在于所使用模块不一样,因此线程互斥锁只能在线程中使用,进程互斥锁只能在进程中使用。
不使用线程锁的问题
from threading import Thread
import timex = 100
def task():
global x
temp = x
time.sleep(0.1)
x = temp - 1#: 创建100个线程全局变量x进行修改,每次减1
t_l = []
for i in range(100):
t = Thread(target=task)
t_l.append(t)
t.start()# 用来等待所有线程结束
for i in t_l:
i.join()#: 当我们打印数据的时候发现结果是99,为什么呢?
#: 因为每个线程temp = x然后等待i/o,当cpu再次切换过来执行的时候temp=100,x的值就是99
print(x)
因此在一些数据的交互过程中我们需要加上线程锁来保证数据的安全性
from threading import Thread, Lock
import timex = 100
mutex = Lock() # 创建锁def task():
global x
mutex.acquire() # 在数据修改之前添加锁
temp = x
time.sleep(0.1)
x = temp - 1
mutex.release() # 在数据修改之后释放锁#: 创建100个线程全局变量x进行修改,每次减1
t_l = []
for i in range(100):
t = Thread(target=task)
t_l.append(t)
t.start()# 用来等待所有线程结束
print(x)
for i in t_l:
i.join() -
死锁
死锁的现象就是两个线程(或者两个进程)A和B,还有两个互斥锁C和D。A和B都需要C和D这两把锁,但是A抢到了C却没有抢到B,而B抢到了D却没有抢到C,从而导致程序进入死锁状态。
from threading import Thread, Lock, current_threadmutex1 = Lock()
mutex2 = Lock()def task1():
mutex1.acquire()
print('%s 抢到了锁1' % current_thread().name)
# 当抢到锁1之后,cpu就会执行其他的程序
# 当再回来的时候却发现锁2被task2抢到了因此在等待task2释放锁2
# 但是此时task2和task1是一样的,在等待task1释放锁1,此时就进入了死锁的状态
time.sleep(0.1)
mutex2.acquire()
print('%s抢到了锁2' % current_thread().name)mutex2.release() print('%s释放了锁2' % current_thread().name) mutex1.release() print('%s释放了锁1' % current_thread().name)
def task2():
mutex2.acquire()
print('%s 抢到了锁1' % current_thread().name)
time.sleep(0.1)
mutex1.acquire()
print('%s抢到了锁2' % current_thread().name)mutex1.release() print('%s释放了锁2' % current_thread().name) mutex2.release() print('%s释放了锁1' % current_thread().name)
t1 = Thread(target=task1)
t2 = Thread(target=task2)
t1.start()
t2.start()
3.递归锁
递归锁所使用的是RLock函数,其原理是如果我自己需要多把锁的时候,我就把这多把锁设置成一个递归锁,抢到一次递归锁计数就加1,当其他的线程或者进程想使用这一把锁的时候,会首先去查看锁计数是否为0,如果不为零,就等待其他的进程或者线程来释放锁,这就可以解决死锁问题了。
# mutex1 = Lock()
# mutex2 = Lock()
# 只需要将上面的锁修改成递归锁就可以解决上面的死锁问题了
mutex1 = mutex2 = RLock()
4.信号量
信号量使用的是Semaphore类,我们可以给他传递一个值来代表一次可以同时运行几个线程或者是几个进程,类似于一种池的概念,在某种意义上它是不能够保证数据的安全性。只是说在某些没有数据交互的场景下我们可以控制其每次进程或者线程的数量。
from threading import Thread, Semaphore, current_thread
import time
import randoms = Semaphore(5)
x = 30
def go_wc():
global x
s.acquire()
temp = x
time.sleep(random.randint(1, 3))
x = temp - 1
s.release()t_l = []
for i in range(30):
t = Thread(target=go_wc)
t_l.append(t)
t.start()for i in t_l:
print(x)
i.join()
5.GIL全局解释器锁 python程序执行的三个步骤
(1). 开辟一块内存空间,启动Python解释器
(2). 加载python程序到内存中
(3). 将内存中的程序传递给python解释器一步一步执行
这是因为在Python中有一种垃圾回收机制,当一个value的引用计数为0之后,就会被python的垃圾回收机制所清空掉。但是python的垃圾回收机制其实也是通过一个线程来执行的,如果可以同时调用解释器,这就会出现这样一个问题:如果我赋值了一个操作a = [1, 2, 3]的时候,当我这个线程还没有执行这个操作,只是创建了一个值[1, 2, 3]的时候,突然python解释器把垃圾回收机制的线程给执行了,这是垃圾回收机制就会发现这个值[1, 2, 3]当前引用计数还是0呢,就直接清掉了,但是此时我还没有来得及给a赋值呢,这就出现了数据错乱的问题。
# This lock is necessary mainly because CPython’s memory management is not thread-safe.
# 意思是CPython的内存管理机制(垃圾回收机制)不是线程安全的,因此我们不能让python线程同时去调用python解释器。- 什么叫做全局解释器锁
# In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
# native threads from executing Python bytecodes at once.
# GIL全局解释器锁是一种互斥锁,在同一时刻,保证多个线程中只能有一个线程使用python解释器 -
多线程是串行还是并发还是并行
多线程其实也是并发的,串行指的是task1完全执行完毕之后才去执行task2,如下的代码解析。这也是CPython的一大诟病之一,虽然说在I/O密集型的操作中并不会太多的去影响性能,但是相对于多核多线程并发的来说,很显然CPython做的还不是很好。(以目前我的水品来看,觉得如果一个线程内可以有多个python解释器就好了.......哈哈哈)
from threading import Thread, current_threaddef task1(): # 1. 打印当前信息,遇到sleep之后,cpu会执行其他的操作,此时释放GIL锁 # 3. 当释放了GIL锁之后,task2会立马抢到GIL锁,然后cpu执行 print('is running ....', current_thread().name) time.sleep(1) # 6. 打印end信息之后,子线程执行完毕,释放GIL锁 print('ending1....') def task2(): # 4. 抢到GIL锁之后,打印当前信息,遇到sleep之后,cpu去执行其他的操作,此时会释放GIL锁 # 5. 此时task1睡眠完成之后,会立马去抢GIL锁,然后打印end信息 print('is running ....', current_thread().name) time.sleep(1) print('ending2....') t1 = Thread(target=task1) t2 = Thread(target=task2) # 此时会去执行task1函数 # 1. 首先我要去抢GIL锁,当抢到锁之后进入task1函数 t1.start() t2.start()
-
什么时候开多线程和多进程
需要有四个任务去处理
方案一:开启四个进程
方案二:开启四个线程单核: 无论是哪种程序都应该使用方案二,因为单核情况下无论是进程还是线程都需要不停的切换执行,但是在线程的情况下可以减少开启进程的开销以及节省内存空间的使用。 多核: 1. I/O密集型,再多的核也解决不了I/O等待的问题,应该选择方案二 2. 计算密集型,多核意味着并行计算,应该选择方案一
计算密集型性能测试
from multiprocessing import Process
from threading import Thread
import timedef task1(): x = 1 for i in range(60000000): x += i def task2(): x = 1 for i in range(60000000): x += i if __name__ == '__main__': p1 = Process(target=task1) p2 = Process(target=task2) # p1 = Thread(target=task1) # p2 = Thread(target=task2) start = time.time() p1.start() p2.start() p1.join() p2.join() print(time.time() - start) # 结论 # 对于计算密集型的程序我们应该使用多进程来代替多线程。因为python中的多线程并不能利用多核实现真正的并行 # 使用多进程的结果 # 8.21526575088501 # 使用多线程的结果 # 12.482805252075195
I/O密集型性能测试
from threading import Thread
import timedef task1(): time.sleep(3) def task2(): time.sleep(3) if __name__ == '__main__': p1 = Process(target=task1) p2 = Process(target=task2) # p1 = Thread(target=task1) # p2 = Thread(target=task2) start = time.time() p1.start() p2.start() p1.join() p2.join() print(time.time() - start) # 结论 # 对于I/O密集型的程序我们应该用多线程去代替多进程 # 使用多进程的结果 # 3.2341346740722656 # 使用多线程的结果 # 3.002285242080688
-
三. 进程池与线程池
-
进程池和线程池
池就是一个容器,进程池就是用来装进程的容器,那么线程池就是用来装线程池的容器,为什么我们需要进程池和线程池呢?因为在大部分的情况下由于计算机硬件条件的限制我们并不能无线的开启进程或者线程,尽管线程的开销很小,因此我们需要用一个容器来限制能够开启的最大进程数和线程数,从而保证我们的计算机可以正常的提供服务。
-
进程池的简单使用
from concurrent.futures import ProcessPoolExecutor
import os
import random
import timedef task1(x):
print('%s is running..' % os.getpid())
time.sleep(random.randint(1,3))if name == 'main':
# 创建一个进程池,进程池的大小可以通过参数进行传递, 如果不指定,默认是cpu的核数
process_pool = ProcessPoolExecutor(4)
# 当执行完submit之后,就会额外的创建出4个进程,用来执行任务
# 函数task1的参数直接在submit中输入就可以传参
process_pool.submit(task1, 1)
time.sleep(30)
在cmd中执行输入命令 【tasklist |findstr pyt】 会发现出现了5个python进程,其中有一个是主线程,另外四个就是进程池内的进程数。
- 线程池的简单使用
线程池的使用和进程池是一样的,只是导入的名称不一样。
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread
import os
import random
import time
def task1(x):
print('%s is running..' % x)
time.sleep(random.randint(1,3))
if __name__ == '__main__':
# 创建一个线程池,线程池的大小可以通过参数进行传递, 如果不指定,默认是cpu的核数 * 5
process_pool = ThreadPoolExecutor(4)
for i in range(20):
# 当第一次执行submit之后,就会额外的创建出4个线程等待着执行任务
# 因此当我们执行了代码之后就会看到一下打印出了四行内容,之后就是执行完一个任务再进行下一个任务,最多的线程数是5个
# 其中有一个线程是主线程
process_pool.submit(task1, i)
# time.sleep(2)
四.同步vs异步 阻塞vs非阻塞
- 阻塞和非阻塞
阻塞和非阻塞描述的是程序的一种运行状态
阻塞(阻塞态)
遇到I/0之后,程序在原地等待I/0,并释放cpu资源
非阻塞(就绪态或者运行态):
没有遇到I/0,或者通过某种方式即便是程序遇到I/0也不会停在原地,而是去执行其他的操作,尽可能多的使用cpu的资源。 同步调用和异步调用
同步和异步描述的是程序执行的方式
同步调用
提交完任务之后就在原地进行等待,直到任务执行完毕并拿到了返回值,才会去执行下一行代码。
异步调用
提交完任务之后不会在原地进行等待,直接运行下一行代码,结果可以通过异步回调得到。-
同步调用简单的实现过程
from concurrent.futures import ThreadPoolExecutor
import time
import randomdef task(x): print('%s is running' % x) time.sleep(random.randint(1, 3)) return x ** 2 thread_pool = ThreadPoolExecutor(4) obj_l = [] for i in range(20): # 此处可以返回一个对象,对象有一个result属性可以让我们获得返回值 obj = thread_pool.submit(task, i) # 因为有了result,所以此时的程序就是一个同步调用的方式,我们必须等待任务一个一个完成并且拿到返回值之后才会继续执行循环 print(obj.result())
-
异步调用简单的实现过程
from concurrent.futures import ThreadPoolExecutor
import time
import randomdef task(x): print('%s is running' % x) time.sleep(random.randint(1, 3)) return x ** 2 thread_pool = ThreadPoolExecutor(4) obj_l = [] for i in range(20): # 此处可以返回一个对象,对象有一个result属性可以让我们获得返回值 # 此时是异步调用方式,因为在所有的任务创建的时候,我们并不需要去等待结果,就可以直接去运行下一次循环 obj = thread_pool.submit(task, i) obj_l.append(obj) # 就相当于之前的的join等待线程结束 # 因为现在是一个线程池,所以我们不能单纯的使用join去等待线程结束,我们还需要close将目前的任务先锁定住 # 其实shutdown内部就是实现了先锁定任务,然后等待所有任务执行完毕的过程 thread_pool.shutdown() # 当所有的任务结束之后,我们就可以通过返回的对象列表中随便查看其中的值 print(obj_l[0].result())
五. 异步+回调机制
案例:写一个简单的爬虫案例,来详细的分析一下异步和回调机制
首先,我们先以多进程的方式写一个同步的程序,用来爬取网页上的信息
import requests
import time
import os
import random
from concurrent.futures import ProcessPoolExecutor
def get(url):
"""获取网页的信息"""
print('%s get %s' % (os.getpid(), url))
response = requests.get(url)
time.sleep(0.5) # 模拟下载时间
if response.status_code == 200:
return response.text
def parse(data):
"""解析数据"""
time.sleep(0.2) # 模拟解析时间
print('%s 解析长度为%s' % (os.getpid(), len(data)))
if __name__ == '__main__':
# 创建一个进程池,设置进程池的数量为4
pool = ProcessPoolExecutor(4)
# 这是我们需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# 每执行一个任务,都通过result去获得相应的数据,然后通过parse去解析数据
data = pool.submit(get, url).result()
parse(data)
问题:我们发现虽然说能够实现基本的功能,但是太慢了,每次都要等待一个任务全部完成获得返回值之后才会去执行下面的代码,为了提升效率,我们考虑可以把同步的方式转换成异步。因此我们需要将name里面的内容转换成下面的样子。
if __name__ == '__main__':
# 创建一个进程池,设置进程池的数量为4
pool = ProcessPoolExecutor(4)
# 这是我们需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
obj_l = []
for url in urls:
obj = pool.submit(get, url)
obj_l.append(obj)
# 等待进程池内的任务全部完成之后才去执行下面的代码
pool.shutdown()
# 此时url的内容都已经下载完成并且保存在对象obj_l列表中,我们通过parse就可以解析了
for obj in obj_l:
parse(obj.result())
问题: 虽然说效率有所提升但是依然存在一些问题
- 解析过程要等待所有的下载任务执行完成之后才能进行解析
- 解析数据是串行的,如果一个解析过程很慢,就会大大的降低整个程序的效率
基于上面的问题,我们可以将解析过程放在get函数里面,在一个任务下载结束之后就会立马的进行解析数据,可以解决问题1,而且对于解析数据而言都是通过下载数据相同的进程进行解析的,可以解决第二个问题。因此我们的代码可以修改成下面这个样子。
import requests
import time
import os
from concurrent.futures import ProcessPoolExecutor
def get(url):
"""获取网页的信息"""
print('%s get %s' % (os.getpid(), url))
response = requests.get(url)
time.sleep(0.5) # 模拟下载时间
if response.status_code == 200:
# 返回值我们也不需要,直接去调用parse解析就可以了
parse(response.text)
def parse(data):
"""解析数据"""
time.sleep(0.2)
print('%s 解析长度为%s' % (os.getpid(), len(data)))
if __name__ == '__main__':
# 创建一个进程池,设置进程池的数量为4
pool = ProcessPoolExecutor(4)
# 这是我们需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
pool.submit(get, url)
问题: 这样写的话我们会发现下载内容和解析内容的代码被写在一块了,而且还是被同一个进程执行,这就和我们之前所讲的生产者和消费者模型相悖了,如果我们想让两个函数解耦合,我们可以在get函数中将结果返回回来,然后在主进程中接收,并执行解析函数。此时我们就需要用到回调函数了。
import requests
import time
import os
from concurrent.futures import ProcessPoolExecutor
def get(url):
"""获取网页的信息"""
print('%s get %s' % (os.getpid(), url))
response = requests.get(url)
time.sleep(0.5) # 模拟下载时间
if response.status_code == 200:
# 返回值我们也不需要,直接去调用parse解析就可以了
return response.text
def parse(data):
"""解析数据"""
time.sleep(0.2)
print('%s 解析长度为%s' % (os.getpid(), len(data)))
if __name__ == '__main__':
# 创建一个进程池,设置进程池的数量为4
pool = ProcessPoolExecutor(4)
# 这是我们需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# 通过submit将对象obj扔到进程池中
obj = pool.submit(get, url)
# 通过函数add_done_callback函数将传递的函数parse绑定到对象上obj
# 当对象obj所执行的任务一旦完成并获得一个返回值的时候就会自动的去调用parse函数,并将对象当做参数传递进去
# 这种方式和get方法中直接解析数据所实现的效果都是差不多的,只是说解决了耦合的问题,从某种意义上讲,有时候解析的时间还要更长一点
# 注意: parse函数是全部都是通过主进程进行调用的,这也就解释了回调的意思,我来调用你,结果给我之后就应该就由我来解析
obj.add_done_callback(parse)
多线程和多进程的方式都是一样的,唯一不同的地方就在于回调函数是哪个线程空闲哪个线程去执行回调函数
六. 线程queue, Event
- Queue和进程的队列一样,先进先出
# 这种队列是先进先出
q1 = queue.Queue(3)
q1.put(1)
q1.put('2')
q1.put([234])
# q1.put({5: 6}) # 此时会阻塞,等到取出去一个之后才能添加到队列中
print(q1.get())
# 结果1 - LifoQueue 堆栈,先进后出
# 先进后出,堆栈
q1 = queue.LifoQueue(3)
q1.put(1)
q1.put('2')
q1.put([234])
# q1.put({5: 6}) # 此时会阻塞,等到取出去一个之后才能添加到队列中
print(q1.get())
# 结果是[234] - PriorityQueue 优先级队列,数字越小优先级越大
# 优先级队列,传入的参数是一个元组,第一个值为优先级,必须是int,第二个是要压入队列中的值
# 优先级越小越优先,如果优先级相等,比较后面的值,越小越优先
q1 = queue.PriorityQueue(3)
q1.put((1, '123'))
q1.put((1, '456'))
q1.put((2, '789'))
# q1.put((2, {5: 6})) # 此时会阻塞,等到取出去一个之后才能添加到队列中
print(q1.get())
# 结果(1, '123') -
Event 用来进程之间协同工作的
简单的event的使用,用来提前连接服务器判断服务器是否可连
from threading import Thread, Event, current_thread
import time# 创建一个event对象 event = Event() def check(): """首先启动一个线程尝试连接请求""" print('%s 尝试服务器是否可以连接' % current_thread().name) time.sleep(3) # 模拟连接请求持续了秒 event.set() # 当连接请求成功之后设置事件 def connection(): """当check检测通过之后再开始连接""" print('%s 尝试连接' % current_thread().name) event.wait() # 当event.set之后才会执行下面的代码,否则阻塞 print('%s 连接成功' % current_thread().name) t1 = Thread(target=connection) t2 = Thread(target=connection) t3 = Thread(target=connection) t4 = Thread(target=check) t1.start() t2.start() t3.start() t4.start()
可以使用is_set功能模拟尝试三次连接之后断开连接的操作,将connection函数改成下面这个样子
def connection():
"""当check检测通过之后再开始连接"""
# 如果没有设置,也就是说check还没有来连接上服务器,就一直尝试连接
count = 1
while not event.is_set():
if count > 3:
print('您尝试连接的次数过多,请稍后重试')
return
time.sleep(0.8)
print('%s 尝试连接' % current_thread().name)
count += 1
# event.wait() # 当event.set之后才会执行下面的代码,否则阻塞
print('%s 连接成功' % current_thread().name)