Python自动化开发学习-爬虫3(性能

发布时间:2019-09-20 07:27:16编辑:auto阅读(1204)

    爬取多个网页

    讲师的博客:https://www.cnblogs.com/wupeiqi/p/6229292.html
    在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢。
    比如找10个国外的资源爬取,慢的效果就很明显。

    串行的方式

    直接用一个for循环,把所有的请求串起来一次执行。这样的效率会很低:

    import requests
    from bs4 import BeautifulSoup
    
    url_list = [
        'https://github.com/explore',
        'https://www.djangoproject.com/',
        'http://www.python-requests.org/en/master/',
        'https://jquery.com/',
        'https://getbootstrap.com/',
        'https://www.solarwinds.com/',
        'https://www.zabbix.com/',
        'http://open-falcon.org/',
        'https://www.python.org/',
        'http://www.jetbrains.com/',
    ]
    
    if __name__ == '__main__':
        for url in url_list:
            r = requests.get(url)
            r.encoding = 'utf-8'
            soup = BeautifulSoup(r.text, features='html.parser')
            title = soup.find('title')
            print(title)

    多线程(多进程)

    下面是使用线程池(进程池)实现的方式。这里多进程和多线程的效果一样,但是线程更省资源。

    import requests
    from bs4 import BeautifulSoup
    from concurrent.futures import ThreadPoolExecutor
    # from concurrent.futures import ProcessPoolExecutor  # 进程池
    
    url_list = [
        'https://github.com/explore',
        # 省略多个url
        'http://www.jetbrains.com/',
    ]
    
    def fetch_request(url):
        r = requests.get(url)
        r.encoding = 'utf-8'
        soup = BeautifulSoup(r.text, features='html.parser')
        title = soup.find('title')
        print(title)
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(10)
            # pool = ProcessPoolExecutor(10)  # 进程池
        for url in url_list:
            pool.submit(fetch_request, url)
        pool.shutdown(True)

    多线程 + 回调函数

    上面的例子用到的模块,还支持使用回调函数,把代码稍稍改一下:

    import requests
    from bs4 import BeautifulSoup
    from concurrent.futures import ProcessPoolExecutor
    
    url_list = [
        'https://github.com/explore',
        # 省略多个url
        'http://www.jetbrains.com/',
    ]
    
    def fetch_request(url):
        response = requests.get(url)
        response.encoding = 'utf-8'
        soup = BeautifulSoup(response.text, features='html.parser')
        title = soup.find('title')
        return str(title)  # 这里返回的,就是下面回调函数的入参。不转str会报错
    
    def callback(result):
        print(result.result())
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(10)
        for url in url_list:
            v = pool.submit(fetch_request, url)
            v.add_done_callback(callback)
        pool.shutdown(True)

    多进程和多线程的回调函数用法也是一样的。
    这里简单的需求,是不需要用到回调函数。不过作为线程池的一个用法,多一个示例。

    异步IO

    多线程和多进程的缺点是在IO阻塞时会造成了线程和进程的浪费,所以异步IO是更好的方式。
    异步IO请求的本质则是非阻塞Socket + IO多路复用。这里只需要一个线程,而每一个请求则是一个协程
    下面就是各种Python内置以及第三方提供的异步IO请求模块。这些模块,使用简便,大大提高效率。

    asyncio 模块

    这个是内置模块
    先看下模块是怎么调用的。这里是python3.4版本的用法,到3.5版本有新的 async/await 关键字可以用。不过向下兼容,旧的装饰器的 asyncio/yield from 的用法还是可以使用的。
    用法示例:

    import asyncio
    
    @asyncio.coroutine
    def func(n):
        print('before func %s...' % n)
        yield from asyncio.sleep(3)
        print('end func %s...' % n)
    
    if __name__ == '__main__':
        tasks = []
        for i in range(5):
            tasks.append(func(i))
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()

    这里注意一下装饰器和被它装饰的函数。在tasks.append()里,添加的是函数的调用,添加的是func()而不是func,带括号的。所以一般情况下是要执行这个函数。当然这里给函数加了装饰器,就不会直接执行了,而是等到下面在执行的。

    通过TCP发送HTTP请求
    asyncio模块只提供了发送tcp的功能,无法直接发送http请求。不过在理解了Web服务的本质的基础上,http本质上还是tcp。http请求还是通过tcp发送字符串,只是字符串有特定的格式。字符串分为请求头和请求体,请求头和请求体之间使用 "/r/n/r/n" 分隔,而请求头和请求头之间使用 "/r/n" 分隔。下面就是一个基本的GET请求的格式:

    """
    GET /index HTTP/1.0\r\n
    HOST: 1.1.1.1
    \r\n\r\n
    """

    所以只要按上面的方式对字符串进行封装,然后通过tcp发送,这就是http了。下面这个就是用 asyncio 手动封装http报头的示例:

    import asyncio
    from bs4 import BeautifulSoup
    
    url_list = [
        ('www.python-requests.org', '/en/master/'),
        ('open-falcon.org', '/'),
        ('www.jetbrains.com', '/'),
        ('www.nga.cn', '/'),
        ('edu.51cto.com', '/'),
    ]
    
    @asyncio.coroutine
    def fetch_async(host, url):
        reader, writer = yield from asyncio.open_connection(host, 80)  # 建立tcp连接
        request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host)  # 这个是GET请求
        request_header_content = request_header_content.encode('utf-8')  # 最终发送的是bytes类型
        writer.write(request_header_content)  # 发出请求
        yield from writer.drain()
        text = yield from reader.read()  # 接收到的当然也是bytes类型
        text = text.decode('utf-8')
        soup = BeautifulSoup(text, features='html.parser')
        title = soup.find('title')
        print(title)
        writer.close()
    
    if __name__ == '__main__':
        tasks = []
        for host, url in url_list:
            tasks.append(fetch_async(host, url))
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()

    通过TCP发送HTTPS
    上面这样只能发http请求。https主要是2个差别,默认的端口号是443,还有就是需要ssl。好在 asyncio.open_connection 是提供支持ssl的,只需要加上ssl=True的参数(这个参数的默认是False,所以上面不用指定)。下面是支持https的版本:

    import asyncio
    from bs4 import BeautifulSoup
    
    url_list = [
        'https://github.com/explore',
        # 省略多个url
        'http://www.jetbrains.com/',
    ]
    
    @asyncio.coroutine
    def fetch_async(host, url='/', port=80, ssl=False):
        reader, writer = yield from asyncio.open_connection(host, port, ssl=ssl)  # 建立tcp连接
        request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host)  # 这个是GET请求
        request_header_content = request_header_content.encode('utf-8')  # 最终发送的是bytes类型
        writer.write(request_header_content)  # 发出请求
        yield from writer.drain()
        text = yield from reader.read()  # 接收到的当然也是bytes类型
        text = text.decode('utf-8')
        soup = BeautifulSoup(text, features='html.parser')
        title = soup.find('title')
        print(title)
        writer.close()
    
    if __name__ == '__main__':
        from urllib.parse import urlparse
        tasks = []
        for url in url_list:
            url_parse = urlparse(url)
            if url_parse.scheme == "https":
                tasks.append(fetch_async(url_parse.netloc, url_parse.path, 443, True))
            else:
                tasks.append(fetch_async(url_parse.netloc, url_parse.path))
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()

    asyncio + aiohttp

    讲师博客里的代码是版本的问题,运行不了会报错。因为从 python3.5 开始,引入了 async/await 。大概记录一下原因:

    在Python3.5以后,原生协程不能用于迭代,未被装饰的生成器不能yield from一个原生协程

    什么是原生协程?用async关键字定义的就是原生线程。asyncio是Python 3.4版本引入的标准库,是用装饰器的方式来定义协程的(上面的例子就是)。到了python3.5版本,引入了async关键字来定义协程,并且向下兼容,之前的装饰器的方法也能用。
    再来看一下aiohttp模块。粗略的看一下源码,旧版本(2.x及之前),用的是 asyncio/yield from 。3.x版本开始,都改用 async/await 了。旧版的 yield from 是不能调用新版的用async关键字定义的原生协程的,所以会报错。
    之前的例子用的是 asyncio/yield from ,但是这里的 aishttp 用的是 async/await ,所以无法再用 yield from 了。下面是用 async/await 的例子:

    import aiohttp
    import asyncio
    from bs4 import BeautifulSoup
    
    url_list = [
        'https://github.com/explore',
        # 省略多个url
        'http://www.jetbrains.com/',
    ]
    
    async def fetch_async(url):
        async with aiohttp.request('GET', url) as r:
            text = await r.text('utf-8')
            soup = BeautifulSoup(text, features='html.parser')
            title = soup.find('title')
            print(title)
    
    if __name__ == '__main__':
        tasks = []
        for url in url_list:
            tasks.append(fetch_async(url))
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()

    后面的例子还会继续用到 asyncio/yield from ,而且这个例子也不好找。
    不过 async/await 才是推荐的用法,好在改一下也不难,而且网上例子也多。

    asyncio + requests

    import asyncio
    import requests
    from bs4 import BeautifulSoup
    
    url_list = [
        'https://github.com/explore',
        # 省略多个url
        'http://www.jetbrains.com/',
    ]
    
    @asyncio.coroutine
    def fetch_async(func, *args):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(None, func, *args)
        response = yield from future
        response.encoding = 'utf-8'
        soup = BeautifulSoup(response.text, features='html.parser')
        title = soup.find('title')
        print(title)
    
    if __name__ == '__main__':
        tasks = []
        for url in url_list:
            tasks.append(fetch_async(requests.get, url))
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()

    gevent + requests

    from bs4 import BeautifulSoup
    import gevent
    from gevent import monkey
    monkey.patch_all()  # 必须放在requests模块导入前
    import requests
    
    url_list = [
        'https://github.com/explore',
            # 省略多个url
        'http://www.jetbrains.com/',
    ]
    
    def fetch_request(url):
        r = requests.get(url)
        r.encoding = 'utf-8'
        soup = BeautifulSoup(r.text, features='html.parser')
        title = soup.find('title')
        print(title)
    
    if __name__ == '__main__':
        g_list = []
        for url in url_list:
            g_list.append(gevent.spawn(fetch_request, url=url))
        gevent.joinall(g_list)

    grequests

    grequests 模块,就是 gevent + requests 。有人用代码又把这两个模块再封装了一层。就写个例子:

    import grequests
    from bs4 import BeautifulSoup
    
    url_list = [
        'https://github.com/explore',
        # 省略多个url
        'http://www.jetbrains.com/',
    ]
    
    def exception_handler(request, exception):
        print(request, exception)
        print("Request failed")
    
    def callback(r, *args, **kwargs):
        r.encoding = 'utf-8'
        soup = BeautifulSoup(r.text, features='html.parser')
        title = soup.find('title')
        print(title)
    
    if __name__ == '__main__':
        request_list = [grequests.get(url, timeout=10, callback=callback) for url in url_list]
        response_list = grequests.map(request_list, exception_handler=exception_handler, gtimeout=10)
        print(response_list)

    之前用for循环写列表太Low了,这里用列表生成式的写法。grequests.get里的timeout是单个任务的超时时间,grequests.map里的gtimeout则是整体任务的超时时间。
    exception_handler方法是请求有异常时的处理方法。如果单个任务超时,就会抛出异常,如果任务整体超时,则还没有结束的任务返回None,没有异常。

    Twisted

    直接安装模块会报错,去官网翻了一下 http://twistedmatrix.com 。找到了pip的安装方法

    The recommended way is to run pip install Twisted, preferably inside a virtualenv.
    On Linux, and BSDs, you will need a C compiler (such as GCC).
    On macOS you will need to run xcode-select --install.
    If you are installing on Windows, pip install Twisted[windows_platform] will install the Windows-specific requirements.

    所以应该用下面的命令,安装windwos用的版本:

    pip install -i https://mirrors.163.com/pypi/simple  Twisted[windows_platform]

    但是还是不行,错误信息如下:

        error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": http://landinghub.visualstudio.com/visual-cpp-build-tools

    Twisted 模块安装
    最终在网上找到解决方法,就是本地安装。先去下载这个模块对应版本的whl文件:
    https://www.lfd.uci.edu/~gohlke/pythonlibs/#twisted
    然后用 pip 执行本地安装:

    pip install E:/Downloads/Twisted-18.9.0-cp36-cp36m-win_amd64.whl

    发GET请求

    from bs4 import BeautifulSoup
    from twisted.web.client import getPage, defer
    from twisted.internet import reactor
    
    url_list = [
        'https://github.com/explore',
        # 略多个url
        'http://www.jetbrains.com/',
    ]
    
    def all_done(arg):
        reactor.stop()
    
    def callback(contents):
        soup = BeautifulSoup(contents, features='html.parser')
        title = soup.find('title')
        print(title)
    
    if __name__ == '__main__':
        deferred_list = []
        for url in url_list:
            deferred = getPage(url.encode('utf-8'))  # 发请求
            deferred.addCallback(callback)  # 请求返回后的回调函数
            deferred_list.append(deferred)  # 把所有的请求加到列表里,后面要检测
        dlist = defer.DeferredList(deferred_list)  # 检测所有的请求
        dlist.addBoth(all_done)  # 检测到所有请求都执行完,执行的方法
        reactor.run()  # 开启一个死循环,不停的执行,all_done函数里的stop()方法会停止这个循环

    发POST请求

    from twisted.internet import reactor
    from twisted.web.client import getPage
    import urllib.parse
    
    def one_done(arg):
        print(arg)
        print(arg.decode())
        reactor.stop()
    
    post_data = urllib.parse.urlencode({'check_data': 'TEST'})
    post_data = post_data.encode('utf-8')
    headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
    response = getPage(b'http://dig.chouti.com/login',
                       method=b'POST',
                       postdata=post_data,
                       cookies={},
                       headers=headers)
    response.addBoth(one_done)
    
    reactor.run()

    tornado

    这里只有个例子,之后可能还要再学一下:

    from bs4 import BeautifulSoup
    from tornado.httpclient import AsyncHTTPClient
    from tornado.httpclient import HTTPRequest
    from tornado import ioloop
    
    url_list = [
        'https://github.com/explore',
        'https://www.djangoproject.com/',
        'http://www.python-requests.org/en/master/',
        'https://jquery.com/',
        'https://getbootstrap.com/',
        'https://www.solarwinds.com/',
        'https://www.zabbix.com/',
        'http://open-falcon.org/',
        'https://www.python.org/',
        'http://www.jetbrains.com/',
    ]
    
    def asynchronous_fetch():
        http_client = AsyncHTTPClient()
    
        # 创建一个函数内的函数,来处理返回的结果
        def handle_response(response):
            """
            处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop()
            :param response:
            :return:
            """
            if response.error:
                print("Error:", response.error)
            else:
                # print(response.headers)
                # print(response.body)
                soup = BeautifulSoup(response.body, features='html.parser')
                title = soup.find('title')
                print(title)
            # 自己加的停止的方法,实现方法可能不是很正规
            # print(response.effective_url)
            curr_url = response.effective_url
            if curr_url in url_list:
                url_list.remove(curr_url)
            if not url_list:
                ioloop.IOLoop.current().stop()
    
        for url in url_list:
            # 异步处理结束后会调用指定的callback的函数
            http_client.fetch(HTTPRequest(url), callback=handle_response)
            # 下面这句和上面效果一样,模块内部会判断参数的isinstance是否是HTTPRequest
            # 如果不是则,HTTPRequest(url, **kwargs)
            # 这里的**kwargs,就是如果要给请求加任何参数,就用关键参数传参
            # http_client.fetch(url, callback=handle_response)
    
    if __name__ == '__main__':
        ioloop.IOLoop.current().add_callback(asynchronous_fetch)
        ioloop.IOLoop.current().start()

关键字

上一篇: resin3安装及使用

下一篇: Hadoop3.x新特性