python3异步编程-实例

发布时间:2019-09-28 08:38:01编辑:auto阅读(1894)

    Python3 异步编程实例篇

    本篇主要内容:

    • 启动一个线程
    • 启动多线程
    • 获取线程名字
    • 让线程按顺序执行
    • 给线程加上日志
    • 线程类的实现
    • 线程锁
    • 多线程使用全局变量下锁的重要性
    • 锁嵌套的问题
    • 使用队列来存储线程数据
    • 取得线程中的线果
    • 多线程与非多线程性能对比
    • 多线程与非多线程I/O操作
    • 线程池的使用

    一、用threading启动一个线程

    示例代码:

    #!/usr/bin/python3
    
    import time
    from threading import Thread
    
    def countdown(n):
        while n > 0:
            print('倒数开始:', n)
            n -= 1
            time.sleep(1)
    
    def main():
        t = Thread(target=countdown, args=(5, ))   # target=函数名,args=() 给target=的函数传参数,参数以元组形式传,如果只有一个,要加逗号
        t.start()                                  # 启动线程
    
    if __name__ == '__main__':
        main()

    输出:
    倒数开始: 5
    倒数开始: 4
    倒数开始: 3
    倒数开始: 2
    倒数开始: 1

    二、启动多个线程

    示例代码:

    #!/usr/bin/python3
    
    import time
    from threading import Thread
    
    def countdown(n, number):
        while n > 0:
            print(f'第{number}个线程,倒数开始:', n)
            n -= 1
            time.sleep(1)
    
    def main():
        for i in range(3):
            t = Thread(target=countdown, args=(5, i+1))
            t.start()
    
    if __name__ == '__main__':
        main()

    输出:
    第1个线程,倒数开始: 5
    第2个线程,倒数开始: 5
    第3个线程,倒数开始: 5
    第3个线程,倒数开始: 4
    第2个线程,倒数开始: 4
    第1个线程,倒数开始: 4
    第3个线程,倒数开始: 3
    第2个线程,倒数开始: 3
    第1个线程,倒数开始: 3
    第1个线程,倒数开始: 2
    第2个线程,倒数开始: 2
    第3个线程,倒数开始: 2
    第1个线程,倒数开始: 1
    第2个线程,倒数开始: 1
    第3个线程,倒数开始: 1

    三、获取线程名字

    示例代码:

    #!/usr/bin/python3
    import time
    import threading
    from threading import Thread
    
    def countdown(n, number):
        while n > 0:
            print(f'第{number}个线程,倒数开始:', threading.current_thread().name, n)
            n -= 1
            time.sleep(1)
    
    def main():
        for i in range(3):
            t = Thread(target=countdown, args=(5, i+1))
            t.start()
    
    if __name__ == '__main__':
        main()

    输出:
    第1个线程,倒数开始: Thread-1 5
    第2个线程,倒数开始: Thread-2 5
    第3个线程,倒数开始: Thread-3 5
    第1个线程,倒数开始: Thread-1 4
    第2个线程,倒数开始: Thread-2 4
    第3个线程,倒数开始: Thread-3 4
    第1个线程,倒数开始: Thread-1 3
    第2个线程,倒数开始: Thread-2 3
    第3个线程,倒数开始: Thread-3 3
    第3个线程,倒数开始: Thread-3 2
    第2个线程,倒数开始: Thread-2 2
    第1个线程,倒数开始: Thread-1 2
    第3个线程,倒数开始: Thread-3 1
    第2个线程,倒数开始: Thread-2 1
    第1个线程,倒数开始: Thread-1 1

    四、让线程按顺序执行

    示例代码:

    #!/usr/bin/python3
    # 当前线程结束后再执行下一个
    import time
    import threading
    from threading import Thread
    
    def countdown(n, number):
        while n > 0:
            print(f'第{number}个线程,倒数开始:', threading.current_thread().name, n)
            n -= 1
            time.sleep(1)
    
    def main():
        for i in range(3):
            t = Thread(target=countdown, args=(3, i+1))
            t.start()
            t.join()    # waite until the thread terminates 前面线程结束之后再执行下一个
    
    if __name__ == '__main__':
        main()

    输出:
    第1个线程,倒数开始: Thread-13 3
    第1个线程,倒数开始: Thread-13 2
    第1个线程,倒数开始: Thread-13 1
    第2个线程,倒数开始: Thread-14 3
    第2个线程,倒数开始: Thread-14 2
    第2个线程,倒数开始: Thread-14 1
    第3个线程,倒数开始: Thread-15 3
    第3个线程,倒数开始: Thread-15 2
    第3个线程,倒数开始: Thread-15 1

    五、给线程加上日志

    示例代码:

    #!/usr/bin/python3
    import time
    import logging
    import threading
    from threading import Thread
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(threadName)-10s: %(message)s',
    )   # format 中的 threadName 可以捕获到线程的名字,所以下边logging.debug()中不需要传入线程名
    
    def countdown(n, number):
        while n > 0:
            # print(f'{threading.current_thread().name}-倒数开始:', n)
            logging.debug(f'倒数开始:{n}')
            n -= 1
            time.sleep(1)
    
    def main():
        thread_list = []
        logging.debug('start.....')
        for i in range(3):
            t = Thread(target=countdown, args=(3, i+1))
            t.start()
            thread_list.append(t)  # 把线程放到列表中
    
        for i in thread_list:   # 终止列表中的线程
            i.join()
        logging.debug('end.....')
    if __name__ == '__main__':
        main()

    输出:
    MainThread: start.....
    Thread-1 : 倒数开始:3
    Thread-2 : 倒数开始:3
    Thread-3 : 倒数开始:3
    Thread-1 : 倒数开始:2
    Thread-3 : 倒数开始:2
    Thread-2 : 倒数开始:2
    Thread-2 : 倒数开始:1
    Thread-3 : 倒数开始:1
    Thread-1 : 倒数开始:1
    MainThread: end.....

    六、线程类的实现

    示例代码:

    #!/usr/bin/python3
    # 继承 Thread类,定义一个新类,初始化对象
    import time
    import logging
    import threading
    from threading import Thread
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(threadName)-10s: %(message)s',
    )   # format 中的 threadName 可以捕获到线程的名字,所以下边logging.debug()中不需要传入线程名
    
    def countdown(n):
        while n > 0:
            # print(f'{threading.current_thread().name}-倒数开始:', n)
            logging.debug(f'倒数开始:{n}')
            n -= 1
            time.sleep(1)
    
    class MyThread(Thread):
        def __init__(self, name, count):
            Thread.__init__(self)
            self.name = name
            self.count = count
    
        def run(self):
            countdown(self.count)
    
    def main():
        thread_list = []
        logging.debug('start.....')
        for i in range(3):
            t = MyThread(f'thread-{i+1}', 3)
            t.start()
            thread_list.append(t)  # 把线程放到列表中
        for i in thread_list:   # 终止列表中的线程
            i.join()
        logging.debug('end.....')
    if __name__ == '__main__':
        main()

    输出:
    MainThread: start.....
    thread-1 : 倒数开始:3
    thread-2 : 倒数开始:3
    thread-3 : 倒数开始:3
    thread-3 : 倒数开始:2
    thread-1 : 倒数开始:2
    thread-2 : 倒数开始:2
    thread-3 : 倒数开始:1
    thread-2 : 倒数开始:1
    thread-1 : 倒数开始:1
    MainThread: end.....

    七、线程锁

    示例代码:

     #!/usr/bin/python3
     # 给线程加锁
    
    import time
    import logging
    import threading
    from threading import Thread
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(threadName)-10s: %(message)s',
    )   # format 中的 threadName 可以捕获到线程的名字,所以下边logging.debug()中不需要传入线程名
    
    def countdown(n):
        while n > 0:
            logging.debug(f'倒数开始:{n}')
            n -= 1
            time.sleep(1)
    
    class MyThread(Thread):
        def __init__(self, name, count):
            Thread.__init__(self)
            self.name = name
            self.count = count
    
        def run(self):
            try:
                lock.acquire()     # 获取锁
                logging.debug('lock....')
                countdown(self.count)
            finally:
                lock.release()
                logging.debug('open again')
    
    lock = threading.Lock()   # 新建一个锁
    
    def main():
        thread_list = []
        logging.debug('start.....')
        for i in range(3):
            t = MyThread(f'thread-{i+1}', 3)
            t.start()
            thread_list.append(t)  # 把线程放到列表中
    
        for i in thread_list:   # 终止列表中的线程
            i.join()
        logging.debug('end.....')
    if __name__ == '__main__':
        main()

    输出:
    MainThread: start.....
    thread-1 : lock....
    thread-1 : 倒数开始:3
    thread-1 : 倒数开始:2
    thread-1 : 倒数开始:1
    thread-1 : open again
    thread-2 : lock....
    thread-2 : 倒数开始:3
    thread-2 : 倒数开始:2
    thread-2 : 倒数开始:1
    thread-2 : open again
    thread-3 : lock....
    thread-3 : 倒数开始:3
    thread-3 : 倒数开始:2
    thread-3 : 倒数开始:1
    thread-3 : open again
    MainThread: end.....

    八、多线程使用全局变量下锁的重要性

    示例代码:

    #!/usr/bin/python3
    # 防止多个线程同时操作同一个变量
    # 锁,多线程修改全局变量
    # 执行时,后边加线程个数;例如python xxx.py 5
    import time
    import logging
    import threading
    import random
    import sys
    from threading import Thread
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(threadName)-10s: %(message)s',
    )   # format 中的 threadName 可以捕获到线程的名字,所以下边logging.debug()中不需要传入线程名
    
    def countdown(n):
        while n > 0:
            # print(f'{threading.current_thread().name}-倒数开始:', n)
            logging.debug(f'倒数开始:{n}')
            n -= 1
            time.sleep(1)
    
    class MyThread(Thread):
        def __init__(self, name, count):
            Thread.__init__(self)
            self.name = name
            self.count = count
    
        def run(self):
            try:
                lock.acquire()     # 获取锁
                logging.debug('lock....')
                countdown(self.count)
            finally:
                lock.release()
                logging.debug('open again')
    
    lock = threading.Lock()   # 新建一个锁
    TOTAL = 0
    
    def add_plus():
        global TOTAL
        with lock:        # 锁的新用法,用完之后可以自动关闭
            logging.debug(f'before add:{TOTAL}')
            wait = random.randint(1, 3)
            time.sleep(wait)
            print(f'执行了{wait}s之后。。。')
            TOTAL += 1
            logging.debug(f'after add:{TOTAL}')
    
    def main():
        thread_list = []
        logging.debug('start.....')
        for i in range(int(sys.argv[1])):
            t = Thread(target=add_plus)
            t.start()
            thread_list.append(t)  # 把线程放到列表中
    
        for i in thread_list:   # 终止列表中的线程
            i.join()
        logging.debug('end.....')
    if __name__ == '__main__':
        main()

    输出:
    MainThread: start.....
    Thread-1 : before add:0
    执行了3s之后。。。
    Thread-1 : after add:1
    Thread-2 : before add:1
    执行了2s之后。。。
    Thread-2 : after add:2
    Thread-3 : before add:2
    执行了1s之后。。。
    Thread-3 : after add:3
    Thread-4 : before add:3
    执行了2s之后。。。
    Thread-4 : after add:4
    Thread-5 : before add:4
    执行了3s之后。。。
    Thread-5 : after add:5
    MainThread: end.....

    九、锁嵌套的问题

    常规情况下出现锁嵌套,程序会卡住
    这里用RLock
    示例代码:

    #!/usr/bin/python3
    # 出现锁嵌套时,要用threading.RLock建立锁,否则程序会出问题
    import time
    import logging
    import threading
    import random
    import sys
    from threading import Thread
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(threadName)-10s: %(message)s',
    )   # format 中的 threadName 可以捕获到线程的名字,所以下边logging.debug()中不需要传入线程名
    
    def countdown(n):
        while n > 0:
            logging.debug(f'倒数开始:{n}')
            n -= 1
            time.sleep(1)
    
    class MyThread(Thread):
        def __init__(self, name, count):
            Thread.__init__(self)
            self.name = name
            self.count = count
    
        def run(self):
            try:
                lock.acquire()     # 获取锁
                logging.debug('lock....')
                countdown(self.count)
            finally:
                lock.release()
                logging.debug('open again')
    
    # lock = threading.Lock()   # 新建一个锁
    lock = threading.RLock()   # 可以用于锁嵌套
    TOTAL = 0
    
    def add_plus_3():
        global TOTAL
        with lock:
            TOTAL += 3
    
    def add_plus():
        global TOTAL
        with lock:        # 锁的新用法,用完之后可以自动关闭
            logging.debug(f'before add:{TOTAL}')
            wait = random.randint(1, 3)
            time.sleep(wait)
            print(f'执行了{wait}s之后。。。')
            TOTAL += 1
            logging.debug(f'after add:{TOTAL}')
            add_plus_3()
    
    def main():
        thread_list = []
        logging.debug('start.....')
        for i in range(int(sys.argv[1])):
            t = Thread(target=add_plus)
            t.start()
            thread_list.append(t)  # 把线程放到列表中
        for i in thread_list:   # 终止列表中的线程
            i.join()
        logging.debug('end.....')
    if __name__ == '__main__':
        main()

    输出:
    MainThread: start.....
    Thread-1 : before add:0
    执行了3s之后。。。
    Thread-1 : after add:1
    Thread-2 : before add:4
    执行了3s之后。。。
    Thread-2 : after add:5
    Thread-3 : before add:8
    执行了3s之后。。。
    Thread-3 : after add:9
    Thread-4 : before add:12
    执行了2s之后。。。
    Thread-4 : after add:13
    Thread-5 : before add:16
    执行了3s之后。。。
    Thread-5 : after add:17
    MainThread: end.....

    十、使用队列来存储线程数据

    队列queue,先时先出的读写规则
    示例代码:

    #!/usr/bin/python3
    from queue import Queue
    q = Queue()
    q.put(1)
    q.put(2)
    q.put(3)
    q.get(block=False)  # 加上block=False防止取完所有之后卡住
    q.get(block=False)
    q.get(block=False)

    输出:
    1
    2
    3

    十一、取得线程中的线果

    示例代码:

    # 取得线程中的结果
    from queue import Queue    # 用队列来保存线程的结果,先进先出
    from threading import Thread
    
    q_result = Queue()  # 新建一个队列对象
    str_list = ['1', '3', '6', '8']
    
    def str_to_int(arg, queue):
        result = int(arg)         # 将列表中的字符串转换成数字
        queue.put({arg: result})
    
    def main():
        thread_list = []
        for s in str_list:
            t = Thread(target=str_to_int, args=(s, q_result))
            t.start()
            thread_list.append(t)
    
        for i in thread_list:
            i.join()
    
        return [q_result.get() for _ in range(len(str_list))]    # 列表生成式,等同于上边的for循环
    
    if __name__ == '__main__':
        print(main())   # 打印main()中的return 内容

    输出:
    [{'1': 1}, {'3': 3}, {'6': 6}, {'8': 8}]

    十二、多线程与非多线程性能对比

    测试多线程与非多线程性能
    示例代码:

    #!/usr/bin/python3
    # 多线程与非多线程时间对比
    # 测试多线程是否适合I/O密集型,用时间加减来检测多线程与非多线程
    import time
    from queue import Queue    # 用队列来保存线程的结果,先进先出
    from threading import Thread
    
    q_result = Queue()  # 新建一个队列对象
    str_list = ['1', '3', '6', '8']
    
    def str_to_int(arg, queue):
        result = int(arg)
        queue.put({arg: result})
    
    def with_thread():
        thread_list = []
        start_time = time.time()
        for s in str_list:
            t = Thread(target=str_to_int, args=(s, q_result))
            t.start()
            thread_list.append(t)
    
        for i in thread_list:
            i.join()
        print('with thread:', (time.time() - start_time) * 1000)   # 显示毫秒
        return [q_result.get() for _ in range(len(str_list))]
    
    def no_thread():
        start_time = time.time()
        q = Queue()
        for s in str_list:
            str_to_int(s, q)
    
        print('no thread:', (time.time() - start_time) * 1000)   #  显示毫秒
        return [q.get() for _ in range(len(str_list))]
    
    def main():
        no_thread()
        with_thread()
    
    if __name__ == '__main__':
        main() 

    输出:
    no thread: 0.0
    with thread: 1.996755599975586

    十三、多线程与非多线程I/O操作

    对比I/O操作时,多线程与非多线程的性能
    示例代码:

    # 用I/O操作来检测多线程与非多线程处理任务所花的时间
    # 测试多线程是否适合I/O密集型,用时间加减来检测多线程与非多线程
    #!/usr/bin/python3
    import time
    from queue import Queue    # 用队列来保存线程的结果,先进先出
    from threading import Thread
    import requests
    
    q_result = Queue()  # 新建一个队列对象
    urls = [
        "http://www.baidu.com",
        "http://www.qq.com",
        "http://www.360.com",
        "http://www.baidu.com",
        "http://www.qq.com",
        "http://www.360.com",
        "http://www.baidu.com",
        "http://www.qq.com",
        "http://www.360.com",
    ]
    
    def get_page(url, queue):
        result = requests.get(url).content   # 获取页面内容
        queue.put(result[:10])   # 保存前10个字符
        with open('utl.txt', 'ab') as f:
            f.write(result[:100])
    
    def with_thread():
        thread_list = []
        start_time = time.time()
        for s in urls:
            t = Thread(target=get_page, args=(s, q_result))
            t.start()
            thread_list.append(t)
    
        for i in thread_list:
            i.join()
        print('with thread:', (time.time() - start_time) * 1000)   # 显示毫秒
        return [q_result.get() for _ in range(len(urls))]
    
    def no_thread():
        start_time = time.time()
        q = Queue()
        for s in urls:
            get_page(s, q)
    
        print('no thread:', (time.time() - start_time) * 1000)   # 显示毫秒
        return [q.get() for _ in range(len(urls))]
    
    def main():
        print(no_thread())  # 打印return的内容
        print(with_thread())
    
    if __name__ == '__main__':
        main()

    输出:
    no thread: 1418.3428287506104
    [b'<!DOCTYPE ', b'<!DOCTYPE ', b'<!Doctype ', b'<!DOCTYPE ', b'<!DOCTYPE ', b'<!Doctype ', b'<!DOCTYPE ', b'<!DOCTYPE ', b'<!Doctype ']
    with thread: 346.4798927307129
    [b'<!DOCTYPE ', b'<!DOCTYPE ', b'<!DOCTYPE ', b'<!DOCTYPE ', b'<!DOCTYPE ', b'<!DOCTYPE ', b'<!Doctype ', b'<!Doctype ', b'<!Doctype ']PS D:\python\project>

    十四、线程池的使用

    示例代码:

    #!/usr/bin/python3
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import logging
    import requests
    
    urls = [
        "http://www.baidu.com",
        "http://www.qq.com",
        "http://www.360.com",
        "http://www.baidu.com",
        "http://www.qq.com",
        "http://www.360.com",
        "http://www.baidu.com",
        "http://www.qq.com",
        "http://www.360.com",
    ]
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(threadName)-10s: %(message)s',
    )
    
    def download(url):
        r = requests.get(url)
        return url, r.status_code
    
    def main():
        with ThreadPoolExecutor(5, thread_name_prefix='yhyang') as executor:
            # 创建一个5个线程的池
            # 方法1:
            # futures = [executor.submit(download, url) for url in urls]
            # submit 返回一个future对象,futures值为一个包含多个future对象的列表
            # for future in as_completed(futures):
            #     # as_completed(futures) 得到一个可迭代的对象
            #     try:
            #         print(future.result())
            #     except Exception as e:
            #         print(e)
    
            # 方法2:map() 是对方法1中submit()的一个封装,简化了使用方法
            futures_result = executor.map(download, urls, timeout=30)
            for future in futures_result:
                try:
                    print(future)
                except Exception as e:
                    print(e)
    
    if __name__ == '__main__':
        main()

    输出:
    yhyang_0 : Starting new HTTP connection (1): www.baidu.com
    yhyang_1 : Starting new HTTP connection (1): www.qq.com
    yhyang_2 : Starting new HTTP connection (1): www.360.com
    yhyang_3 : Starting new HTTP connection (1): www.baidu.com
    yhyang_4 : Starting new HTTP connection (1): www.qq.comyhyang_0 : http://www.baidu.com:80 "GET / HTTP/1.1" 200 None
    ('http://www.baidu.com', 200)
    yhyang_0 : Starting new HTTP connection (1): www.360.com
    yhyang_1 : http://www.qq.com:80 "GET / HTTP/1.1" 200 None
    yhyang_3 : http://www.baidu.com:80 "GET / HTTP/1.1" 200 Noneyhyang_3 : Starting new HTTP connection (1): www.baidu.com
    yhyang_4 : http://www.qq.com:80 "GET / HTTP/1.1" 200 None
    ('http://www.qq.com', 200)
    yhyang_1 : Starting new HTTP connection (1): www.qq.com
    yhyang_3 : http://www.baidu.com:80 "GET / HTTP/1.1" 200 None
    yhyang_2 : http://www.360.com:80 "GET / HTTP/1.1" 301 178
    yhyang_3 : Starting new HTTP connection (1): www.360.com
    yhyang_2 : Starting new HTTPS connection (1): www.360.cn
    yhyang_0 : http://www.360.com:80 "GET / HTTP/1.1" 301 178
    yhyang_0 : Starting new HTTPS connection (1): www.360.cn
    yhyang_1 : http://www.qq.com:80 "GET / HTTP/1.1" 200 None
    yhyang_3 : http://www.360.com:80 "GET / HTTP/1.1" 301 178
    yhyang_3 : Starting new HTTPS connection (1): www.360.cn
    yhyang_0 : https://www.360.cn:443 "GET / HTTP/1.1" 200 None
    yhyang_3 : https://www.360.cn:443 "GET / HTTP/1.1" 200 None
    yhyang_2 : https://www.360.cn:443 "GET / HTTP/1.1" 200 None
    ('http://www.360.com', 200)
    ('http://www.baidu.com', 200)
    ('http://www.qq.com', 200)
    ('http://www.360.com', 200)
    ('http://www.baidu.com', 200)
    ('http://www.qq.com', 200)
    ('http://www.360.com', 200)

关键字

上一篇: python3的实例方法

下一篇: 源码安装 python3