进程和线程都会的切换都要消耗时间,保存线程进程当前状态以便下次继续执行。在不怎么需要cpu的程序中,即相对于IO密集型的程序,协程相对于线程进程资源消耗更小,切换更快,更适用于IO密集型。协程也是单线程的,没法利用cpu的多核,想利用cpu多核可以通过,进程+协程的方式,又或者进程+线程+协程。
1、协程的简单实现
协程的原理是通过生成器实现,如下:程序执行到19行,执行consumer函数到13行,next生成器,执行producer函数到8行停下,返回consumer函数13行继续往下执行,循环一次再次来到13行,生成器函数将从上次yield停下的地方往下执行。循环如此就完成就完成了并发的效果。
但有人可能会说我用循环顺序执行两个函数结果也一样啊,如下第2例子。当这里想说的是,这样并不能保留函数的执行的位置,只是简单的一个函数执行结束换到另一个函数而已,遇到需要cpu等待的操作也没法切换。在遇到需要cpu等待的操作主动让出cpu,记住函数执行的位置,下次切换回来继续执行才能算是并发的运行,提高程序的并发效果。
1 import time 2 3 4 def producer(): 5 while True: 6 time.sleep(1) 7 print("+++++ 1个包子", time.strftime("%X")) 8 yield 9 10 11 def consumer(): 12 while True: 13 next(prd) 14 print("----- 1个包子", time.strftime("%X")) 15 16 17 if __name__ == "__main__": 18 prd = producer() 19 consumer() 20 21 22 # 输出结果 23 +++++ 1个包子 16:22:30 24 ----- 1个包子 16:22:30 25 +++++ 1个包子 16:22:31 26 ----- 1个包子 16:22:31 27 +++++ 1个包子 16:22:32 28 ----- 1个包子 16:22:32 29 +++++ 1个包子 16:22:33 30 ----- 1个包子 16:22:33
1 import time 2 3 4 def producer(): 5 time.sleep(1) 6 print("+++++ 1个包子", time.strftime("%X")) 7 8 9 def consumer(): 10 print("----- 1个包子", time.strftime("%X")) 11 12 13 if __name__ == "__main__": 14 while True: 15 producer() 16 consumer() 17 18 19 # 输出结果 20 +++++ 1个包子 16:22:30 21 ----- 1个包子 16:22:30 22 +++++ 1个包子 16:22:31 23 ----- 1个包子 16:22:31 24 +++++ 1个包子 16:22:32 25 ----- 1个包子 16:22:32 26 +++++ 1个包子 16:22:33 27 ----- 1个包子 16:22:33
2、greenlet
greenlet模块需要安装,pip install greenlet。greenlet原理是对生成器的封装。greenlet类提供了一个方法,switch:在需要进行切换的时候切换到指定的协程。
1 from greenlet import greenlet 2 import time 3 4 5 def producer(): 6 while True: 7 time.sleep(1) 8 print("+++++ 1个包子", time.strftime("%X")) 9 gr2.switch() # 切换到gr2运行 10 11 12 13 def consumer(): 14 while True: 15 print("----- 1个包子", time.strftime("%X")) 16 gr1.switch() # 切换到gr1运行 17 18 19 if __name__ == "__main__": 20 gr1 = greenlet(producer) 21 gr2 = greenlet(consumer) 22 gr1.switch() # 切换到gr1运行 23 24 25 # 输出结果 26 +++++ 1个包子 09:39:45 27 ----- 1个包子 09:39:45 28 +++++ 1个包子 09:39:46 29 ----- 1个包子 09:39:46 30 +++++ 1个包子 09:39:47 31 ----- 1个包子 09:39:47
3、gevent
gevent模块也需要安装,pip install gevent。gevent是对gevent的再次封装,能自动识别耗时操作切换到其它协程。注意gevent遇到耗时操作才会切换协程运行,没有遇到耗时操作是不会主动切换的。
gevent.spawn(*args, **kwargs) 不定长参数中的第一个参数为协程执行的方法fn,其余的依次为 fn 的参数。开启了协程后要调用join方法。
gevent模块中识别耗时的操作有两种方式,① 使用gevent模块中重写的类。如,gevent.socket gevent.sleep ② 打补丁的方式,在所有的代码前。from gevent import monkey 导入这个模块,monkey.patch_all()调用这个方法。
推荐使用第二种方式,这样就不用更改已经写好的代码
1 import time 2 import gevent 3 4 5 def producer(): 6 for i in range(3): 7 time.sleep(1) 8 print("+++++ 1个包子", name, time.strftime("%X")) 9 10 11 def consumer(): 12 for i in range(3): 13 time.sleep(1) 14 print("----- 1个包子", name, time.strftime("%X")) 15 16 17 if __name__ == "__main__": 18 g1 = gevent.spawn(producer, "zhangsan") 19 g2 = gevent.spawn(consumer, "lisi") 20 g1.join() 21 g2.join() 22 23 24 # 输出结果 25 +++++ 1个包子 zhangsan 10:42:38 26 +++++ 1个包子 zhangsan 10:42:39 27 +++++ 1个包子 zhangsan 10:42:40 28 ----- 1个包子 lisi 10:42:41 29 ----- 1个包子 lisi 10:42:42 30 ----- 1个包子 lisi 10:42:43
1 import time 2 import gevent 3 4 5 def producer(): 6 for i in range(3): 7 gevent.sleep(1) 8 print("+++++ 1个包子", time.strftime("%X")) 9 10 11 def consumer(): 12 for i in range(3): 13 gevent.sleep(1) 14 print("----- 1个包子", time.strftime("%X")) 15 16 17 if __name__ == "__main__": 18 g1 = gevent.spawn(producer) 19 g2 = gevent.spawn(consumer) 20 g1.join() 21 g2.join() 22 23 24 # 输出结果 25 +++++ 1个包子 10:43:04 26 ----- 1个包子 10:43:04 27 +++++ 1个包子 10:43:05 28 ----- 1个包子 10:43:05 29 +++++ 1个包子 10:43:06 30 ----- 1个包子 10:43:06
1 import time 2 import gevent 3 from gevent import monkey 4 monkey.patch_all() 5 6 7 def producer(): 8 for i in range(3): 9 time.sleep(1) 10 print("+++++ 1个包子", time.strftime("%X")) 11 12 13 def consumer(): 14 for i in range(3): 15 time.sleep(1) 16 print("----- 1个包子", time.strftime("%X")) 17 18 19 if __name__ == "__main__": 20 g1 = gevent.spawn(producer) 21 g2 = gevent.spawn(consumer) 22 g1.join() 23 g2.join() 24 25 26 # 输出结果 27 +++++ 1个包子 10:44:04 28 ----- 1个包子 10:44:04 29 +++++ 1个包子 10:44:05 30 ----- 1个包子 10:44:05 31 +++++ 1个包子 10:44:06 32 ----- 1个包子 10:44:06
当开启的协程很多的时候,一个个的调用join方法就有点麻烦,所以gevent提供了一个方法joinall(),可以一次join所有的协程。joinall() 方法传参一个列表,列表包含了所有的协程。
1 import time 2 import gevent 3 from gevent import monkey 4 monkey.patch_all() 5 6 7 def producer(name): 8 for i in range(3): 9 time.sleep(1) 10 print("+++++ 1个包子", name, time.strftime("%X")) 11 12 13 def consumer(name): 14 for i in range(3): 15 time.sleep(1) 16 print("----- 1个包子", name, time.strftime("%X")) 17 18 19 if __name__ == "__main__": 20 gevent.joinall([gevent.spawn(producer, "zhangsan"), gevent.spawn(consumer, "lisi")]) 21 22 23 # 输出结果 24 +++++ 1个包子 zhangsan 10:51:34 25 ----- 1个包子 lisi 10:51:34 26 +++++ 1个包子 zhangsan 10:51:35 27 ----- 1个包子 lisi 10:51:35 28 +++++ 1个包子 zhangsan 10:51:36 29 ----- 1个包子 lisi 10:51:36
4、协程应用,并发服务器
服务端收到客户端消息,并原样发送回去
1 import socket 2 import gevent 3 from gevent import monkey 4 5 monkey.patch_all() 6 7 8 def fn(conn): 9 msg = conn.recv(1024).decode("utf-8") 10 print("服务的收到>>>", msg) 11 conn.send(msg.encode("utf-8")) 12 13 14 sk = socket.socket() 15 sk.bind(("127.0.0.1", 8899)) 16 sk.listen() 17 while True: 18 conn, addr = sk.accept() 19 print("已连接服务器-->", addr) 20 gevent.spawn(fn, conn) 21 sk.close() 22 23 24 # 输出结果 25 已连接服务器--> ('127.0.0.1', 53878) 26 已连接服务器--> ('127.0.0.1', 53879) 27 已连接服务器--> ('127.0.0.1', 53880) 28 服务的收到>>> client1 29 服务的收到>>> client2 30 服务的收到>>> client3
1 import socket 2 3 sk = socket.socket() 4 sk.connect(("127.0.0.1", 8899)) 5 msg = input("客户端发送的内容>>> ") 6 sk.send(msg.encode("utf-8")) 7 msg = sk.recv(1024).decode("utf-8") 8 print("客户端收到>>>", msg) 9 sk.close() 10 11 12 # 输出结果 13 客户端发送的内容>>> client1 14 客户端收到>>> client1