python中的多进程处理

发布时间:2019-08-05 15:33:48编辑:auto阅读(1501)

      众所周知,python本身是单线程的,python中的线程处理是由python解释器分配时间片的;但在python 3.0中吸收了开源模块,开始支持系统原生的进程处理——multiprocessing.

    注意:这个模块的某些函数需要操作系统的支持,例如,multiprocessing.synchronize模块在某些平台上引入时会激发一个ImportError

    1)Process
      要创建一个Process是很简单的。
    1. from multiprocessing import Process
    2. def f(name):
    3.     print('hello', name)
    4. if __name__ == '__main__':
    5.      p = Process(target=f, args=('bob',))
    6.      p.start()
    7.      p.join()
      要获得一个Process的进程ID也是很简单的。
    1. from multiprocessing import Process
    2. import os
    3. def info(title):
    4.     print title
    5.     print 'module name:', __name__
    6.     print 'parent process:', os.getppid()#这个测试不通过,3.0不支持
    7.     print 'process id:', os.getpid()
    8. def f(name):
    9.     info('function f')
    10.     print 'hello', name
    11. if __name__ == '__main__':
    12.     info('main line')
    13.     p = Process(target=f, args=('bob',))
    14.     p.start()
    15.     p.join()
      创建进程:multiprocessing.Process([group[, target[, name[, args[, kargs]]]]])
      参数:
      group:    None,它的存在仅仅是为了与threading.Thread兼容
      target:   一般是函数
      name:     进程名
      args:     函数的参数
      kargs:    keywords参数

      函数:
      run()                  默认的run()函数调用target的函数,你也可以在子类中覆盖该函数
      start()                启动该进程
      join([timeout])        父进程被停止,直到子进程被执行完毕。
                             当timeout为None时没有超时,否则有超时。
                             进程可以被join很多次,但不能join自己
      is_alive()            
      terminate()            结束进程。
                             在Unix上使用的是SIGTERM
                             在Windows平台上使用TerminateProcess

      属性:
      name                   进程名
      daemon                 守护进程
      pid                    进程ID
      exitcode               如果进程还没有结束,该值为None
      authkey                   
               
    2)Queue
      Queue类似于queue.Queue,一般用来进程间交互信息
      例子:
    1. from multiprocessing import Process, Queue
    2. def f(q):
    3.     q.put([42None'hello'])
    4.  if __name__ == '__main__':
    5.      q = Queue()
    6.      p = Process(target=f, args=(q,))
    7.      p.start()
    8.      print(q.get())    # prints "[42, None, 'hello']"
    9.      p.join()
      注意:Queue是进程和线程安全的。
      Queue实现了queue.Queue的大部分方法,但task_done()和join()没有实现。   
      创建Queue:multiprocessing.Queue([maxsize])
      函数:
      qsize()                             返回Queue的大小
      empty()                             返回一个boolean值表示Queue是否为空
      full()                              返回一个boolean值表示Queue是否满
      put(item[, block[, timeout]])      
      put_nowait(item)
      get([block[, timeout]])
      get_nowait()
      get_no_wait()

      close()                             表示该Queue不在加入新的元素
      join_thread()                      
      cancel_join_thread()

    3)JoinableQueue
      创建:multiprocessing.JoinableQueue([maxsize])
      task_done()
      join()

    4)Pipe
    1. from multiprocessing import Process, Pipe
    2. def f(conn):
    3.     conn.send([42None'hello'])
    4.     conn.close()
    5. if __name__ == '__main__':
    6.     parent_conn, child_conn = Pipe()
    7.     p = Process(target=f, args=(child_conn,))
    8.     p.start()
    9.     print(parent_conn.recv())   # prints "[42, None, 'hello']"
    10.     p.join()
      multiprocessing.Pipe([duplex])      返回一个Connection对象

    5)异步化synchronization

    1. from multiprocessing import Process, Lock
    2. def f(l, i):
    3.     l.acquire()
    4.     print('hello world', i)
    5.     l.release()
    6. if __name__ == '__main__':
    7.     lock = Lock()
    8.     for num in range(10):
    9.         Process(target=f, args=(lock, num)).start()
    6)Shared Memory
    1. from multiprocessing import Process, Value, Array
    2. def f(n, a):
    3.     n.value = 3.1415927
    4.     for i in range(len(a)):
    5.         a[i] = -a[i]
    6. if __name__ == '__main__':
    7.     num = Value('d'0.0)
    8.     arr = Array('i', range(10))
    9.     p = Process(target=f, args=(num, arr))
    10.     p.start()
    11.     p.join()
    12.     print(num.value)
    13.     print(arr[:])
    1>Value
    2>Array

    7)Manager
    1. from multiprocessing import Process, Manager
    2. def f(d, l):
    3.     d[1] = '1'
    4.     d['2'] = 2
    5.     d[0.25] = None
    6.     l.reverse()
    7. if __name__ == '__main__':
    8.     manager = Manager()
    9.     d = manager.dict()
    10.     l = manager.list(range(10))
    11.     p = Process(target=f, args=(d, l))
    12.     p.start()
    13.     p.join()
    14.     print(d)
    15.     print(l)

    8)Pool
    1. from multiprocessing import Pool
    2. def f(x):
    3.     return x*x
    4. if __name__ == '__main__':
    5.     pool = Pool(processes=4)              # start 4 worker processes
    6.     result = pool.apply_async(f, [10])     # evaluate "f(10)" asynchronously
    7.     print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    8.     print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"
    multiprocessing.Pool([processes[, initializer[, initargs]]])

    函数:
      apply(func[, args[, kwds]])
      apply_async(func[, args[, kwds[, callback]]])
      map(func,iterable[, chunksize])
      map_async(func,iterable[, chunksize[, callback]])
      imap(func, iterable[, chunksize])
      imap_unordered(func, iterable[, chunksize])
      close()
      terminate()
      join()

    1. from multiprocessing import Pool
    2. def f(x):
    3.     return x*x
    4. if __name__ == '__main__':
    5.     pool = Pool(processes=4)              # start 4 worker processes
    6.     result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously
    7.     print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow
    8.     print(pool.map(f, range(10)))         # prints "[0, 1, 4,..., 81]"
    9.     it = pool.imap(f, range(10))
    10.     print(next(it))                       # prints "0"
    11.     print(next(it))                       # prints "1"
    12.     print(it.next(timeout=1))             # prints "4" unless your computer is *very* slow
    13.     import time
    14.     result = pool.apply_async(time.sleep, (10,))
    15.     print(result.get(timeout=1))          # raises TimeoutError
       

    9)杂项
    multiprocessing.active_children()          返回所有活动子进程的列表
    multiprocessing.cpu_count()                返回CPU数目
    multiprocessing.current_process()          返回当前进程对应的Process对象
    multiprocessing.freeze_support()
    multiprocessing.set_executable()

    10)Connection对象
    send(obj)
    recv()
    fileno()
    close()
    poll([timeout])
    send_bytes(buffer[, offset[, size]])
    recv_bytes([maxlength])
    recv_bytes_info(buffer[, offset]) 
    1. >>> from multiprocessing import Pipe
    2. >>> a, b = Pipe()
    3. >>> a.send([1'hello'None])
    4. >>> b.recv()
    5. [1'hello'None]
    6. >>> b.send_bytes('thank you')
    7. >>> a.recv_bytes()
    8. 'thank you'
    9. >>> import array
    10. >>> arr1 = array.array('i', range(5))
    11. >>> arr2 = array.array('i', [0] * 10)
    12. >>> a.send_bytes(arr1)
    13. >>> count = b.recv_bytes_into(arr2)
    14. >>> assert count == len(arr1) * arr1.itemsize
    15. >>> arr2
    16. array('i', [0123400000])

关键字