Python的multiprocessi

发布时间:2019-06-24 15:53:34编辑:auto阅读(1303)


    1.基本介绍

    multiprocessing模块可以衍生出子进程。multiprocessing模块同时提供本地和远程的并发操作。multiprocessing模块不像threading模块那样会受到GIL全局解释器锁的限制,它使用进程代替线程。基于这样的特性,multiprocessing模块可以让程序员在一台服务器上使用多个处理器。

    In [106]: from multiprocessing import Pool
    
    In [107]: def f(x):
       .....:     return x*x;
       .....: 
    
    In [108]: if __name__ == '__main__':
       .....:     p=Pool(5)
       .....:     print(p.map(f,[1,2,3]))
       .....:     
    [1, 4, 9]



    在multiprocessing模块中,子进程是通过一个Process对象生成的。然后调用start()函数

    In [116]: from multiprocessing import Process
    
    In [117]: def f(name):
       .....:     print 'hello',name
       .....:     
    
    In [118]: if __name__ == '__main__':
       .....:     p=Process(target=f,args=('john',))
       .....:     p.start()
       .....:     p.join()
       .....:     
    hello john


    如果要查看各自的进程ID,可以使用以下代码

    #!/usr/sbin/python
    
    from multiprocessing import Process
    import os
    
    def info(title):
        print title
        print 'module name:',__name__
        if hasattr(os,'getppid'):
           print 'parent process:',os.getppid()
        print 'process id:', os.getpid()
    
    def f(name):
        info('function f')
        print 'hello',name
    
    if __name__ == '__main__':
       info('main line')
       p = Process(target=f,args=('john',))
       p.start()
       p.join()


    main line
    module name: __main__
    parent process: 17148
    process id: 18168
    function f
    module name: __main__
    parent process: 18168
    process id: 18169
    hello john


    2.进程间通信

    multiprocessing模块支持Queues和Pipes两种方式来进行进程间通信

    使用Queue

    In [123]: from multiprocessing import Process,Queue
    
    In [124]: def f(q):
       .....:     q.put([42,None,'hello'])
       .....:     
    
    In [125]: if __name__ == '__main__':
       .....:     q=Queue()
       .....:     p=Process(target=f,args=(q,))
       .....:     p.start()
       .....:     print q.get()
       .....:     p.join()
       .....:     
    [42, None, 'hello']


    使用Queues,对于线程和进程来说都是安全的



    使用Pipe

    In [136]: from multiprocessing import Process,Pipe
    
    In [137]: def f(conn):
       .....:     conn.send([42,None,'hello'])
       .....:     conn.close()
       .....:     
    
    In [138]: if __name__ == '__main__':
       .....:     parent_conn,child_conn=Pipe()
       .....:     p=Process(target=f,args=(child_conn,))
       .....:     p.start()
       .....:     print parent_conn.recv()
       .....:     p.join()
       .....:     
    [42, None, 'hello']


    Pipe()返回一对连接对象,这两个连接对象分别代表Pipe的两端。每个连接对象都有send()和recv()方法。需要注意的是如果两个不同的进程在同一时间对同一个Pipe的末端或者连接对象进行读写操作,那么Pipe中的数据可能被损坏。不同的进程在不同的末端同一时间读写数据不会造成数据损坏。


    3.进程间同步

    In [143]: from multiprocessing import Process,Lock
    
    In [144]: def f(l,i):
                   l.acquire()
                   print 'hello world',i
                   l.release()
       .....:     
    
    In [145]: if __name__ == '__main__':
                   lock=Lock()
                   for num in range(10):
                       Process(target=f,args=(lock,num)).start()
       .....:         
    hello world 0
    hello world 1
    hello world 2
    hello world 3
    hello world 4
    hello world 5
    hello world 6
    hello world 7
    hello world 8
    hello world 9



    4.进程间共享状态信息

    在进行并发编程的过程中,尽量不要使用共享状态。如果一定要在进程间共享数据,multiprocessing模块提供了一些方法。

    共享内存

    In [11]: from multiprocessing import Process,Value,Array
    
    In [12]: def f(n,a):
       ....:     n.value=3.1415927
       ....:     for i in range(len(a)):
       ....:         a[i] = -a[i]
       ....:         
    
    In [13]: if __name__ == '__main__':
       ....:     num=Value('d',0.0)
       ....:     arr=Array('i',range(10))
       ....:     p=Process(target=f,args=(num,arr))
       ....:     p.start()
       ....:     p.join()
       ....:     print num.value
       ....:     print arr[:]
       ....:     
    3.1415927
    [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]




    共享进程

    In [27]: from multiprocessing import Process,Manager
    
    In [28]: def f(d,l):
                d[1] = '1'
                d['2']=2
                d[0.25]=None
                l.reverse()
       ....:     
    
    In [29]: if __name__ == '__main__':
        manager=Manager()
        d=manager.dict()
        l=manager.list(range(10))
        p=Process(target=f,args=(d,l))
        p.start()
        p.join()
        print d
        print l
       ....:     
    {0.25: None, 1: '1', '2': 2}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]



    5.使用一组工作进程

    使用Pool对象会创建一组worker进程

    from multiprocessing import Pool, TimeoutErrorimport timeimport osdef f(x):
        return x*xif __name__ == '__main__':
        pool = Pool(processes=4)              # start 4 worker processes
    
        # print "[0, 1, 4,..., 81]"
        print pool.map(f, range(10))
    
        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print i
    
        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print res.get(timeout=1)              # prints "400"
    
        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print res.get(timeout=1)              # prints the PID of that process
    
        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print [res.get(timeout=1) for res in multiple_results]
    
        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print res.get(timeout=1)
        except TimeoutError:
            print "We lacked patience and got a multiprocessing.TimeoutError"



    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    0
    1
    4
    9
    25
    36
    64
    49
    81
    16
    400
    27150
    [27149, 27152, 27151, 27150]
    We lacked patience and got a multiprocessing.TimeoutError



    Pool对象的函数只能是创建它的进程可以使用。

    multiprocessing模块的提供的函数需要子进程可以导入__main__模块









    参考文档:

    https://docs.python.org/2/library/multiprocessing.html


关键字

上一篇: python之抽象一

下一篇: python之万维网