python IO多路复用之select

发布时间:2019-09-18 07:22:34编辑:auto阅读(1606)

        说起IO操作我们最先想到的就是读写文件。其实python中对有三种IO操作,打开文件,使用socket进行网络连接和系统的标准输入输出sys.stdin和sys.stdout。我们先来看一段socket服务端的代码:

    import socket
    ip_port = ('127.0.0.1',9999)
    sk = socket.socket()
    sk.bind(ip_port)
    sk.listen(5)
    while True:
        """
        程序运行到accept()就开始阻塞,直到有客户端的连接
        """
        conn,addr = sk.accept()
        print addr
        Flag=True
        while Flag:
            """
            运行到recv()的时候也会阻塞,等待收到客户端的输入之后
            才会继续运行
            """
            client_data = conn.recv(1024)
            #将客户端的输入发还给客户端
            conn.sendall(client_data)

    上面的代码是个简单的服务端,它的功能就是将接受到的客户端的发来的信息再发还给客户端。这个客户端有一个问题,就是当一个客户端连接了之后第二个客户端要是还想连接服务端就需要等待。哪怕第一个客户端连上了之后不做任何操作只是挂着,只要它不断开连接,第二个客户端也别想连上。这就造成很大的浪费。这种连接模型就叫做同步阻塞。同步阻塞是IO模型中最简单的一种。

    142330286789443.png

    还有一种叫做同步非阻塞的IO模型。在学习socket的时候,有一个setblocking方法。如果配置socket服务端的时候将该方法置为False,看看修改后的代码:

    import socket
    ip_port = ('127.0.0.1',9999)
    sk = socket.socket()
    sk.bind(ip_port)
    sk.listen(5)
    """
    将setblocking()方法置为False,程序将不在阻塞
    """
    sk.setblocking(False)
    while True:
        #accept()方法不再阻塞
        conn,addr = sk.accept()
        print addr
        Flag=True
        while Flag:
            #recv()方法也不再阻塞了
            client_data = conn.recv(1024)
            conn.sendall(client_data)

    那么程序执行时候遇到accept()和recv()方法将不在被阻塞,直接执行后面的代码。因为accept()不在被阻塞,所以理论上是可以解决多个客户端的连接问题了。但是这个模式有个致命的问题,就是一旦accept()或者recv()收不到消息马上就会报错。如果不想报错,就需要一直不停的向服务端发消息,就算发送的不是服务端请求的数据也得发点别的什么东西。总之就是一句话;“不要停~!!”

    142332004602984.png

    第三种IO多路复用模型,就是本文要重点介绍的一种方式。select就是诞生最早也是最为典型的一种IO多路复用模型。前面我们提到,python中的IO操作有三种,file、socket和stdin。select可以通过监测这三种IO操作的文件句柄的变化,来感知客户端的是否接入。看一下代码:

    #!usr/bin/env python
    # coding:utf-8
    import socket
    import select
    ip_port=('127.0.0.1',8888)
    #创建一个socket实例,那这个实例的句柄就是sk
    sk=socket.socket()
    sk.bind(ip_port)
    sk.listen(5)
    sk.setblocking(False)
    #把句柄存入列表中
    inputs=[sk,]
    while True:
        """
        select检测的是inputs列表里的句柄的变动,如果句柄有变动(例如 新的客户端连进来或者
        已经连进来的客户端发了消息),就会把有变动的句柄赋值给rList,因此rList同一时间只会
        等于一个句柄(例如rList=sk或rList=conn)
        """
        #三个参数是检测列表中执行过程是否有错误,有错误的就把错误信息赋值给e
        """
        第4个参数表示阻塞时间,意思是阻塞多少秒之后就继续向下执行。默认不填的话select是        会阻塞住的,但是如果阻塞住就变成同步阻塞模式,那就没意义了。所以一般都是要写个阻塞     时间阻塞时间让程序继续向下执行的。
        """
        rList,w,e = select.select(inputs,[],[],0.05)
        import time
        time.sleep(2)
        print 'input:',inputs
        print 'result',rList
        for r in rList:
            #如果检测到sk句柄变动,表示是有新客户端请求接入
            if rList==sk:
                """
                conn就是客户端连接的句柄,当服务端与客户端第一次连接的时候产生变动的句柄
                是服务端的sk,连接创建之后如果有数据交互那么每次变动的句柄就是客户端的conn了
                """
                conn,address=r.accept()
                print address
            else:
                client_data=r.recv(1024)
                r.sendall(client_data)

    以前写socket服务端,如果我们希望服务端启动同时对端口8888和9999进行监听是无法做到的。唯一的办法只能是同样的服务端代码复制一遍之后再启动一个。但是select既然叫做IO多路复用模型,它就可以实现实现同时对多路端口访问的监听。因为select是通过句柄的变化来感知客户端接入的。那么我们就可以通过在代码中同时创建多个句柄,然后把这些句柄都丢入inputs列表交给select来进行监控。每个句柄对应不同的端口就可以了。看代码

    #!usr/bin/env python
    # coding:utf-8
    ip_port=('127.0.0.1',8888)
    sk=socket.socket()
    sk.bind(ip_port)
    sk.listen(5)
    sk.setblocking(False)
    ip_port1=('127.0.0.1',9999)
    sk1=socket.socket()
    sk1.bind(ip_port1)
    sk1.listen(5)
    sk1.setblocking(False)
    #把sk,sk1两个句柄存入列表中,select同时监控2个句柄
    inputs=[sk,sk1]
    while True:
        rList,w,e = select.select(inputs,[],[],0.05)
        import time
        time.sleep(2)
        print 'input:',inputs
        print 'result',rList
        for r in rList:
            if rList==sk:
                conn,address=r.accept()
                print address
            else:
                client_data=r.recv(1024)
                r.sendall(client_data)

    结合队列模块我们就可以写出完整的通过select多路复用的socket程序。这里引入队列模块的目的是为了防止消息回复错误。因为如果多个客户端同时接入,那就有可能造成本该回复给客户端1的消息被错误的回复给了客户端2。队列模块可以有效的将数据按照顺序存储和取出。避免消息存取顺序错误。

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import socket
    import select
    import Queue
    ip_port = ('127.0.0.1',8888)
    sk = socket.socket()
    sk.bind(ip_port)
    sk.listen(5)
    sk.setblocking(False)
    inputs = [sk]
    """
    output函数用于select第二个参数,这个参数和第一个rList不同。第一个参数是inputs队里句柄有变化了才感知
    第二个参数是只要output队列里有内容就会感知。
    """
    output = []
    """
    message字典用于存放文件句柄和队列内容
    """
    message = {}
    #message = {
    #'c1':队列,
    #'c2':队列,[b,bb,bbb]
    #}
    while True:
        rList,wList,e = select.select(inputs, output, inputs, 1)
        # 文件描述符可读,rList,一,只有变化,感知
        # 文件描述符可写,wList,二,只有存在,感知
        for r in rList:
            #如果过rList的内容有变动就证明一个新的客户端请求连接进来了
            if r == sk:
                conn,address = r.accept()
                #conn就是获取的socket文件句柄
                inputs.append(conn)
                #将字典的value值设置为队列
                message[conn] = Queue.Queue()
            #如果rList句柄没变动,就说明客户端已经连接好。准备接收客户端发来的数据
            else:
                client_data = r.recv(1024)
                #如果客户端发来的内容不为空
                if client_data:
                    # 将获取的数据追加进output列表,此时select就会感知到第二个参数有值了
                    output.append(r)
                    #将文件句柄对应的客户端内容写入队列
                    message[r].put(client_data)
                else:
                    #如果发过来的数据为空,则删除input队列里对应的客户端文件句柄。表示断开连接
                    inputs.remove(r)
        #如果select第二个参数有值,那么output的句柄就会被赋值给wList
        for w in wList:
            # 去指定队列取数据
            try:
                #nowait()方法Queue队列获取内容的时候不在阻塞,但是如果队列里没有数据了就报错
                data = message[w].get_nowait()
                #将队列里抓取出来的数据发还给客户端
                w.sendall(data)
            except Queue.Empty:
                pass
            #发送完数据之后马上删除output列表里的值,不然会一直触发
            output.remove(w)
            #删除字典里对应的值。
            del message[w]

    IO多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题。

    用户首先将需要进行IO操作的socket添加到select中,然后阻塞等待select系统调用返回。当数据到达时,socket被激活,select函数返回。用户线程正式发起read请求,读取数据并继续执行。

    从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

    142332187256396.png

    还有一种IO模型称为异步IO,上文我们介绍的IO多路复用又被称为异步阻塞。因为当客户端访问连接入select之后,客户端就的线程其实就被select阻塞了。意思就是服务端虽然没有阻塞了,但是客户端是有阻塞。这时候客户端什么都做不了。而异步IO的意思就是,当客户端连接进入服务端的时候,服务端首先是noblocking的,那么服务端不会被阻塞。同时,客户端连入之后马上就可以进行别别的工作,不需要想多路复用那样被阻塞住。当服务端准备好了数据之后会同时客户端,客户端接收到信息后再回来接收服务端的信息。这样就效率就更高了。但是异步IO需要系统内核的支持。所以很少被使用到。

    142333511475767.png


    IO多路复用与异步IO

    linux下的异步IO其实用得很少

    当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
    这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
    在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。


    同步与异步

    实际上同步与异步是针对应用程序与内核的交互而言的。同步过程中进程触发IO操作并等待或者轮询的去查看IO操作是否完成。异步过程中进程触发IO操作以后,直接返回,做自己的事情,IO交给内核来处理,完成后内核通知进程IO完成。同步与异步如下图所示:

    阻塞与非阻塞

      简单理解为需要做一件事能不能立即得到返回应答,如果不能立即获得返回,需要等待,那就阻塞了,否则就可以理解为非阻塞。详细区别如下图所示:


    如何选择同步还是异步呢? 
    主要有这么几个指标供参考 
    1. 并发数量 
    2. 接收字节数 
    3. 处理请求所需CPU时间 
    我们一个一个来考察 

    并发数 
    并发低的时候同步IO与异步IO差别不大 
    并发高时差别会比较明显,这要表现在 
    1. 开启线程数:如并发1000时,同步IO要开启1000个线程,1000个线程要占用很多内存,这是其一,其二1000个线程间切换的时间也是很可观的;异步IO则可避免这个问题 


    接收字节数 
    接收字节越少被阻塞的概率越低,同步IO与异步IO的差别就越小 
    接收字节越多被阻塞的概率就越大,异步IO的优势越明显,能够同时服务更多的客户端请求 

    处理请求所需CPU时间 
    与同步异步没什么关系 



    参考的文章

    http://www.cnblogs.com/Anker/p/3254269.html

    http://blog.csdn.net/historyasamirror/article/details/5778378

    http://blog.csdn.net/baixiaoshi/article/details/48708347







关键字