Python学习—pyhton中的进程

发布时间:2019-09-21 11:10:47编辑:auto阅读(1499)

    1.进程定义

    进程: 进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据、进程控制块(pcb)三部分组成。
    (1)我们编写的程序用来描述进程要完成哪些功能以及如何完成;
    (2)数据则是程序在执行过程中所需要使用的资源;
    (3)进程控制块用来记录进程的所有信息。系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。

    2.创建进程

    新创建的进程在内存独立开辟一块空间,不与其他进程共享空间、数据。
    同一个进程中,新创建的线程与此进程里其他线程共享空间、数据。

    1.os.fork()函数

    os模块的三个方法:
    os.fork()创建一个当前进程的子进程
    os.getpid()获取当前进程pid
    os.getppid()获取当前进程的父进程的Pid
    关于fork():
    它用来创建一个进程,即为当前进程的子进程,复制父进程的所有代码并从fork语句处开始运行。运行父进程还是子进程的取决于当前os调度策略。
    在父进程中返回子进程的pid,在子进程中返回0。即返回0表示在子进程中运行,返回大与0的数表示在父进程中运行。

    例子:

    import os
    
    print('当前进程:',os.getpid())
    print('当前进程的父进程:',os.getppid())
    
    pid = os.fork()
    if pid == 0:
        print('此时为子进程:',os.getpid(),'\n其父进程:',os.getppid())
    else:
        print('父进程:',os.getpid(),'\nos.fork的返回值pid:',pid)

    运行结果:

    当前进程: 16839
    当前进程的父进程: 2912
    父进程: 16839 
    os.fork的返回值pid: 16842
    此时为子进程: 16842 
    其父进程: 16839

    从运行结果中看,在linux中fork产生子进程后是先运行父进程,当父进程结束后再进入子进程运行。

    2.实例化进程类

    直接通过实例化进程类multiprocessing.Process创建新进程。
    和线程类一样,进程类也有start()方法,join()方法。调用对象的start()方法实例上也是调用的类中的run()方法。

    # 导入进程模块
    import multiprocessing
    import os
    
    def job(ss):
        print(ss,'当前子进程:%s' %os.getpid())
    
    #实例化进程类,并提交任务,传入任务所需要的参数
    p1 = multiprocessing.Process(target=job,args=('abc',))
    p1.start()
    p2 = multiprocessing.Process(target=job,args=('123',))
    p2.start()
    
    # 和线程一样,进程也有join方法。
    p1.join()
    p2.join()
    
    print('完成......')

    运行结果:

    abc 当前子进程:17234
    123 当前子进程:17235
    完成......

    3.继承进程类来自定义进程类

    继承python提供的进程类,重写方法,创建自己所需要的进程类,再实例化自定义的进程类。

    import multiprocessing
    
    class Job(multiprocessing.Process):
        #重写构造方法
        def __init__(self,cc):
            super(Job, self).__init__()
            self.cc = cc
    
        #重写run方法,和线程一样
        def run(self):
            print(self.cc)
    
    #实例化对象
    if __name__ == "__main__":
        pp = []
        for i in range(10):
            p = Job(str(i)+':123456')
            pp.append(p)
            p.start()
    
        for p in pp:
            p.join()
        print('hahhahaha')

    运行结果:

    0:123456
    1:123456
    2:123456
    3:123456
    4:123456
    5:123456
    6:123456
    7:123456
    8:123456
    9:123456
    hahhahaha

    3.多进程与多线程的对比

    import threading
    import multiprocessing
    from timeit import timeit
    
    class Jobthread(threading.Thread):
        def __init__(self,li):
            super(Jobthread,self).__init__()
            self.li = li
        def run(self):
            sum(self.li)
    
    class Jobprocess(multiprocessing.Process):
        def __init__(self,li):
            super(Jobprocess, self).__init__()
            self.li = li
        def run(self):
            for i in self.li:
                sum(i)
    
    # 这个装饰器是自己写的,用来计算某个函数执行时间
    @timeit
    def use_Pro(list):
        for i in range(0,len(list), 1000):
            p = Jobprocess(list[i:i+1000])
            p.start()
    
    @timeit
    def use_Thr(list):
        for li in list:
            t = Jobthread(li)
            t.start
    
    if __name__ == "__main__":
        list = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]*1000
        use_Pro(list)
        use_Thr(list)

    运行结果:

    use_Pro运行时间0.0041866302490234375
    use_Thr运行时间0.02240157127380371

    正如看到的结果一样,多进程适合计算密集型任务,多线程适合i/o密集型任务。

    3.守护进程与终止进程

    1.守护进程-daemon属性

    和线程类似,进程类也有一个daemon属性,默认值为False。
    当改变他的值为True时,当主进程结束,就会强行终止其他的所以进程。
    实例:
    (1)第一个程序

    import multiprocessing
    import time
    
    def job():
        print('开始子进程')
        time.sleep(3)
        print('子进程结束')
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target=job)
        p.start()
        print("程序结束......")

    运行结果:

    程序结束......
    开始子进程
    子进程结束

    主进程结束,其他进程还在继续执行。
    (2)第二个程序

    import multiprocessing
    import time
    
    def job():
        print('开始子进程')
        time.sleep(3)
        print('子进程结束')
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target=job)
        p.daemon = True
        p.start()
        print("程序结束......")

    运行结果:

    程序结束......

    当主进程结束,其他进程将会被强制终止结束。

    2.终止进程

    import multiprocessing
    import time
    
    def job():
        print('开始子进程')
        time.sleep(3)
        print('子进程结束')
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target=job)
        p.daemon = True
        print(p.is_alive())     #启动进程之前查看进程状态
        p.start()
        print(p.is_alive())     #启动进程之后查看进程状态
        p.terminate()           #终止进程
        print(p.is_alive())     #终止进程命令一发出后,查看进程状态。此时进程在释放过程中,还没有被完全释放。
        p.join()                #先让进程完全释放
        print(p.is_alive())     #最后查看进程状态
    
        print("程序结束......")

    运行结果:

    False
    True
    True
    False
    程序结束......

    4.进程间通信

    """
    通过队列实现进程间通信,队列充当消息管道的作用(类似生产者消费者模型)
    这里通信一直存在,也就是这两个进程会一直存在,没有销毁释放。
    """
    import multiprocessing
    from multiprocessing import Queue
    import time
    
    class Put_news(multiprocessing.Process):
        def __init__(self,queue):
            super(Put_news, self).__init__()
            self.queue = queue
        def run(self):
            for i in range(100):
                self.queue.put(i)
                print("传递消息:%s" %i)
                time.sleep(0.1)
    
    class Get_news(multiprocessing.Process):
        def __init__(self,queue):
            super(Get_news, self).__init__()
            self.queue = queue
        def run(self):
            while True:
                time.sleep(0.11)
                print("接收消息++++++++++++:%s" %(self.queue.get()))
    
    if __name__ == "__main__":
        q = Queue()
        p = Put_news(q)
        g = Get_news(q)
        p.start()
        g.start()
    
        if not p.is_alive():
            g.terminate()

    运行结果:
    Python学习—pyhton中的进程

    Python学习—pyhton中的进程

    5.分布式进程

    任务需要处理的数据特别大, 希望多台主机共同处理任务。multiprocessing.managers子模块里面可以实现将进程分布到多台机器上
    (管理端主机要运算一些列任务,通过与其他主机建立“连接“,将任务分配给其他主机执行,并将执行结果返回给管理端主机。)
    管理端主机代码:

    import random
    from queue import Queue
    from multiprocessing.managers import BaseManager
    
    # 1.创建队列(发送任务的队列,收取结果的队列)
    task_queue = Queue()
    result_queue = Queue()
    
    # 第二三步骤可以互换顺序
    # 2.将队列注册到网络(这样其他主机可以通过网络接收任务,发送结果)
    # 注册的队列(任务队列,结果队列)的唯一标识码分别为'put_task_queue','get_result_queue'
    BaseManager.register('put_task_queue',callable=lambda :task_queue)
    BaseManager.register('get_result_queue',callable=lambda : result_queue)
    
    # 3.绑定端口(3333),设定密码(hahahaha)
    manager = BaseManager(address=('172.25.254.158',3333),authkey=b'hahahaha')
    
    # 4.启动manager,开始共享队列
    manager.start()
    
    # 5.通过网络访问共享的队列
    task = manager.put_task_queue()
    result = manager.get_result_queue()
    
    # 6.向任务队列中放入执行任务的数据,这里放入100个任务
    for i in range(100):
        n = random.randint(10,500)
        task.put(n)
        print('任务列表加入数据:'+str(n))
    
    # 7.从结果队列中读取各个主机的任务执行结果
    for j in range(100):
        res = result.get()
        print('执行结果:'+str(res))
    
    # 8.任务执行结束,关闭共享队列
    manager.shutdown()

    运算主机代码:

    """
    在各个工作主机上执行的代码相同
    """
    
    from multiprocessing.managers import BaseManager
    
    # 1. 连接manager端,获取共享的队列
    import time
    
    worker = BaseManager(address=('172.25.254.158',3333),authkey=b'hahahaha')
    
    # 2.注册队列,去获取网络上共享的队列中的内容
    BaseManager.register('put_task_queue')
    BaseManager.register('get_result_queue')
    
    # 3.连接网络
    worker.connect()
    
    # 4.通过网络访问共享的队列
    
    task = worker.put_task_queue()
    result = worker.get_result_queue()
    
    # 5.读取任务,处理任务,这里读取了50个任务进行处理
    # 每台运算主机上的处理任务数量可以不同,不过为了避免修改代码,一般都相同。
    for i in range(50):
        n = task.get()
        print('执行任务 %d**2 = '%(n))
        res = '%d**2=%d' %(n,n**2)  #这里设置执行的任务是求平方
        result.put(res)     #将结果放入结果队列
        time.sleep(1)       #休息1秒
    
    print('工作主机执行任务结束.....')

    6.进程池

    和线程一样,进程也有进程池。
    1.第一种方法

    import multiprocessing
    import time
    
    def job(id):
        print('start id ---> %d' %id)
        print('end id ----> %d' %id)
        time.sleep(3)
    # 创建含有8个进程的进程池
    pool = multiprocessing.Pool(8)
    # 给进城池的进程分配任务
    for i in range(12):
        pool.apply_async(job,args=(i,))
    
    # 关闭进程池,使进程池不再工作运行
    pool.close()
    # 等待所有子进程结束之后再开始主进程
    pool.join()
    
    print('all works completed!')
    

    运行结果:

    start id ---> 0
    end id ----> 0
    start id ---> 1
    end id ----> 1
    start id ---> 2
    end id ----> 2
    start id ---> 3
    end id ----> 3
    start id ---> 4
    end id ----> 4
    start id ---> 5
    end id ----> 5
    start id ---> 6
    end id ----> 6
    start id ---> 7
    end id ----> 7
    start id ---> 8
    end id ----> 8
    start id ---> 9
    end id ----> 9
    start id ---> 10
    end id ----> 10
    start id ---> 11
    end id ----> 11
    all works completed!

    2.第二种方法

    from concurrent.futures import ProcessPoolExecutor
    import time
    def job(id):
        print('start id ---> %d' %id)
        print('end id ----> %d' %id)
        time.sleep(3)
    
    # 创建含有2个进程的进程池
    pool = ProcessPoolExecutor(max_workers=2)
    # 给进程池的进程分配任务,submit方法返回一个_base.Future对象
    f1 = pool.submit(job,1)
    f2 = pool.submit(job,2)
    f3 = pool.submit(job,3)
    f4 = pool.submit(job,4)
    # 执行f1对象的各种方法
    f1.done()
    f1.result()

    运行结果:

    start id ---> 1
    end id ----> 1
    start id ---> 2
    end id ----> 2
    start id ---> 3
    end id ----> 3
    start id ---> 4
    end id ----> 4

    3.第三种方法

    from concurrent.futures import ProcessPoolExecutor
    import time
    def job(id):
        print('start id ---> %d' %id)
        print('end id ----> %d' %id)
        time.sleep(1)
    pool = ProcessPoolExecutor(max_workers=3)
    pool.map(job,range(1,10))

    运行结果:

    start id ---> 1
    end id ----> 1
    start id ---> 2
    end id ----> 2
    start id ---> 3
    end id ----> 3
    start id ---> 4
    end id ----> 4
    start id ---> 5
    end id ----> 5
    start id ---> 6
    end id ----> 6
    start id ---> 7
    end id ----> 7
    start id ---> 8
    end id ----> 8
    start id ---> 9
    end id ----> 9

关键字