Python学习—python中的线程

发布时间:2019-09-19 08:02:03编辑:auto阅读(1455)

    1.线程定义

    线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个进程至少有一个线程,一个进程必定有一个主线程。

    2.创建线程

    创建线程的两个模块:
    (1)thread(在python3中改名为_thread)
    (2)threding
    _thread提供了低级别的、原始的线程以及一个简单的锁。threading基于Java的线程模型设计。thread和threading模块都可以用来创建和管理线程,而thread模块提供了基本的线程和锁支持。threading提供的是更高级的完全的线程管理。低级别的thread模块是推荐给高手用,一般应用程序推荐使用更高级的threading模块:
    1.它更先进,有完善的线程管理支持,此外,在thread模块的一些属性会和threading模块的这些属性冲突。
    2.thread模块有很少的(实际上是一个)同步原语,而threading却有很多。
    3.thread模块没有很好的控制,特别当你的进程退出时,比如:当主线程执行完退出时,其他的线程都会无警告,无保存的死亡,而threading会允许默认,重要的子线程完成后再退出,它可以特别指定daemon类型的线程。

    1.通过模块直接创建线程

    _thread模块创建进程

    import _thread
    
    def job(name):
        print("%s正在做工作........" %name)
        print("%s工作完成..........." %name)
    
    if __name__ == "__main__":
        try:
            #_thread模块 创建2个线程,再加上主线程,这个程序运行就一共有三个线程
            _thread.start_new_thread(job,('ddd',))
            _thread.start_new_thread(job,('eee',))
        except Exception as e:
            print("创建线程失败。",e)
        else:
            print("创建线程成功。")
        print('主线程结束')

    每次运行程序可以看到不同的结果:

    (1)
    创建线程成功。eee正在做工作........
    eee工作完成...........
    ddd正在做工作........
    ddd工作完成...........
    
    (2)
    创建线程成功。ddd正在做工作........eee正在做工作........
    eee工作完成...........
    ddd工作完成...........
    
    (3)
    创建线程成功。
    ddd正在做工作........
    
    (4)
    创建线程成功。eee正在做工作........

    这些结果不同,是因为线程并发执行,三个线程来回切换在cpu工作,且当主线程结束后,不管其它线程是否完成工作都被迫结束。
    通过threading模块创建线程

    def job(name):
        print("%s正在做第一部分工作........" %name)
        print("%s正在做第二部分工作........" %name)
        print("%s正在做第三部分工作........" %name)
        print("%s工作完成..........." %name)
    
    if __name__ == "__main__":
        try:
            #threading模块 创建新的线程 返回一个线程对象
            #target 为线程需要做的任务,args为任务传递所需要参数(参数用元组组织起来),name为创建的线程命名(可以不取名)
            t1 = threading.Thread(target=job,args=('aaa',),name='job1_name')
            # start方法使线程开始执行
            t1.start()
            t2 = threading.Thread(target=job,args=('bbb',),name='job2_name')
            t2.start()
        except Exception as e:
            print("创建线程失败\n",e)
        print('主线程结束.....')

    每次运行程序的结果:

    (1)
    aaa正在做第一部分工作........
    aaa正在做第二部分工作........
    bbb正在做第一部分工作........
    主线程结束.....
    aaa正在做第三部分工作........
    aaa工作完成...........
    bbb正在做第二部分工作........
    bbb正在做第三部分工作........
    bbb工作完成...........
    
    (2)
    aaa正在做第一部分工作........
    bbb正在做第一部分工作........
    bbb正在做第二部分工作........
    bbb正在做第三部分工作........主线程结束.....
    bbb工作完成...........
    aaa正在做第二部分工作........
    aaa正在做第三部分工作........
    aaa工作完成...........
    
    (3)
    aaa正在做第一部分工作........
    aaa正在做第二部分工作........
    aaa正在做第三部分工作........bbb正在做第一部分工作........
    
    aaa工作完成...........bbb正在做第二部分工作........
    主线程结束.....
    bbb正在做第三部分工作........
    bbb工作完成...........

    可以看到,不同的多个线程是相互交叉着在cpu执行的,和_thread不同的是它创建了一个线程类对象,也不会因为主线程的结束而结束所有的线程。

    使用join方法
    在A线程中调用了B线程的join法时,表示只有当B线程执行完毕时,A线程才能继续执行。多个线程使用了join方法,剩下的其它线程只有在这些线程执行完后才能继续执行。
    这里调用的join方法是没有传参的,join方法其实也可以传递一个参数给它的。
    join方法中如果传入参数,则表示这样的意思:如果A线程中掉用B线程的join(10),则表示A线程会等待B线程执行10毫秒,10毫秒过后,A、B线程并行执行。
    需要注意的是,jdk规定,join(0)的意思不是A线程等待B线程0秒,而是A线程等待B线程无限时间,直到B线程执行完毕,即join(0)等价于join()。

    def job(name):
        print("%s正在做第一部分工作........" %name)
        print("%s正在做第二部分工作........" %name)
        print("%s正在做第三部分工作........" %name)
        print("%s工作完成..........." %name)
    
    if __name__ == "__main__":
        try:
            #threading模块 创建新的线程 返回一个线程对象
            #target 为线程需要做的任务,args为任务传递所需要参数(参数用元组组织起来),name为创建的线程命名(可以不取名)
            t1 = threading.Thread(target=job,args=('aaa',),name='job1_name')
            # start方法使线程开始执行
            t1.start()
            t2 = threading.Thread(target=job,args=('bbb',),name='job2_name')
            t2.start()
            t1.join()
            t2.join()
        except Exception as e:
            print("创建线程失败\n",e)
        print('主线程结束.....')

    每次运行程序的结果:

    (1)
    aaa正在做第一部分工作........
    aaa正在做第二部分工作........bbb正在做第一部分工作........
    aaa正在做第三部分工作........
    aaa工作完成...........
    
    bbb正在做第二部分工作........
    bbb正在做第三部分工作........
    bbb工作完成...........
    主线程结束.....
    
    (2)
    aaa正在做第一部分工作........
    bbb正在做第一部分工作........
    bbb正在做第二部分工作........
    bbb正在做第三部分工作........
    bbb工作完成...........
    aaa正在做第二部分工作........
    aaa正在做第三部分工作........
    aaa工作完成...........
    主线程结束.....
    
    (3)
    aaa正在做第一部分工作........bbb正在做第一部分工作........
    
    bbb正在做第二部分工作........
    bbb正在做第三部分工作........aaa正在做第二部分工作........
    aaa正在做第三部分工作........
    
    bbb工作完成...........
    aaa工作完成...........
    主线程结束.....

    2.通过继承Thread类创建线程

    当通过继承Thread类来创建线程时,需要传入参数,可以在构造方法增加相应的属性,以此来传入所需要的参数。
    Thread类有一个run方法,当创建一个线程后,使用start方法时,实际上就是在调用类里面的run方法,因此可以在继承Thread类的时候,重写run方法来完成自己的任务。

    import threading
    
    class Jobthread(threading.Thread):
        def __init__(self,name):
            super(Jobthread, self).__init__()
            self.name = name
        #重写Thread类的run方法
        def run(self):
            print('%s线程待完成第一部分工作' %self.name)
            print("%s正在做第二部分工作........" %self.name)
            print("%s正在做第二部分工作........" %self.name)
    
    if __name__ == "__main__":
        #实例化类创建第一个线程对象
        t1 = Jobthread('aaa')
        t1.start()
        #实例化类创建第二个线程对象
        t2 = Jobthread('bbb')
        t2.start()
        t1.join()
        t2.join()
        print('主线程结束.....')

    每次运行程序的结果:

    (1)
    wwww正在做第一部分工作
    wwww正在做第二部分工作........
    eeee正在做第一部分工作
    eeee正在做第二部分工作........
    eeee正在做第二部分工作........
    wwww正在做第二部分工作........
    主线程结束.....
    
    (2)
    wwww正在做第一部分工作
    eeee正在做第一部分工作
    wwww正在做第二部分工作........
    eeee正在做第二部分工作........
    wwww正在做第二部分工作........
    eeee正在做第二部分工作........
    主线程结束.....
    
    (3)
    wwww正在做第一部分工作eeee正在做第一部分工作
    eeee正在做第二部分工作........
    
    wwww正在做第二部分工作........
    wwww正在做第二部分工作........
    eeee正在做第二部分工作........
    主线程结束.....

    可以看到,通过继承线程类,然后重写run方法,实例化这个类,这样也可以新创建线程,在某些情况下,这样还更加方便。

    3.守护线程-daemon

    线程的Daemon属性:当主线程执行结束, 让没有执行完成的线程强制结束的一个属性:daemon
    setDaemon方法是改变线程类的一个属性:daemon,也可以在创建线程的时候指定这个属性的值,他的值默认为None

    import threading
    import time
    
    # 任务1:
    def music(name):
        for i in range(2):
            print("正在听音乐%s" %(name))
            time.sleep(2)
            print('听音乐结束')
    # 任务2:
    def code(name):
        for i in range(2):
            print("正在编写代码%s" %(name))
            time.sleep(2)
            print('写代码结束')
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=music, args=("中国梦",))
        t2 = threading.Thread(target=code, args=("爬虫", ))
    
        # 将t1线程声明为守护线程, 如果设置为True, 子线程启动, 当主线程执行结束, 子线程也结束
        # 设置setDaemon必须在启动线程之前进行设置;
        t1.setDaemon(True)
        t2.setDaemon(True)
        t1.start()
        t2.start()
        print('完成任务......')

    运行结果:

    (1)
    正在听音乐中国梦正在编写代码爬虫
    完成任务......
    
    (2)
    正在听音乐中国梦
    正在编写代码爬虫完成任务......

    当设置daemon属性为True,就和_thread模块的线程一样主线程结束,其它线程也被迫结束

    4.线程中的锁

    1.全局解释锁

    什么是全局解释器锁(GIL)
    Python代码的执行由Python 虚拟机(也叫解释器主循环,CPython版本)来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
    即全局解释器锁,使得在同一时间内,python解释器只能运行一个线程的代码,这大大影响了python多线程的性能。
    需要明确的一点是GIL并不是Python的特性
    GIL是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。
    python GIL 会影响多线程等性能的原因:
    因为在多线程的情况下,只有当线程获得了一个全局锁的时候,那么该线程的代码才能运行,而全局锁只有一个,所以使用python多线程,在同一时刻也只有一个线程在运行,因此在即使在多核的情况下也只能发挥出单核的性能。
    经过GIL这一道关卡处理,会增加执行的开销。这意味着,如果你想提高代码的运行速度,使用threading包并不是一个很好的方法。
    在多线程环境中,Python 虚拟机按以下方式执行:

    1. 设置GIL
    2. 切换到一个线程去运行
    3. 运行:
      a. 指定数量的字节码指令,或者
      b. 线程主动让出控制(可以调用time.sleep(0))
    4. 把线程设置为睡眠状态
    5. 解锁GIL
    6. 再次重复以上所有步骤
      既然python在同一时刻下只能运行一个线程的代码,那线程之间是如何调度的呢?
      对于有io操作的线程,当一个线程在做io操作的时候,因为io操作不需要cpu,所以,这个时候,python会释放python全局锁,这样其他需要运行的线程就会使用该锁。
      对于cpu密集型的线程,比如一个线程可能一直需要使用cpu做计算,那么python中会有一个执行指令的计数器,当一个线程执行了一定数量的指令时,该线程就会停止执行并让出当前的锁,这样其他的线程就可以执行代码了。
      由上面可知,至少有两种情况python会做线程切换,一是一但有IO操作时,会有线程切换,二是当一个线程连续执行了一定数量的指令时,会出现线程切换。当然此处的线程切换不一定就一定会切换到其他线程执行,因为如果当前线程优先级比较高的话,可能在让出锁以后,又继续获得锁,并优先执行。

    这里就可以将操作分两种:
    i/o密集型
    cpu密集型(计算密集型)
    对于前者我们尽可能的采用多线程方式,后者尽可能采用多进程方式

    2.线程锁

    为什么会需要线程锁?
    多个线程对同一个数据进行修改时, 会出现不可预料的情况。
    例如:

    def add():
        global money
        for i in range(1000000):
            money += 1
    
    def reduce():
        global money
        for  i in range(1000000):
            money -= 1
    
    if __name__ =="__main__":
        money = 0
        t1 = threading.Thread(target=add)
        t2 = threading.Thread(target=reduce)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(money)

    因为没有对变量money做访问限制,在某一个线程对其进行操作时,另一个线程仍可以对它进行访问、操作,致使最终结果出错,且不可预料,不是期待值。

    (1)
    55651
    (2)
    -133447
    (3)
    -236364

    当我们使用线程锁的时候:

    import threading
    
    def add(lock):
        global money
        lock.acquire()
        for i in range(100000):
            money += 1
        lock.release()
    
    def reduce(lock):
        global money
        lock.acquire()
        for  i in range(1000000):
            money -= 1
        lock.release()
    
    if __name__ =="__main__":
        money = 0
        lock = threading.Lock()
        t1 = threading.Thread(target=add,args=(lock,))
        t2 = threading.Thread(target=reduce,args=(lock,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(money)

    运行结果正确,始终为0

    5.多线程

    使用多线程来查ip的地理位置

    import json
    from urllib.request import urlopen
    
    class Job(threading.Thread):
        def __init__(self,ip):
            super(Job,self).__init__()
            self.ip = ip
        def check_ip(self):
            url = 'http://ip.taobao.com/service/getIpInfo.php?ip=%s' % self.ip
            text = urlopen(url).read().decode('utf-8')
            d = json.loads(text)['data']
            country = d['country']
            city = d['city']
            print(self.ip+':\t'+country+'\t'+city)
        def run(self):
            self.check_ip()
    
    if __name__ == "__main__":
        tt = []
        ips = ['172.25.254.23', '111.213.215.66', '152.158.32.54', '164.52.196.89','214.63.145.189']
        for ip in ips:
            t = Job(ip)
            t.start()
            tt.append(t)
        [i.join() for i in tt]

    结果:

    172.25.254.23:  XX  内网IP
    214.63.145.189: 美国  XX
    164.52.196.89:  印度  XX
    111.213.215.66: 中国  上海
    152.158.32.54:  欧洲  XX

    6.生产者消费者模型

    1.模型引入

    1). 理论上多线程执行任务, 会产生一些数据, 为其他程序执行作铺垫;
    2). 多线程是不能返回任务执行结果的, 因此需要一个容器来存储多线程产生的数据
    3). 这个容器如何选择? list(栈, 队列), tuple(x), set(x), dict(x), 此处选择队列来实现
    队列与多线程

    import threading
    from queue import Queue
    
    def job(l,queue):
        # 将任务的结果存储到队列中
        queue.put(sum(l))
    
    def use_thread():
        # 实例化一个队列, 用来存储每个线程执行的结果
        q = Queue()
        li = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]
        threads = []
        for i in li:
            t = threading.Thread(target=job,args=(i,q))
            threads.append(t)
            t.start()
        # join方法等待所有子线程执行结束
        [i.join() for i in threads]
        # 从队列里面拿出所有的运行结果
        result = [q.get() for i in li]
        print(result)
    
    if __name__ == "__main__":
        use_thread()

    运行结果:

    [21, 27, 33, 39]

    2.生产者消费者模型

    在软件开发的过程中,经常碰到这样的场景:
    某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。
    为了容易理解,我们举一个寄信的例子。假设你要寄一封信,大致过程如下:
    1、你把信写好——相当于生产者生产数据
    2、你把信放入邮箱——相当于生产者把数据放入缓冲区
    3、邮递员把信从邮箱取出,做相应处理——相当于消费者把数据取出缓冲区,处理数据
    生产者消费者模式的优点
    1.解耦
    假设生产者和消费者分别是两个线程。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。如果未来消费者的代码发生变化,可能会影响到生产者的代码。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
    举个例子:我们去邮局投递信件,如果不使用邮箱(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他。这就产生了你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员 换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮箱相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
    2.并发
    由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区通信的,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
    继续上面的例子:如果我们不使用邮箱,就得在邮局等邮递员,直到他回来,把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞)。或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
    3.支持忙闲不均
    当生产者制造数据快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,慢慢处理掉。而不至于因为消费者的性能造成数据丢失或影响生产者生产。
    我们再拿寄信的例子:假设邮递员一次只能带走1000封信,万一碰上情人节(或是圣诞节)送贺卡,需要寄出去的信超过了1000封,这时候邮箱这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮箱中,等下次过来时再拿走。

    实例:
    1.文件ipfile.txt中有大量的ip地址,要求将ip地址取出来再与端口号组合,放入队列中
    2.从队列中取出地址,依次访问并返回访问结果

    import  threading
    from queue import Queue
    from urllib.request import urlopen
    import time
    
    class Procuder(threading.Thread):
        def __init__(self, q):
            super(Procuder, self).__init__()
            self.q = q
        def run(self):
            portlist = [80, 443, 7001, 8000, 8080]
            with open('ipfile.txt') as f:
                for ip in f:
                    for port in portlist:
                        url = 'http://%s:%s' % (ip.strip(), port)
                        self.q.put(url)
    
    class Consumer(threading.Thread):
        def __init__(self, q):
            super(Consumer, self).__init__()
            self.q = q
        def run(self):
            # 阻塞0.001妙使生产者先运行再队列中放入数据
            time.sleep(0.001)
            #只要队列不为空就一直“消费”数据
            while not self.q.empty():
                try:
                    url = self.q.get()
                    urlObj = urlopen(url)
                except Exception as e:
                    print("%s unknown url" %(url))
                else:
                    print("%s is ok" %(url))
    
    if __name__ == '__main__':
        q = Queue(20)
        p1 = Procuder(q)
        p1.start()
        c = Consumer(q)
        c.start()
        # 阻塞调用线程,直到队列中的所有任务被处理掉,再继续向下执行。
        q.join()

    运行结果就不截图了。

    7.线程池

    传统多线程方案会使用“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。

    一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。

    使用线程池:
    由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。

    #导入模块 注意: python3.2版本以后才可以使用;
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    #需要完成的任务
    def job(n):
        sum = 0
        for i in range(1,n+1):
            sum += i
        return sum
    
    if __name__ =="__main__":
        #实例化线程池对象,设置线程池有10个线程
        pool = ThreadPoolExecutor(max_workers=10)
    
        #向线程池提交任务submit方法返回一个_base.Future对象,这个对象含有许多方法。便于我们对线程操作
        f1 = pool.submit(job,20)
        f2 = pool.submit(job,16)
    
        #查看线程是否完成任务(线程是否被销毁,完成任务的线程会被释放)
        #这里休眠1妙是因为线程在完成工作后会被释放,如果立即查看线程状态,可能线程正在释放中,会返False,这里等待1妙让线程完成释放之后在查看线程状态。
        time.sleep(1)
        print(f1.done())
        print(f2.done())
    
        #直接获取任务执行结果
        print(f1.result())
        print(f2.result())

    运行结果:

    True
    True
    210
    136

    实现线程池的三种方法(实际可以看成2种)

    concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:
    (1)map可以保证输出的顺序, submit输出的顺序是乱的
    (2)如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()
    (3)submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。

    from concurrent.futures import as_completed
    from concurrent.futures.thread import ThreadPoolExecutor
    from urllib.request import urlopen
    
    urls = ['http://a.cn','http://1688.cn','http://jd.cn','http://qq.cn','http://qq.com','http://111.231.215.66']*10
    
    def get_page(url):
        try:
            content = urlopen(url).read()
        except:
            return {'url:'+url+'   page_len:'+str(0)}
        else:
            return {'url:'+url+'   page_len:'+str(len(content))}
    
    # 1.通过for循环打印结果
    print('第一种方法:')
    pool = ThreadPoolExecutor(max_workers=10)
    resultlist = [pool.submit(get_page,url) for url in urls]
    for i in resultlist:
        print(i.result())
    
    # 2.通过方法as_completed
    print('第二种方法:')
    pool = ThreadPoolExecutor(max_workers=10)
    resultlist = [pool.submit(get_page,url) for url in urls]
    for i in as_completed(resultlist):
        print(i.result())
    
    # 3.通过map方法
    print('第三种方法:')
    pool = ThreadPoolExecutor(max_workers=10)
    for res in pool.map(get_page,urls):
        print(res)

关键字

上一篇: 解读HTTP/3

下一篇: 利用python来解析html