用 uWSGI 来跑 asyncio

发布时间:2019-09-25 08:13:24编辑:auto阅读(1685)

    最近关注的有点杂,所以也挺久没更新博客了。这一篇主要讨论这些技术:WSGI、py3k、werkzeugasynciouWSGInginx

    WSGI

    先从最简单的开始说—— WSGI。根据定义,这是一种 web 服务器接口规范,源自 Python,后被其他语言借用而成为一个较为通用的接口。以 Python 为例,这个接口非常的简单:

    def application(environ, start_response):
        ...
    

    也就是说,WSGI 接口接受两个参数,一个是包含了请求内容等信息的字典 environ,另外就是一个用以开启 HTTP 响应的 Python 函数对象;而接口返回的内容则是 HTTP 响应的内容。这样,你好世界版的样例程序就是这样的了:

    def application(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/plain')])
        yield "Hello world!\n"
    

    稍微复杂一点的例子就是 Django 了。通过调用 get_wsgi_application(),您可以得到一个 WSGI 接口函数的实现,通过它您可以访问整个 Django 站点。

    WSGI 接口及其实现我们就介绍完了。那,谁来调用这个函数呢?WSGI 的调用者叫做 WSGI 容器。

    WSGI 容器有很多不同的实现,但他们有着共同的特点:

    1. 监听一个 HTTP 端口,提供 web 服务
    2. 内部会把前端的 HTTP 请求包装成对 WSGI 接口的调用
    3. 可以通过设置来指定 WSGI 接口的实现

    说白了,WSGI 容器就是 web 服务器,只不过能够调用指定的 Python 函数来提供服务罢了。比如说,加装了 mod_wsgi 模块的 Apache HTTP 服务器 就是一种 WSGI 容器,经过合理的配置,我们就可以在 Apache 的进程里用上述 Python 代码来提供你好世界的服务。除了 Apache,geventTornadogunicorncherrypy 这些 Python web 服务器也都是 WSGI 容器。

    于是呢,我们就能从实践中看到接口设计的解耦合作用:人们经常说,gunicorn + Django,其实就是 gunicorn 通过 WSGI 接口整合了 Django 的应用。这时,Django 负责业务逻辑渲染页面,gunicorn 负责解析 HTTP 协议然后调用 Django 的 WSGI 接口。

    WSGIuWSGIuwsgi

    uWSGI 是一种 WSGI 容器,但它并不直接提供基于 HTTP 协议的 web 服务,而是通过一种叫做 uwsgi 的协议来提供 web 服务——没错,只是大小写的区别,以至于它的作者也承认这是一个很糟糕的名字。所以通常情况下,如果您选择使用 uWSGI 来部署您的 WSGI 应用,您还需选择一款支持 uwsgi 协议的 web 服务器——比如 nginx

    就以 nginx 为例,您总共需要配置执行两个服务器程序:nginx 和 uWSGI。nginx 的配置很简单,通过 uwsgi_pass 指令将请求交给后端的 uwsgi 服务器来处理就好了:

            location / {
                root html;
                uwsgi_pass unix:///tmp/uwsgi.sock;
                include uwsgi_params;
            }
    

    而执行 uWSGI 也非常简单,告诉它应该监听的地址,以及应该调用的 WSGI 实现就好了:

    $ uwsgi -s /tmp/uwsgi.sock -w myweb.myapp
    

    这里的 myweb.myapp 应该是一个可以 import 的 Python 模块,其中应该有一个叫做 application 的函数,这样 uWSGI 就可以调用这个函数来提供 uwsgi 服务了。

    不同于其他的 WSGI 容器,uWSGI 是在独立的进程中运行的,不受 web 服务器的影响和限制,所以有较大空间可以灵活配置,比如说可以配置同步还是异步啦、多少个进程或线程啦等等,甚至可以选择主循环引擎、异步切换引擎——比如说 asyncio 的主循环引擎和基于 greenlet 的异步切换引擎。

    uWSGI 和 asyncio

    uWSGI 从 2.0.4 开始,实验性地支持 asyncio,也就是说,uWSGI 可以启动一个 asyncio 的主循环,然后在它里面(通过 call_later)来调用 WSGI 接口的 application 函数。这样,我们就可以实现异步……了么?

    还不行。

    因为 uWSGI 没有一个基于回调函数的设计,所以如果我们无法使 application 函数立即返回最终结果,而是返回一个 Future 对象的话,uWSGI 是拿它没有办法的,请求就永远无法得到响应了。

    解决办法就是使用 uWSGI 提供的异步切换引擎,比如说 greenlet —— uWSGI 会在单独的微线程中来执行每一个 application 函数。我们先引用官方的一个例子来看看吧:

    import asyncio
    import greenlet
    
    def two_seconds_elapsed(me):
        print("Hello 2 seconds elapsed")
        # back to WSGI  callable
        me.switch()
    
    def application(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        myself = greenlet.getcurrent()
        asyncio.get_event_loop().call_later(2, two_seconds_elapsed, myself)
        # back to event loop
        myself.parent.switch()
        return [b"Hello World"]
    

    小程序很简单,对于任何请求,我们希望在 2 秒钟的异步等待之后返回你好世界。

    做法很简单,在启动了异步调用(call_later(2, ...))之后,当我们需要等待异步结果的时候,通过 greenlet 将执行权交出去,切换至主循环所在的微线程去,也就是 myself.parent.switch()。这样在这 2 秒钟的异步等待中,别的微线程就可以有机会被执行到。等到 2 秒钟过去了,主循环会去调用 two_seconds_elapsed(),通过 me.switch(),我们又回到了最初的 application() 调用中,myself.parent.switch() 的调用在这时返回结果,继而返回响应你好世界。

    咦,greenlet 代表的不是隐式的异步切换么?怎么这里跟显式的 asyncio 混在了一起呢?为什么不直接用 asyncio 自己的异步切换方式——coroutine 呢?

    因为 WSGI 的标准——我认为——不支持显式的异步切换,uWSGI 官方给出的解释是asyncio 的 coroutine 会把 WSGI 破坏的一塌糊涂……我都不想提了。虽然也有人提出了异步的 WSGI 设计,但终究无法成为主流。其实,这里使用 greenlet 并不影响我们实现一个纯粹的 asyncio 异步 WSGI 服务器,只要写一个简单的函数包装一下就好了。

    Werkzeug

    在桥接 uWSGI 之前呢,请允许我先介绍一下 Werkzeug

    Werkzeug 是一个工具集,专门用于在 Python 中搭建各种 WSGI 服务器。以 WSGI 的接口 application(environ, start_response) 为起点,Werkzeug 提供了请求/响应的封装、请求的分发、基本的 session 和 cookie 的使用等多种绿色实用功能,小巧方便,经久耐用,令人爱不释手,是搭建各种 web 框架的必备工具箱。用一个例子来说明:

    from werkzeug.wrappers import Request, Response
    
    def application(environ, start_response):
        request = Request(environ)
        response = Response("Hello %s!" % request.args.get('name', 'World!'))
        return response(environ, start_response)
    

    为了让我们的实例程序尽可能的简单,我们这里就只用 Werkzeug 来写一个简单的 web 服务器。基于前面这个简单到不能再简单的例子呢,我们增加一点功能——请求分发功能——总不能在一个函数里面处理所有的请求吧。

    分发请求呢,就得有分发规则。我们的分发规则长这样:

    from werkzeug.routing import Map, Rule
    
    url_map = Map([
        Rule('/', endpoint=IndexResource),
        Rule('/user', endpoint=UserResource),
    ])
    

    就是所有到 / 的请求由 IndexResource 来处理,/user 则由 UserResource 来处理。比如说:

    from werkzeug.wrappers import Response
    
    class IndexResource:
        def get(self, request):
            return Response("Hello world!")
    

    为了实现这个功能,我们需要改一下我们的 application 函数了:

    def application(environ, start_response):
        urls = url_map.bind_to_environ(environ)
        try:
            endpoint, args = urls.match()
        except HTTPException as e:
            return e(environ, start_response)
        else:
            request = Request(environ)
            method = getattr(endpoint(), request.method.lower(), None)
            if not method:
                return NotFound('not implemented')(environ, start_response)
    
            resp = method(request)
            return resp(environ, start_response)
    

    简单来说呢,就是从 url_map 里匹配一个 Resource 出来,建一个对象,然后调用其 get 或者 post 函数,然后将其返回的 Response 对象作为响应返回。

    放在一起

    最后,我们需要把所有的东西放在一起。目的呢,是为了能让 Resource 变成异步的,像这样:

    import asyncio
    from werkzeug.wrappers import Response
    
    class IndexResource:
        @asyncio.coroutine
        def get(self, request):
            yield from asyncio.sleep(1)
            return Response("Hello world!")
    

    如果是这样的话,前面代码里面的这两句就要改了:

            resp = method(request)
            return resp(environ, start_response)
    

    因为 resp 对象可能是个 coroutine 对象,需要异步等待之后才能得到 Response 对象。因此,配合前面关于 greenlet 的经验,我们加上一个 if

            resp = method(request)
            if isinstance(resp, asyncio.Future) or inspect.isgenerator(resp):
                myself = greenlet.getcurrent()
                future = asyncio.Future()
                asyncio.Task(_wrapper(myself, future, resp))
                myself.parent.switch()
                resp = future.result()
            return resp(environ, start_response)
    

    这里用到的 _wrapper 函数定义如下:

    @asyncio.coroutine
    def _wrapper(me, future, coro):
        try:
            resp = yield from coro
        except Exception as e:
            future.set_exception(e)
        else:
            future.set_result(resp)
        finally:
            me.switch()
    

    大功告成!解释依时间顺序如下:

    1. resp 是个 coroutine?没问题,我们放在另一个 coroutine 里(_wrapper)把它 yield from 了就好了
    2. 结果怎么取回来?用 asyncio.Future 对象搞定!创建一个,备用
    3. _wrapper 必须得立即执行,所以用 asyncio.Task 包一下,跑起
    4. 异步切换!asyncio.Task 保证了主循环会尽快调用 _wrapper
    5. _wrapper 里,我们会把异步调用 Resource.getpost 的最终结果设置到 future 对象中
    6. 然后切换回原来的微线程
    7. 这时,我们就可以通过 future.result() 来得到最终结果——或者将底层的异常重新抛出

    最后再把完整的 uwsgi.py 文件贴一下:

    import asyncio
    import greenlet
    import inspect
    from werkzeug.exceptions import HTTPException
    from werkzeug.exceptions import NotFound
    from werkzeug.routing import Map, Rule
    from werkzeug.wrappers import Request
    # from ...... import IndexResource, UserResource
    
    
    url_map = Map([
        Rule('/', endpoint=IndexResource),
        Rule('/user', endpoint=UserResource),
    ])
    
    
    @asyncio.coroutine
    def _wrapper(me, future, coro):
        try:
            resp = yield from coro
        except Exception as e:
            future.set_exception(e)
        else:
            future.set_result(resp)
        finally:
            me.switch()
    
    
    def application(environ, start_response):
        urls = url_map.bind_to_environ(environ)
        try:
            endpoint, args = urls.match()
        except HTTPException as e:
            return e(environ, start_response)
        else:
            request = Request(environ)
            method = getattr(endpoint(), request.method.lower(), None)
            if not method:
                return NotFound('not implemented')(environ, start_response)
    
            # noinspection PyCallingNonCallable
            resp = method(request)
            if isinstance(resp, asyncio.Future) or inspect.isgenerator(resp):
                myself = greenlet.getcurrent()
                future = asyncio.Future()
                asyncio.Task(_wrapper(myself, future, resp))
                myself.parent.switch()
                resp = future.result()
            return resp(environ, start_response)
    

    编译运行 uWSGI

    哈哈,把最基本的一项留在了最后。默认的 uWSGI 貌似并不包含 asynciogreenlet 的支持,所以我们得亲自编译一份。通常呢,使用 virtualenv 是一个好的习惯:

    $ source $PATH_TO_YOUR_VENV/bin/activate
    (venv) $ pip install greenlet
    ....
    (venv) $ CFLAGS="-I$PATH_TO_YOUR_VENV/include/python3.4m/" UWSGI_PROFILE="asyncio" pip install uwsgi
    ....
    (venv) $ uwsgi --asyncio 512 -s /tmp/uwsgi.sock --greenlet -w your_package.uwsgi
    

    其中呢,CFLAGS 指定 greenlet 的头文件目前是必须的(路径可能有出入),除非你的 greenlet 安装在系统库中。如果你用的是 Python 3.3,还需 pip install asyncio

    最后一句 uwsgi 的调用,指定了 --asyncio 作为主循环引擎,开 512 个微线程(在一个操作系统线程里)来处理请求(所以最大并发量是 512),然后指定了 --greenlet 作为异步切换引擎。其它参数前面已经说过了,最后那个 uwsgi 就是前述 uwsgi.py 的模块全名。

    关于 wsgi_input 的问题

    WSGI 接口中提供了这么一个参数:environ['wsgi.input'],或者如果用 Werkzeug 就是 request.input_stream(通常被包装成 request.stream 来用),它是一个包含了请求主体内容的类文件对象。在前述的服务器组合中,这个对象自然是 uWSGI 服务器来提供了。那么在异步的环境中,它的 read() 函数会不会阻塞主线程呢?它又能不能跟 asyncio 实现完美的配合呢?

关键字