multiprocessing
在2.6才开始使用
multiprocessing 是一个使用方法类似threading模块的进程模块。允许程序员做并行开发。并且可以在UNIX和Windows下运行。
通过创建一个Process 类型并且通过调用call()方法spawn一个进程。
一个比较简单的例子:
#!/usr/bin/env python
from multiprocessing import Process
import time
def f(name):
time.sleep(1)
print 'hello ',name
print os.getppid() #取得父进程ID
print os.getpid() #取得进程ID
process_list = []
if __name__ == '__main__':
for i in range(10):
p = Process(target=f,args=(i,))
p.start()
process_list.append(p)
for j in process_list:
j.join()
进程间通信:
有两种主要的方式:Queue、Pipe
1- Queue类几乎就是Queue.Queue的复制,示例:
#!/usr/bin/env python
from multiprocessing import Process,Queue
import time
def f(name):
time.sleep(1)
q.put(['hello'+str(name)])
process_list = []
q = Queue()
if __name__ == '__main__':
for i in range(10):
p = Process(target=f,args=(i,))
p.start()
process_list.append(p)
for j in process_list:
j.join()
for i in range(10):
print q.get()
2- Pipe 管道
#!/usr/bin/env python
from multiprocessing import Process,Pipe
import time
import os
def f(conn,name):
time.sleep(1)
conn.send(['hello'+str(name)])
print os.getppid(),'-----------',os.getpid()
process_list = []
parent_conn,child_conn = Pipe()
if __name__ == '__main__':
for i in range(10):
p = Process(target=f,args=(child_conn,i))
p.start()
process_list.append(p)
for j in process_list:
j.join()
for p in range(10):
print parent_conn.recv()
Pipe()返回两个连接类,代表两个方向。如果两个进程在管道的两边同时读或同时写,会有可能造成corruption.
进程间同步
multiprocessing contains equivalents of all the synchronization primitives from threading.
例如,可以加一个锁,以使某一时刻只有一个进程print
#!/usr/bin/env python
from multiprocessing import Process,Lock
import time
import os
def f(name):
lock.acquire()
time.sleep(1)
print 'hello--'+str(name)
print os.getppid(),'-----------',os.getpid()
lock.release()
process_list = []
lock = Lock()
if __name__ == '__main__':
for i in range(10):
p = Process(target=f,args=(i,))
p.start()
process_list.append(p)
for j in process_list:
j.join()
进程间共享状态 Sharing state between processes
当然尽最大可能防止使用共享状态,但最终有可能会使用到.
1-共享内存
可以通过使用Value或者Array把数据存储在一个共享的内存表中
#!/usr/bin/env python
from multiprocessing import Process,Value,Array
import time
import os
def f(n,a,name):
time.sleep(1)
n.value = name * name
for i in range(len(a)):
a[i] = -i
process_list = []
if __name__ == '__main__':
num = Value('d',0.0)
arr = Array('i',range(10))
for i in range(10):
p = Process(target=f,args=(num,arr,i))
p.start()
process_list.append(p)
for j in process_list:
j.join()
print num.value
print arr[:]
输出:
jimin@Jimin:~/projects$ python pp.py
81.0
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
'd'和'i'参数是num和arr用来设置类型,d表示一个双精浮点类型,i表示一个带符号的整型。
更加灵活的共享内存可以使用multiprocessing.sharectypes模块
Server process
Manager()返回一个manager类型,控制一个server process,可以允许其它进程通过代理复制一些python objects
支持list,dict,Namespace,Lock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value,Array
例如:
#!/usr/bin/env python
from multiprocessing import Process,Manager
import time
import os
def f(d,name):
time.sleep(1)
d[name] = name * name
print d
process_list = []
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
for i in range(10):
p = Process(target=f,args=(d,i))
p.start()
process_list.append(p)
for j in process_list:
j.join()
print d
输出结果:
{2: 4}
{2: 4, 3: 9}
{2: 4, 3: 9, 4: 16}
{1: 1, 2: 4, 3: 9, 4: 16}
{1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36}
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 8: 64}
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64}
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
Server process managers比共享内存方法更加的灵活,一个单独的manager可以被同一网络的不同计算机的多个进程共享。
比共享内存更加的缓慢
使用工作池 Using a pool of workers
Pool类代表 a pool of worker processes.
It has methods which allows tasks to be offloaded to the worker processes in a few different ways.