---恢复内容开始---
一、多进程
1、multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
import time from multiprocessing import Process def func(name): print('%s 函数开始,time:%s' %(name,time.ctime())) # time.sleep(2) print('%s 函数结束,time:%s' %(name,time.ctime())) if __name__ == "__main__": p1=Process(target=func,args=('one',)) p2=Process(target=func,args=('two',)) p3=Process(target=func,args=('three',)) p4=Process(target=func,args=('four',)) p5 = Process(target=func, args=('five',)) p1.start() p2.start() p3.start() p4.start() p5.start() print(u'主进程%s'%time.ctime())
结果:
one 函数开始,time:Sat Sep 08 17:15:40 2018 two 函数开始,time:Sat Sep 08 17:15:40 2018 主进程Sat Sep 08 17:15:40 2018 three 函数开始,time:Sat Sep 08 17:15:40 2018 four 函数开始,time:Sat Sep 08 17:15:40 2018 five 函数开始,time:Sat Sep 08 17:15:40 2018 one 函数结束,time:Sat Sep 08 17:15:42 2018 two 函数结束,time:Sat Sep 08 17:15:42 2018 three 函数结束,time:Sat Sep 08 17:15:42 2018 four 函数结束,time:Sat Sep 08 17:15:42 2018 five 函数结束,time:Sat Sep 08 17:15:42 2018
*所有函数并发执行(注意:在windows中Process()必须放到# if __name__ == '__main__':下#方法二(和上面效果一样)
import time from multiprocessing import Process class Piao(Process): def __init__(self,name): Process.__init__(self) self.name=name def run(self): print('%s 函数开始,time:%s' %(self.name,time.ctime())) time.sleep(2) print('%s 函数结束,time:%s' %(self.name,time.ctime())) if __name__ == '__main__': p1=Piao('one') p2=Piao('two') p3=Piao('three') p4=Piao('four')
#p.daemon = True(进程守护和线程守护一样)
p1.start()
p2.start()
p3.start() p4.start() print('主线程%s'%time.ctime())
2、join()方法是用来让主进程等待所有子进程结束并不影响子进程之间的并发:
import time from multiprocessing import Process class Piao(Process): def __init__(self,name): Process.__init__(self) self.name=name def run(self): print('%s 函数开始,time:%s' %(self.name,time.ctime())) time.sleep(2) print('%s 函数结束,time:%s' %(self.name,time.ctime())) if __name__ == '__main__': tasks = [] tasks.append(Piao('one')) tasks.append(Piao('two')) tasks.append(Piao('three')) tasks.append(Piao('four')) for p in tasks: p.start() for p in tasks: p.join() print('主线程%s'%time.ctime())
结果:
one 函数开始,time:Sat Sep 08 17:37:43 2018
two 函数开始,time:Sat Sep 08 17:37:43 2018
three 函数开始,time:Sat Sep 08 17:37:43 2018
four 函数开始,time:Sat Sep 08 17:37:43 2018
one 函数结束,time:Sat Sep 08 17:37:45 2018
two 函数结束,time:Sat Sep 08 17:37:45 2018
three 函数结束,time:Sat Sep 08 17:37:45 2018
four 函数结束,time:Sat Sep 08 17:37:45 2018
主线程Sat Sep 08 17:37:45 2018 #主进程等所有子进程结束,子进程之间并不影响并发
5、进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就 是错乱,如何控制,就是加锁处理
#并发运行,效率高,但竞争同一打印终端,带来了打印错乱 #由并发变成了串行,牺牲了运行效率,但避免了竞争 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() print('%s 函数开始,time:%s' %(os.getpid(),time.ctime())) time.sleep(2) print('%s 函数结束,time:%s' %(os.getpid(),time.ctime())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start()
24464 函数开始,time:Sat Sep 08 18:02:17 2018 24464 函数结束,time:Sat Sep 08 18:02:19 2018 21072 函数开始,time:Sat Sep 08 18:02:19 2018 21072 函数结束,time:Sat Sep 08 18:02:21 2018 13536 函数开始,time:Sat Sep 08 18:02:21 2018 13536 函数结束,time:Sat Sep 08 18:02:23 2018
之前打印是同一时间,是多个进程共享同一打印终端,但是加了锁后就不能共享了,但是依旧是并发,是锁限制了共享终端,在读写文件是需要枷锁,不然容易造成错乱
6、队列
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,常用来在生产者和消费者线程之间的信息传递。queue.put方法用以插入数据到队列中,queue.get方法用来冲数据队列去除数据(先进先出)
产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
基于队列实现生产者消费者模型
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get()
if res in None: break time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(20): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
q.put(None) if __name__ == '__main__': q=Queue() #生产者 p1=Process(target=producer,args=(q,)) #消费者 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主进程')
7、进程池
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
from multiprocessing import Pool import os,time def work(n): print('%s run,time:%s' %(os.getpid(),time.ctime())) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res res_l.append(res) #异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了 p.close() p.join() for res in res_l: print res.get(), #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get,‘,’无须换行打印
结果:
6684 run,time:Mon Sep 10 11:41:22 2018 732 run,time:Mon Sep 10 11:41:22 2018 10748 run,time:Mon Sep 10 11:41:22 2018 6684 run,time:Mon Sep 10 11:41:25 2018 732 run,time:Mon Sep 10 11:41:25 2018 10748 run,time:Mon Sep 10 11:41:25 2018 6684 run,time:Mon Sep 10 11:41:28 2018 732 run,time:Mon Sep 10 11:41:28 2018 10748 run,time:Mon Sep 10 11:41:28 2018 6684 run,time:Mon Sep 10 11:41:31 2018 0 1 4 9 16 25 36 49 64 81
#进程池最大容量设置三,因此每次只有三个异步进程执行,等待执行结束再安排
---恢复内容结束---