Python数据抓取——多线程,异步

发布时间:2019-08-13 07:36:36编辑:auto阅读(1173)

    本文主要是为了加快数据抓取任务,考虑使用多进程、多线程、异步原理,相关概念可以参考
    https://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/0013868322563729e03f6905ea94f0195528e3647887415000

    操作系统可以同时运行多个任务。首先,考虑单核CPU是如何执行多任务的:操作系统轮流让各个任务交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒……这样反复执行下去。表面上看,每个任务都是交替执行的,但是,由于CPU的执行速度非常快,给人的感觉就像所有任务都在同时执行一样。真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行

    对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。由于每个进程至少要干一件事,所以一个进程至少有一个线程多线程的执行方式和多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样,真正能同时执行多线程需要多核CPU才可能实现

    我们前面编写的所有的Python程序,都是执行单任务的进程,也就是只有一个线程。如果要同时执行多个任务有3种方案:一种是启动多个进程,每个进程只开一个线程,但多个进程可以一块执行多个任务。还有一种方法是启动一个进程,在一个进程内启动多个线程,多个线程也可以一块执行多个任务。第三种方法,就是启动多个进程,每个进程再启动多个线程,这样同时执行的任务就更多了,这种模型很复杂,实际很少采用。多任务的实现有3种方式:多进程模式;多线程模式;多进程+多线程模式。同时执行多个任务通常各个任务之间需要相互通信和协调,有时,任务1必须暂停等待任务2完成后才能继续执行,有时,任务3和任务4又不能同时执行,所以,多进程和多线程的程序的复杂度要远远高于我们前面写的单进程单线程的程序。因为复杂度高,调试困难,所以,不是迫不得已,我们也不想编写多任务。但是,有很多时候,没有多任务还真不行。想想在电脑上看电影,就必须由一个线程播放视频,另一个线程播放音频,否则,单线程实现的话就只能先把视频播放完再播放音频,或者先把音频播放完再播放视频,这显然是不行的。

    Python既支持多进程,又支持多线程。多任务可以由多进程完成,也可以由一个进程内的多线程完成。进程是由若干线程组成的,一个进程至少有一个线程。由于线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Python也不例外,并且,Python的线程是真正的Posix Thread,而不是模拟出来的线程。Python的标准库提供了两个模块:thread和threading,thread是低级模块,threading是高级模块,对thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行。

    import requests
    import threading
    
    def get_stock(code):
        url = 'http://hq.sinajs.cn/list=' + code
        resp = requests.get(url)
        print('%s\n' % resp.text)
    
    #多线程异步,加速抓取
    #根据有几个股票代码,就创建几个线程
    codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
    threads = [threading.Thread(target=get_stock, args=(code, )) for code in codes]
    #Thread创建线程实例
    '''
    threads=[ ]
    for code in codes:
        thread=threading.Thread(target=get_stock,args=(code, ))
        threads.append(thread)
    '''
    for t in threads:
        t.start()  #启动一个线程
    for t in threads:
        t.join()  #等待每个线程执行结束

    这里写图片描述

    多任务用线程池自动调度

    import requests
    import threadpool  #线程池
    
    def get_stock(code):
        url = 'http://hq.sinajs.cn/list=' + code
        resp = requests.get(url)
        print('%s\n' % resp.text)
    
    codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
    #codes里任务很多,比如几百个,让pool自己去调度
    pool = threadpool.ThreadPool(2) #线程池设置,最多同时跑两个线程
    tasks = threadpool.makeRequests(get_stock, codes)
    #makeRequests构造线程task请求,第一个参数是线程函数,第二个是参数数组
    [pool.putRequest(task) for task in tasks]
    #列表推导式,putRequest向线程池里加task,让pool自己去调度task
    pool.wait() #等所有任务结束

    这里写图片描述

    异步
    交出当前CPU的控制权,最大化利用当前单个CPU的效率

    import aiohttp #表示http请求是异步方式去请求的
    import asyncio #当异步请求返回时,通知异步操作完成
    
    #异步可以参考grequests库的使用:https://github.com/kennethreitz/grequests
    async def get_stock(code):
    #关键字async表示请求是异步的
        url = 'http://hq.sinajs.cn/list=' + code
        resp = await aiohttp.request('GET', url) # yield
        #await表示任务等待时,不占用CPU资源,通知请求返回
        body = await resp.read()
        #表示从网络上把请求的东西都读回来
        text = body.decode('gb2312') #对读回来的原始字节解码
        print(text)
        resp.close()
    
    codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
    tasks = [get_stock(code) for code in codes]
    #由于是异步请求,这里get_stock(code)并不会被马上执行,只是占用了一个位置
    
    loop = asyncio.get_event_loop()  #loop的作用是——做完任务,事件通知
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    
    #tasks生成一组并发的异步任务,loop表示异步作用完成后等待通知

关键字