Python线程之threading

发布时间:2019-07-23 09:46:11编辑:auto阅读(1468)

    线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。进程是资源分配的最小单位,线程是CPU调度的最小单位,每一个进程中至少有一个线程,线程可与属于同一进程的其它线程共享进程所拥有的全部资源,但是其本身基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器、一组寄存器和栈)。

    Threading模块提供线程相关的操作,Threading模块包含Thread,Lock,RLock,Event,Queue等组件;multiprocess模块完全模仿了threading模块的接口,二者在使用时,时极其相似的。

    1、Thread

    创建线程的两种方式:

    示例1:

    import time
    from threading import Thread
    
    def func(i):
        time.sleep(1)
        print("hello : %s"%i)
    
    thread_l = []
    
    # 开启多线程
    for i in range(10):
        t = Thread(target=func,args=(i,)) #实例化线程对象
        t.start()  # 激活线程
        thread_l.append(t)
    
    # 异步开启阻塞
    for j in thread_l:
        j.join()  # 阻塞主线程,子线程执行完毕之后向下执行主线程代码
    
    print("主线程")

    结果:

    hello : 2
    hello : 0
    hello : 1
    hello : 3
    hello : 5
    hello : 4
    hello : 7hello : 6hello : 9
    
    hello : 8
    主线程

    示例2:使用类继承的方式创建线程

    import time
    from threading import Thread
    
    class MyThread(Thread): # 继承Thread类
        count = 0   # 子线程间会共享此静态属性
        def __init__(self,arg1,arg2): # 通过init方法传递参数
            super().__init__()
            self.arg1 = arg1
            self.arg2 = arg2
    
        def run(self):  # 必须实现run方法
            MyThread.count += 1
            time.sleep(1)
            print("%s,hello!%s"%(self.arg1,self.arg2))
    
    thread_l = []
    for i in range(10):
        t = MyThread('eric','jonny')
        t.start()
        thread_l.append(t)
    for j in thread_l:
        j.join()
    print("conut: %s"%MyThread.count)
    
    结果:
    1,hello!jonny
    0,hello!jonny
    5,hello!jonny4,hello!jonny
    3,hello!jonny
    2,hello!jonny
    
    6,hello!jonny9,hello!jonny
    
    7,hello!jonny
    8,hello!jonny
    conut: 10

    Thread的主要方法:

    t.start() :激活线程
    t.join():阻塞(等待子线程执行完毕,在向下执行),在每次激活线程后阻塞会使线程变为同步,所以要在线程激活完毕之后阻塞。
    t.name :设置或获取线程名
    t.getName():获取线程名
    t.setName(NAME):设置线程名
    t.is_alive() :判断线程是否激活
    t.setDaemon() :设置守护线程,在激活线程之前设置,默认值为False
    t.isDaemon() : 判断是否为守护线程

    2、Lock与RLock

    同一个进程内的线程是数据共享的,线程的GIL(全局解释性)锁是锁的线程调用CPU的时间,在第一个线程调用CPU操作共享数据的时候,时间轮转至第二个线程,第二个线程也要操作共享数据,这样就导致了数据的不一致,这是因为GIL不锁数据,这种情况下,线程锁的出现就能解决这个这个问题。
    示例1:GIL锁发挥作用

    from threading import Thread
    
    def func():
        global n
        n -= 1
    
    n = 1000
    
    thread_l = []
    for i in range(100):
        t = Thread(target=func)
        t.start()
        thread_l.append(t)
    
    for j in thread_l:
        j.join()
    
    print(n)
    结果是:900

    示例2:时间片轮转,GIL锁失效

    import time
    from threading import Thread
    
    def func():
        global n
        # n -= 1
        temp = n  # 从进程中获取n
        time.sleep(0.01)  # 每个线程调用CPU的时间是很短的,制造时间片轮转的效果
        n = temp -1  # 得到结果,再将修改过的数据返回给进程
    
    n = 1000
    
    thread_l = []
    for i in range(100):
        t = Thread(target=func)
        t.start()
        thread_l.append(t)
    
    for j in thread_l:
        j.join()
    
    print(n)
    结果是:998

    示例3:互斥锁Lock,对数据加锁

    import time
    from threading import Thread
    from threading import Lock
    
    def func():
        global n
        # n -= 1
        lock.acquire()   # 上锁
        temp = n  # 从进程中获取n
        time.sleep(0.01)  # 每个线程调用CPU的时间是很短的,制造时间片轮转的效果
        n = temp -1  # 得到结果,再将修改过的数据返回给进程
        lock.release()   # 释放
    
    n = 1000
    lock = Lock()  # 实例化锁对象
    thread_l = []
    for i in range(100):
        t = Thread(target=func)
        t.start()
        thread_l.append(t)
    
    for j in thread_l:
        j.join()
    
    print(n)
    结果是:900

    互斥锁Lock后使数据一致,具有安全性,但是也带来了新的问题,因为锁的缘故,每一个线程都是串行的拿到锁,在释放;整个程序运行变成串行,效率降低。

    示例4:递归锁Rlock
    Lock在同一线程中只能被acquire一次,下一次的acquire必须等待release之后才可以;而RLock允许在同一线程中被多次acquire,但是acquire和release必须是成对存在。

    from threading import Lock
    from threading import RLock
    
    lock = Lock() # 实例化出互斥锁对象
    lock.acquire()
    lock.acquire() # 在第二次acquire时,程序阻塞等待release之后拿到锁
    print("死锁")
    lock.release()
    lock.release()
    
    lock1 = RLock() # 实例化出递归锁对象
    lock1.acquire()
    lock1.acquire() # 可被多次acquire
    print("running")
    lock1.release()
    lock1.release() # acquire与release成对出现

    在多线程并发的情况下,同一个线程中,如果出现多次acquire,就可能发生死锁现象,使用RLock就不会出现死锁问题

    3、Semaphore

    线程的信号量与进程的信号量使用基本一致;信号量可以允许指定数量的线程操作上锁的数据,即一把锁有多个钥匙。对与有信号量限制的程序来说,信号量有几个任务就开启几个线程,在加锁阶段会限制程序运行数量,并不影响其它代码的并发。
    示例

    import random
    import time
    from threading import Semaphore
    from threading import Thread
    
    def sing(i,sem):
        sem.acquire() # 加锁
        print('%s enter the ktv'%i)
        time.sleep(random.randint(1,10))
        print('%s leave the ktv'%i)
        sem.release() # 释放
    
    sem = Semaphore(4)
    for i in range(20):
        t = Thread(target=sing, args=(i, sem))
        t.start()
    

    4、Event

    事件:线程之间状态标记通信,使用方法与进程的基本一致
    主要方法:
    e = Event() # 实例化一个事件对象
    e.set() # 标记变为非阻塞
    e.wait() # 默认标记为阻塞,在等待的过程中,遇到非阻塞信号就继续执行
    e.clear() # 标记变为阻塞
    e.is_set() # 是否阻塞 True就是非阻塞,False是阻塞

    示例:连接数据库

    '''
    连接数据库
    每0.5秒连一次,连接三次
    用事件来标志数据库的连接情况
    如果连接成功,显示成功
    否则报错
    '''
    
    import time
    import random
    from threading import Thread
    from threading import Event
    
    # 模拟检查连接,检查连接成功使事件标志位为非阻塞
    def check_conn():
        time.sleep(random.randint(1,3))
        e.set()
    
    # 在还没检查成功前等待,接到非阻塞信号则连接数据库
    def conn_mysql():
        count = 1
        while not e.is_set():
            if count > 3:
                raise TimeoutError
    
            print("尝试第 %s 次连接" % count)
            count += 1
            e.wait(0.5)
        print("连接成功")
    
    e = Event()  # 实例化事件对象
    
    Thread(target=check_conn).start()
    Thread(target=conn_mysql).start()

    5、Timer

    定时器:定时开启一个线程,执行一个任务
    示例:

    from threading import Timer
    
    def func():
        print("hello")
    
    '''
    必须有两个参数
    第一个是时间,单位为秒
    第二个是要执行的函数
    '''
    Timer(1,func).start()

    6、Condition

    条件变量:条件包含递归锁RLock和事件Event中的wait()方法的功能。
    五个方法:
    acquire(): 递归锁
    release(): 释放锁
    wait(timeout): 等待通知,或者等到设定的超时时间;才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError异常。
    notify(n=1): 通知其他线程,传入的参数必须时int类型的,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
    notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程

    示例:

    from threading import Condition
    from threading import Thread
    
    def run(n):
        con.acquire()
        con.wait()
        print("run the thread: %s"%n)
        con.release()
    
    if __name__ == '__main__':
        con = Condition()
        for i in range(10):
            t = Thread(target=run,args=(i,))
            t.start()
    
        while True:
            msg = input(">>> ")
            if msg == 'q':
                break
            con.acquire() # 递归锁
            if msg == 'all':
                con.notify_all()     # 放行所有线程
            else:
                con.notify(int(msg)) # 传递信号,放行线程,参数是int类型的
            con.release() # 释放锁

    7、Queue模块

    queue模块就是线程的队列,它是数据安全的。
    主要方法:
    q.put(1) # 将传入的数据放入队列
    q.get() # 根据对象所属类的不同,取出队列中的数据
    q.join() # 等队列为空时,在执行别的操作
    q.qsize() # 返回队列的大小,不一定准确
    q.empty() # 队列为空时,返回True,否则返回False,不一定准确
    q.full() # 队列满时,返回True,否则返回False,不一定准确

    Queue类的使用:先进先出

    import queue
    
    q = queue.Queue()  # 实例化一个队列对象,可给出队列长度,先进先出
    q.put(1)           # 将传入的数据放入队列
    q.put(2)
    print(q.get())     # 先进先出,取出队列的第一个值

    LifoQueue类的主要方法:后进先出

    import queue
    
    lfq = queue.LifoQueue() # 实例化一个对象,可给出长度,后进先出
    lfq.put(1)
    lfq.put(2)
    print(lfq.get()) #后进先出,取出2
    
    PriorityQueue类的主要方法:优先级
    import queue
    
    pq = queue.PriorityQueue()  # 实例化一个队列对象,优先级队列,优先级值越小越优先
    pq.put((10,'a'))
    pq.put((5,'b'))
    pq.put((1,'c'))
    pq.put((15,'d'))
    
    for i in range(4):
        print(pq.get())
    结果:
    (1, 'c')
    (5, 'b')
    (10, 'a')
    (15, 'd')

    8、concurrent模块之futures

    concurrent是用来操作池的模块,这个模块可创建进程池和线程池,其使用方法完全一致,统一了入口和方法,使用池更便捷,且python内置,导入便可使用。

    主要方法:
    submit(FUNC,ARGS):创建线程对象和激活线程,FUNC是要执行的任务,ARGS是参数
    shutdown():shutdown方法封装了close和join方法,调用该方法时,不能在忘池中添加任务,且要等待池中任务执行结束
    result():result方法取线程执行的函数返回值
    map(FUNC,iter):map方法异步执行,需传入要执行的任务FUNC,以及一个可迭代对象iter,map方法无返回值
    add_done_callback(call):回调函数

    示例1:

    import time
    import random
    from concurrent import futures
    
    def func(n):
        time.sleep(random.randint(1,3))
        print(n)
        return n*"*"
    
    thread_pool = futures.ThreadPoolExecutor(20)  # 实例化一个线程池对象,一般开启CPU核数*5
    f_l = []
    for i in range(10):
        t = thread_pool.submit(func,i)    # submit方法合并了创建线程对象和激活线程的功能
        f_l.append(t)
    thread_pool.shutdown()    # shutdown方法封装了close和join方法,调用该方法时,不能在忘池中添加任务,且要等待池中任务执行结束
    
    for j in f_l:
        print(j.result())   # result方法取线程执行的函数返回值

    示例2:回调

    from concurrent import futures
    
    def func(n):
        print(n)
        return n*"*"
    
    def call(args):
        print(args.result())
    
    thread_pool = futures.ThreadPoolExecutor(20)
    thread_pool.submit(func,1).add_done_callback(call)

关键字