python 并发执行之多线程

发布时间:2019-09-19 08:04:45编辑:auto阅读(1423)

        正常情况下,我们在启动一个程序的时候。这个程序会先启动一个进程,启动之后这个进程会拉起来一个线程。这个线程再去处理事务。也就是说真正干活的是线程,进程这玩意只负责向系统要内存,要资源但是进程自己是不干活的。默认情况下只有一个进程只会拉起来一个线程。

        多线程顾名思义,就是同样在一个进程的情况同时拉起来多个线程。上面说了,真正干活的是线程。进程与线程的关系就像是工厂和工人的关系。那么现在工厂还是一个,但是干活的工人多了。那么效率自然就提高了。因为只有一个进程,所以多线程在提高效率的同时,并没有向系统伸手要更多的内存资源。因此使用起来性价比还是很高的。但是多线程虽然不更多的消耗内存,但是每个线程却需要CPU的的参与。

    相当于工厂虽然厂房就一间,可以有很多的工人干活。但是这些工人怎么干活还得靠厂长来指挥。工人太多了,厂长忙不过来安排一样效率不高。所以工人(线程)的数量最好还是在厂长(cpu)的能力(内核数)范围之内比较好。

        在python中多线程的实现方式有两种,我的总结就是一种是函数形式的。一种是通过自己创建一个类并继承threading.Thread类来实现的。其实关于多线程用到模块,也是有两种。一种是thread。这个模块是最原始的多线程模块,但是这个模块据说是比较low的。threading模块封装了thread模块,反正就是比较高级,反正就是没人用thread写程序,都用threading!!记住就好~

        下面先来介绍第一种,也是我认为比较简单的一种函数形式的。

        先举个例子看下面的代码

    import time
    def haha(max_num):
        """
        随便定义一个函数,要求用户输入一个要打印数字的最大范围
        输入之后就会从0开始打印,直到用户输入的范围值
        """
        for i in range(max_num):
            """
            每次打印一个数字前要间隔1秒,那么打印10个数就要耗时10秒
            """
            time.sleep(1)
            print i
    for x in range(3):
        haha(10)

    上面的代码没什么难度,只是展现一下如果顺序执行函数haha()。执行三遍需要耗时30秒。因为程序要执行完第一个循环之后才会执行第二个循环。时间是累加的。

        现在我们引入多线程的方式执行。看看会不会有什么变化。

    import threading
    import time
    def haha(max_num):
        """
        随便定义一个函数,要求用户输入一个要打印数字的最大范围
        输入之后就会从0开始打印,直到用户输入的最大范围
        """
        for i in range(max_num):
            """
            每次打印一个数字要间隔1秒,那么打印10个数就要耗时10秒
            """
            time.sleep(1)
            print i
    for x in range(3):
        """
        这里的rang(3)是要依次启动三个线程,每个线程都调用函数haha()
        第一个线程启动执行之后,马上启动第二个线程再次执行。最后也相当
        函数执行了3次
        """
        #通过threading.Thread方法实例化多线程类
        #target后面跟的是函数的名称但是不要带括号也不填写参数
        #args后面的内容才是要传递给函数haha()的参数。切记参数一定要以数组的形式填写不然会报错。
        t=threading.Thread(target=haha,args=(10,))
        #将线程设置为守护线程
        t.setDaemon(True)
        #线程准备就绪,随时等候cpu调度
        t.start()

    执行的结果是。。。。。。。。。。。。。。什么都没有发生!!!!没有任何输出。什么情况??!!!是不是代码有错误??!

    其实问题就出在t.setDaemon(True)  这一句上。默认不写这句或者说默认设置的情况这一句应该是

    t.setDaemon(False)这样子的。那这一句是什么意思呢?


    setDaemon   设置为后台线程或前台线程(默认)

                如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不             论成功与否,均停止

                如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线             程也执行完成后,程序停止

    这些什么前台、后台、主线程都是什么玩意?听着是不是特别晕?其实没有这么复杂。简单理解就是如果这个参数是True,就表示程序流程跑完之后直接就关闭线程然后退出了,根本不管线程是否执行完。从上面的例子可以看出来,我们每执行一遍函数haha()最少也得耗时10秒,哪怕是打印第一个数字出来也得停顿1秒之后才会输出。但是程序流程就是拉起来三个线程就结束了。执行启动线程的3次for循环可用不了10秒,1秒都用不到就结束了。所以就出现了我们看到的结果,程序拉起来3个线程,就结束了主线程但是此时线程调用的函数haha()还没来得及输出呢,就被迫跟着程序一起结束了。

        既然找到了原因,我们就来修改一下代码。把碍事的那部分置为默认值或者干脆不写这一行

    import threading
    import time
    def haha(max_num):
        for i in range(max_num):
            time.sleep(1)
            print i
    for x in range(3):
        t=threading.Thread(target=haha,args=(5,))
        #也可以干脆不写这一行
        t.setDaemon(False)
        t.start()

    现在运行,就可以看到看起来很乱的执行结果

    0
    00
    1
     11
    2
    2
    2
    3
    3
    3
    4
    4
    4

    其实这就是三个线程并行运行同时输出,所以把结果都输出到一起引起。正是这种乱才整明白了确实三个函数haha()在同时运行。

        如果想让结果看起来规则一些可以考虑使用join()方法

    import threading
    import time
    def haha(max_num):
        for i in range(max_num):
            time.sleep(1)
            print i
    for x in range(3):
        t=threading.Thread(target=haha,args=(5,))
        t.start()
        #通过join方法让线程逐条执行
        t.join()

    这样执行的结果看起来就美观了

    0
    1
    2
    3
    4
    0
    1
    2
    3
    4
    0
    1
    2
    3
    4

    就像注释所说的那样,美观是没问题了。可是这样的话虽然创建了多个线程,每个线程却是依次执行的。没有了并行还要多线程干嘛。这样和最上面写的串行执行例子就一个效果了。因此join方法不能随便乱用的。

        可是既然有了join()方法它总得有用吧?设计出来肯定不是为了摆着看的。现在我们再修改一下代码,看看join()方法到底怎么正确使用。

    import threading
    import time
    def haha(max_num):
        for i in range(max_num):
            time.sleep(1)
            print i
    """
    创建一个列表,用于存储要启动多线程的实例
    """
    threads=[]
    for x in range(3):
        t=threading.Thread(target=haha,args=(5,))
        #把多线程的实例追加入列表,要启动几个线程就追加几个实例
        threads.append(t)
    for thr in threads:
        #把列表中的实例遍历出来后,调用start()方法以线程启动运行
        thr.start()
    for thr in threads:
        """
        isAlive()方法可以返回True或False,用来判断是否还有没有运行结束
        的线程。如果有的话就让主线程等待线程结束之后最后再结束。
        """
        if thr.isAlive():
            thr.join()

    上面学习setDaemon()方法的时候我们知道,主线程其实就相当于程序的主运行流程。那么程序运行的时候最先启动的一定就是主线程,主线程负责拉起子线程用于干活。我们的例子中运行函数haha()线程其实都是子线程。因此可以说多线程其实就是多个子线程。那么程序运行完最后一个退出的也肯定就是主线程。因此上例中最后再遍历一个遍threads列表的目的就是查看还是否有没有退出的子线程,只要还有子线程是活的,没有退出。就通过join()方法强制程序流程不可以走到主线程退出的那个步骤。只有等子线程都退出之后,才能根据join()方法的规则顺序执行到主线程退出的步骤。

        

        第二种创建多线程的方式就是通过自定义一个类来实现的。

    import threading
    import time
    class haha(threading.Thread):
        """
        自定义一个类haha,必须要继承threading.Thread,下面必须要重写一个run()方法。
        把要执行的函数写到run()方法里。如果没有run()方法就会报错。其实这个类的作用就是
        通过haha类里面的run()方法来定义每个启动的子线程要执行的函数内容。
        """
        def __init__(self,max_num):
            threading.Thread.__init__(self)
            self.max_num=max_num
        def run(self):
            for i in range(self.max_num):
                time.sleep(1)
                print i
    if __name__=='__main__':
        threads=[]
        for x in range(3):
            """
            只是这里和函数方式有点区别,因为haha类继承了threading.Thread,所以通过haha类的实例化
            就相当于调用了多线程的实例化。剩下的操作就和函数方式一个样子了。
            """
            t=haha(5)
            threads.append(t)
        for thr in threads:
            thr.start()
        for thr in threads:
            if thr.isAlive():
                thr.join()

    以上就是实现多线程的两种方式,根据个人喜好选择就好。没什么本质区别。


    下面介绍一下线程锁,先看下面一段代码

    import threading
    #定义一个变量
    gnum=0
    def work(max_number):
        for i in range(max_number):
            print i
            
    def mylock():
        global gnum
        """
        这个函数运行的时候需要先运行一下函数work()
        执行完之后将全局的gnum+1 
        """
        work(10)
        #将变量声明为全局变量
        gnum=gnum+1
        print 'gnum is ',gnum
    for x in range(5):
        """
        同时启动5个现成运行mylock()函数
        """
        t=threading.Thread(target=mylock)
        t.start()

        上面的例子看起来也不难,目的就是在执行gnum+1之前先运行另外一个耗时的函数而已。因为我们启动了5个线程同时运行,理论上运行流程应该是第一个线程运行完成之后gnum+1=1,此时第二个线程也运行完了在gnum=1的基础上再加1,使gnum=2。以此类推,最后当5个线程运行完了的时候gnum应该等于5。但是实际运行的时候并不是我们想象的那个样子!!!!!

        真实的情况是当我们第一个线程运行的时候gnum=0,运行一个耗时的work()函数。因为线程是并发执行的,那这时候在第一个work()还没运行完的情况下,第二个线程又启动开始运行了。第一个线程没有运行完的情况下,是不会执行gnum+1操作的。此时对第二个线程来说依旧是gnum=0。之后第一个线程结束的时候gnum经过自加1变成了gnum=1,可是第二个线程还是当初取值的时候还是按照gnum=0来进行的自加运算。所以第二次运算的结果很有可能还是gnum=1。没有达到我们理想的gnum=2的效果。

        从这里就可以看出来,如果多线程执行的任务互不相干那自然什么事情都没有。一旦要利用多线程多同一个变量进行操作的时候,因为线程是并发执行的。所以很有很可能同时修改变量,导致最终结果不符合我们的预期。

        遇到这种情况一个方案就是用我们上面跳到join方法,让线程依次运行。这样同时就只有一个线程在修改变量,不会出现混乱。但是问题还是一样多线程并发的效果就没有了。肯定不可取。第二个

    方案就是使用线程锁。什么是线程锁呢?就是在多个线程同时操作一个资源的时候,哪个线程先操作。哪个线程就先锁定这个资源。直到这个线程操作结束打开锁之后,其他的线程才能再操作。这就叫做线程安全,也就是线程锁。听起来好像和join()方法有点类似。其实还是有区别的,先来看看加了线程锁的代码。

    import threading
    gnum=0
    lock=threading.RLock()
    def work(max_number):
        for i in range(max_number):
            print i
    def mylock():
        work(10)
        #在操作gnum之前先上锁
        #acquire()的括号里可以定义锁定的timeout时间,超过这个时间就自动打开锁
        lock.acquire()
        global gnum
        gnum=gnum+1
        #操作结束之后再打开锁
        lock.release()
        print 'gnum is ',gnum
    for x in range(5):
        t=threading.Thread(target=mylock)
        t.start()

        上从面的代码可以看出区别,join()方法是对整个线程做限制。而线程锁lock.acquire是在线程执行过程中对某一部分进行锁限制。例子中被启动的各个线程还是可以并行运行work()这个比较耗时的函数,只是在gnum的处理上才会受到锁的限制而已。这样就解决了多线程同时操作一个资源引发错误数据的问题。另外一个要注意的就是threading.RLock()也是Lock()的高级用法,用这个高级的就可以了。

        

        多线程的event事件

    一般情况下,多线程在创建之后就开始立即投入工作。没有任何停顿。但是有时候我们也许并不希望如此。比如我们要写一个爬虫程序。在爬取网页之前,我希望先ping一下这个网页。看看这个网页网页是否可以ping通。如果通了就释放线程去爬取内容。如果不通就去测试下一个网页。所以python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。其中event.wait()相当于一个全局的标识,程序根据event.set()和event.clear()两个方法分别定制这个全局Flag的值为True或者Flase。当Flag=True的时候就相当于收到释放所有线程的信号。看下面一个列子

        

    import threading
    def do(event):
        print 'start'
        #函数执行到这里等待信号放行信号
        event.wait()
        #收到放行信号后执行下面的语句
        print 'execute'
    #实例化threading.Event()事件
    event_obj = threading.Event()
    for i in range(10):
        t = threading.Thread(target=do, args=(event_obj,))
        t.start()
    #先将Flag标识置为False
    event_obj.clear()
    inp = raw_input('input:')
    #如果用户输入'true'就像wait()发送放行信号
    if inp == 'true':
        event_obj.set()

    这样就完成通过set()和clear()方法控制线程运行的目的


    最后再简单介绍一下GIL

    GIL是python的全局解释器锁的简称。这个锁是干什么用的呢?说白了就是限制python解释调用cpu内核之用的。多线程理论上可以同时调用多个cpu内核同时工作,比如java语言就可以做到。但是python因为GIL的存在,同一时间只有一条进程在cpu内核中进行处理。虽然我们可以看到多线程并发运行,但是那只是因为cpu内核通过上下文的切换快速将多个线程来回执行造成的假象。python和java那种可以真正调用多核心多线程的语言,在效率上还是有差异的。这个就是python一直被人诟病的GIL锁。









     








        

关键字