第十五章 Python多进程与多线程

发布时间:2019-09-12 07:54:32编辑:auto阅读(1354)

    15.1 multiprocessing

    multiprocessing是多进程模块,多进程提供了任务并发性,能充分利用多核处理器。避免了GIL(全局解释锁)对资源的影响。

    有以下常用类:

    描述

    Process(group=None, target=None, name=None, args=(), kwargs={})派生一个进程对象,然后调用start()方法启动

    Pool(processes=None, initializer=None, initargs=())

    返回一个进程池对象,processes进程池进程数量
    Pipe(duplex=True)返回两个连接对象由管道连接
    Queue(maxsize=0)返回队列对象,操作方法跟Queue.Queue一样
    multiprocessing.dummy这个库是用于实现多线程

    Process()类有以下些方法:

    run()
    start()启动进程对象
    join([timeout])等待子进程终止,才返回结果。可选超时。
    name进程名字
    is_alive()返回进程是否存活
    daemon进程的守护标记,一个布尔值
    pid返回进程ID
    exitcode子进程退出状态码
    terminate()终止进程。在unix上使用SIGTERM信号,在windows上使用TerminateProcess()。

    Pool()类有以下些方法:

    apply(func, args=(), kwds={})等效内建函数apply()
    apply_async(func, args=(), kwds={}, callback=None)异步,等效内建函数apply()
    map(func, iterable, chunksize=None)等效内建函数map()
    map_async(func, iterable, chunksize=None, callback=None)异步,等效内建函数map()
    imap(func, iterable, chunksize=1)等效内建函数itertools.imap()
    imap_unordered(func, iterable, chunksize=1)像imap()方法,但结果顺序是任意的
    close()关闭进程池
    terminate()终止工作进程,垃圾收集连接池对象
    join()等待工作进程退出。必须先调用close()或terminate()

    Pool.apply_async()和Pool.map_aysnc()又提供了以下几个方法:

    get([timeout])获取结果对象里的结果。如果超时没有,则抛出TimeoutError异常
    wait([timeout])等待可用的结果或超时
    ready()返回调用是否已经完成
    successful()

    举例:

    1)简单的例子,用子进程处理函数

    from multiprocessing import Process
    import os
    def worker(name):
        print name
        print 'parent process id:', os.getppid()
        print 'process id:', os.getpid()
    if __name__ == '__main__':
        p = Process(target=worker, args=('function worker.',))
        p.start()
        p.join()
        print p.name
        
    # python test.py
    function worker.
    parent process id: 9079
    process id: 9080
    Process-1

    Process实例传入worker函数作为派生进程执行的任务,用start()方法启动这个实例。

    2)加以说明join()方法

    from multiprocessing import Process
    import os
    def worker(n):
        print 'hello world', n
    if __name__ == '__main__':
        print 'parent process id:', os.getppid()
        for n in range(5):
            p = Process(target=worker, args=(n,))
            p.start()
            p.join()
            print 'child process id:', p.pid
            print 'child process name:', p.name
            
    # python test.py
    parent process id: 9041
    hello world 0
    child process id: 9132
    child process name: Process-1
    hello world 1
    child process id: 9133
    child process name: Process-2
    hello world 2
    child process id: 9134
    child process name: Process-3
    hello world 3
    child process id: 9135
    child process name: Process-4
    hello world 4
    child process id: 9136
    child process name: Process-5
    
    # 把p.join()注释掉再执行
    # python test.py
    parent process id: 9041
    child process id: 9125
    child process name: Process-1
    child process id: 9126
    child process name: Process-2
    child process id: 9127
    child process name: Process-3
    child process id: 9128
    child process name: Process-4
    hello world 0
    hello world 1
    hello world 3
    hello world 2
    child process id: 9129
    child process name: Process-5
    hello world 4

    可以看出,在使用join()方法时,输出的结果都是顺序排列的。相反是乱序的。因此join()方法是堵塞父进程,要等待当前子进程执行完后才会继续执行下一个子进程。否则会一直生成子进程去执行任务。

    在要求输出的情况下使用join()可保证每个结果是完整的。

    3)给子进程命名,方便管理

    from multiprocessing import Process
    import os, time
    def worker1(n):
        print 'hello world', n
    def worker2():
        print 'worker2...'
    if __name__ == '__main__':
        print 'parent process id:', os.getppid()
        for n in range(3):
            p1 = Process(name='worker1', target=worker1, args=(n,))
            p1.start()
            p1.join()
            print 'child process id:', p1.pid
            print 'child process name:', p1.name
        p2 = Process(name='worker2', target=worker2)
        p2.start()
        p2.join()
        print 'child process id:', p2.pid
        print 'child process name:', p2.name
        
    # python test.py
    parent process id: 9041
    hello world 0
    child process id: 9248
    child process name: worker1
    hello world 1
    child process id: 9249
    child process name: worker1
    hello world 2
    child process id: 9250
    child process name: worker1
    worker2...
    child process id: 9251
    child process name: worker2

    4)设置守护进程,父进程退出也不影响子进程运行

    from multiprocessing import Process
    def worker1(n):
        print 'hello world', n
    def worker2():
        print 'worker2...'
    if __name__ == '__main__':
        for n in range(3):
            p1 = Process(name='worker1', target=worker1, args=(n,))
            p1.daemon = True
            p1.start()
            p1.join()
        p2 = Process(target=worker2)
        p2.daemon = False
        p2.start()
        p2.join()

    5)使用进程池

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    from multiprocessing import Pool, current_process
    import os, time, sys
    def worker(n):
        print 'hello world', n
        print 'process name:', current_process().name  # 获取当前进程名字
        time.sleep(1)    # 休眠用于执行时有时间查看当前执行的进程
    if __name__ == '__main__':
        p = Pool(processes=3)
        for i in range(8):
            r = p.apply_async(worker, args=(i,))
            r.get(timeout=5)  # 获取结果中的数据
        p.close()
        
    # python test.py
    hello world 0
    process name: PoolWorker-1
    hello world 1
    process name: PoolWorker-2
    hello world 2
    process name: PoolWorker-3
    hello world 3
    process name: PoolWorker-1
    hello world 4
    process name: PoolWorker-2
    hello world 5
    process name: PoolWorker-3
    hello world 6
    process name: PoolWorker-1
    hello world 7
    process name: PoolWorker-2

    进程池生成了3个子进程,通过循环执行8次worker函数,进程池会从子进程1开始去处理任务,当到达最大进程时,会继续从子进程1开始。

    在运行此程序同时,再打开一个终端窗口会看到生成的子进程:

    # ps -ef |grep python
    root      40244   9041  4 16:43 pts/3   00:00:00 python test.py
    root      40245  40244  0 16:43 pts/3    00:00:00 python test.py
    root      40246  40244  0 16:43 pts/3    00:00:00 python test.py
    root      40247  40244  0 16:43 pts/3    00:00:00 python test.py

    6)进程池map()方法

    map()方法是将序列中的元素通过函数处理返回新列表。

    from multiprocessing import Pool
    def worker(url):
        return 'http://%s' % url
    urls = ['www.baidu.com', 'www.jd.com']
    p = Pool(processes=2)
    r = p.map(worker, urls)
    p.close()
    print r
    
    # python test.py
    ['http://www.baidu.com', 'http://www.jd.com']

    7)Queue进程间通信

    multiprocessing支持两种类型进程间通信:Queue和Pipe。

    Queue库已经封装到multiprocessing库中,在第十章 Python常用标准库已经讲解到Queue库使用,有需要请查看以前博文。

    例如:一个子进程向队列写数据,一个子进程读取队列数据

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    from multiprocessing import Process, Queue
    # 写数据到队列
    def write(q):
        for n in range(5):
            q.put(n)
            print 'Put %s to queue.' % n
    # 从队列读数据
    def read(q):
        while True:
            if not q.empty():
                value = q.get()
                print 'Get %s from queue.' % value
            else:
                break
    if __name__ == '__main__':
        q = Queue()
        pw = Process(target=write, args=(q,))
        pr = Process(target=read, args=(q,))   
        pw.start()
        pw.join()
        pr.start()
        pr.join()
        
    # python test.py
    Put 0 to queue.
    Put 1 to queue.
    Put 2 to queue.
    Put 3 to queue.
    Put 4 to queue.
    Get 0 from queue.
    Get 1 from queue.
    Get 2 from queue.
    Get 3 from queue.
    Get 4 from queue.

    8)Pipe进程间通信

    from multiprocessing import Process, Pipe
    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print parent_conn.recv() 
        p.join()
        
    # python test.py
    [42, None, 'hello']

    Pipe()创建两个连接对象,每个链接对象都有send()和recv()方法,

    9)进程间对象共享

    Manager类返回一个管理对象,它控制服务端进程。提供一些共享方式:Value()、Array()、list()、dict()、Event()等

    创建Manger对象存放资源,其他进程通过访问Manager获取。

    from multiprocessing import Process, Manager
    def f(v, a, l, d):
        v.value = 100
        a[0] = 123
        l.append('Hello')
        d['a'] = 1
    mgr = Manager()
    v = mgr.Value('v', 0)
    a = mgr.Array('d', range(5))
    l = mgr.list()
    d = mgr.dict()
    p = Process(target=f, args=(v, a, l, d))
    p.start()
    p.join()
    print(v)
    print(a)
    print(l)
    print(d)
    
    # python test.py
    Value('v', 100)
    array('d', [123.0, 1.0, 2.0, 3.0, 4.0])
    ['Hello']
    {'a': 1}

    10)写一个多进程的例子

    比如:多进程监控URL是否正常

    from multiprocessing import Pool, current_process
    import urllib2
    urls = [
        'http://www.baidu.com',
        'http://www.jd.com',
        'http://www.sina.com',
        'http://www.163.com',
    ]
    def status_code(url):
        print 'process name:', current_process().name
        try:
            req = urllib2.urlopen(url, timeout=5)
            return req.getcode()
        except urllib2.URLError:
            return
    p = Pool(processes=4)
    for url in urls:
        r = p.apply_async(status_code, args=(url,))
        if r.get(timeout=5) == 200:
            print "%s OK" %url
        else:
            print "%s NO" %url
            
    # python test.py
    process name: PoolWorker-1
    http://www.baidu.com OK
    process name: PoolWorker-2
    http://www.jd.com OK
    process name: PoolWorker-3
    http://www.sina.com OK
    process name: PoolWorker-4
    http://www.163.com OK


    博客地址:http://lizhenliang.blog.51cto.com

    QQ群:323779636(Shell/Python运维开发群


    15.2 threading

    threading模块类似于multiprocessing多进程模块,使用方法也基本一样。threading库是对thread库进行二次封装,我们主要用到Thread类,用Thread类派生线程对象。

    1)使用Thread类实现多线程

    from threading import Thread, current_thread
    def worker(n):
        print 'thread name:', current_thread().name
        print 'hello world', n
        
    for n in range(5):
        t = Thread(target=worker, args=(n, ))
        t.start()
        t.join()  # 等待主进程结束
        
    # python test.py
    thread name: Thread-1
    hello world 0
    thread name: Thread-2
    hello world 1
    thread name: Thread-3
    hello world 2
    thread name: Thread-4
    hello world 3
    thread name: Thread-5
    hello world 4

    2)还有一种方式继承Thread类实现多线程,子类可以重写__init__和run()方法实现功能逻辑。

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    from threading import Thread, current_thread
    class Test(Thread):
        # 重写父类构造函数,那么父类构造函数将不会执行
        def __init__(self, n):
            Thread.__init__(self)
            self.n = n
        def run(self):
            print 'thread name:', current_thread().name
            print 'hello world', self.n
    if __name__ == '__main__':
        for n in range(5):
            t = Test(n)
            t.start()
            t.join()
            
    # python test.py
    thread name: Thread-1
    hello world 0
    thread name: Thread-2
    hello world 1
    thread name: Thread-3
    hello world 2
    thread name: Thread-4
    hello world 3
    thread name: Thread-5
    hello world 4

    3)Lock

    from threading import Thread, Lock, current_thread
    lock = Lock()
    class Test(Thread):
        # 重写父类构造函数,那么父类构造函数将不会执行
        def __init__(self, n):
            Thread.__init__(self)
            self.n = n
        def run(self):
            lock.acquire()  # 获取锁
            print 'thread name:', current_thread().name
            print 'hello world', self.n
            lock.release()  # 释放锁
    if __name__ == '__main__':
        for n in range(5):
            t = Test(n)
            t.start()
            t.join()

    众所周知,Python多线程有GIL全局锁,意思是把每个线程执行代码时都上了锁,执行完成后会自动释放GIL锁,意味着同一时间只有一个线程在运行代码。由于所有线程共享父进程内存、变量、资源,很容易多个线程对其操作,导致内容混乱。

    当你在写多线程程序的时候如果输出结果是混乱的,这时你应该考虑到在不使用锁的情况下,多个线程运行时可能会修改原有的变量,导致输出不一样。

    由此看来Python多线程是不能利用多核CPU提高处理性能,但在IO密集情况下,还是能提高一定的并发性能。也不必担心,多核CPU情况可以使用多进程实现多核任务。Python多进程是复制父进程资源,互不影响,有各自独立的GIL锁,保证数据不会混乱。能用多进程就用吧!


关键字