I/O多路复用

从接触netty以来,对网络编程中的诸多I/O模型存在困惑,直到最近学习python,才渐渐清晰起来。本文主要梳理一下关于传统多线程模型、多路复用技术以及select、epoll模式多路复用的知识点。

多线程模型

网络编程的基本模型是Client-Server模型,也就网络中两个进程之间相互通信,服务端提供位置信息,客户端向服务端发起连接请求,三次握手成功建立连接之后,双方就可以通过Socket进行通信。

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from socket import *
from threading import Thread


def clientHandler(clientSocket, clientAddress):
"""处理客户端连接"""

print('与客户端:%s:%s 建立连接...' % clientAddress)

while True:
receiveMessage = clientSocket.recv(1024)
if receiveMessage:
# 将接受到的信息直接返回
print(receiveMessage.decode('utf-8'))
else:
# 如果客户端关闭,关闭服务端连接
clientSocket.close()
print('与客户端:%s:%s 已断开连接...'% clientAddress)
break


def main():
# 创建服务端
serverSocket = socket(AF_INET, SOCK_STREAM)

# 设置服务端参数
serverSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
serverSocket.bind(('', 8088))

# 开始监听
serverSocket.listen(10)

try:
while True:
# 等待客户端接入,此处会阻塞当前线程,直到有新的客户端接入
clientSocket, clientAddress = serverSocket.accept()

# 成功连接后,会创建新的线程用来处理输入出
serverThread = Thread(target=clientHandler, args=(clientSocket, clientAddress))
serverThread.start()

finally:
serverSocket.close()


if __name__ == '__main__':
main()

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from socket import *


def main():
# 创建scoket
clientSocket = socket(AF_INET, SOCK_STREAM)
serverAddr = ('127.0.0.1', 8088)

# 连接服务端
clientSocket.connect(serverAddr)

sendMessage = input('输入要发送的信息:')
clientSocket.send(bytes(sendMessage, 'utf-8'))

clientSocket.close()
print('客户端已关闭')


if __name__ == '__main__':
main()

说明:

  • 服务端主线程负责监听客户端的连接
  • 接收到客户端请求后,便为每一个连接创建一个新的线程
  • 处理完成后,关闭连接,销毁线程

这就是典型的一请求一应答模式,当客户端并发访问量增加时,服务端线程数与客户端数将呈1:1的关系增加,这将及其耗费系统的性能。试想有2000个客户端连接时,服务端将创建2000个线程用于处理这些连接,首先要维持这些线程就要耗费大量内存,在做上下文切换的时候可能直接导致系统内存耗尽或者当前进程宕机。那么能不能只使用少量的线程,就可以处理这些连接呢?

单线程非阻塞

python中可以将socket设置为非阻塞,看下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from socket import *

serverSocket = socket(AF_INET, SOCK_STREAM)
serverSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

serverSocket.bind(('', 8088))
serverSocket.listen(10)

# 将服务端负责监听的socket设置成非阻塞
serverSocket.setblocking(False)

# 保存新建立的连接
g_clientList = []

while True:
try:
# 将socket设置成非阻塞后,如果没有接收到数据,会抛出异常,需要做下异常处理
clientSocket, clientAddress = serverSocket.accept()
except:
pass
else:
# 如果没有报错 表示成功了建立了一个新的连接
print('与客户端:%s:%s 建立连接...' % clientAddress)

# 将新建立的socket连接设置成非阻塞,并保存到列表中
clientSocket.setblocking(False)
g_clientList.append((clientSocket, clientAddress))


# 此循环主要用作轮询列表中的socket,处理可以接收数据的socket
for clientSocket, clientAddress in g_clientList:
try:
# 如果clientsocket中没有可接收的数据,此处会抛出异常
receiveMessage = clientSocket.recv(1024)
except:
pass
else:
if receiveMessage:
print(receiveMessage.decode('utf-8'))
else:
clientSocket.close()

# 关闭连接后,将该连接从列表中移除,不再做轮询
g_clientList.remove((clientSocket, clientAddress))
print('与客户端:%s:%s 断开连接...' % clientAddress)

上述代码实现了单线程的情况下可以处理多个连接,关键点有以下几点:

  • socket不再阻塞当前线程
  • 维护了一个list,用来存放可用的socket连接
  • 轮询存放socket的集合,处理可读写的socket连接。

这就是一个简单版的I/O多路复用实现。

多路复用(Multiplexing),维基百科上给出的解释是表示在一个信道上传输多路信号或数据流的技术。下图是维基百科给出的模型图:

mark

mark

结合上面的单线程非阻塞例子,可以理解成多个socket连接复用一个线程。

select模式

前面的例子中,使用单线程处理多个socket连接,可以很大程度地节约创建线程和线程切换时带来的系统性能消耗。但同样存在一个问题,由于每次都要轮询所有的socket连接,这将大量耗费CPU时间,而且不是所有的socket都处于就绪状态(连接、读、写),试想轮询了2000个连接结果只有一个scoket在收发包,这无疑浪费了很多CPU性能,那么有没有方法可以得到这些可用的就绪连接,只对这些活跃的连接进行轮询呢?

这里首先要引入一个概念,叫做文件描述符(file descriptor,fd),linux中内核将所有外部设备看做一个文件来操作,对一个文件的读写操作会调用内核提供的系统命令,返回一个fd,同样对socket的操作也会有相应 的描述符,描述符是一个数字。

所有socket的文件描述符被放入到一个数组中,前辈们发明了一个系统调用select,select会依次遍历这个数组,如果对应的文件描述符处于就绪状态,就会返回该描述符。如果遍历结束后,仍没有一个可用的fd,它会让当前用户进程睡眠,等到有可用资源的时候再唤醒。

下面是select模式多路复用的简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from socket import *
from select import *

# 创建服务端socket,并开启监听
serverSocket = socket(AF_INET, SOCK_STREAM)
serverSocket.bind(('', 8088))
serverSocket.listen(10)

# 存放socket连接
inputSockets = [serverSocket]

while True:
# select会阻塞等待...
# select方法接收三个类型为列表的参数:
# param1: 检查该list中是否有socket可以接收数据
# param2: 检查该list中是否有socket可以发送数据
# param3: 检查该list中是否有socket发生异常
readableList, writeableList, exceptionalList = select(inputSockets, [], [])

# select方法会返回一个元组,包括:可读的连接列表、可写列表、异常列表

# 数据抵达 遍历可写列表
for soc in readableList:

# 有新的连接,握手成功后,放入列表中
if soc == serverSocket:
client, address = serverSocket.accept()
print('与客户端:%s:%s 建立连接...' % address)
inputSockets.append(client)

# 有新的数据到达
else:
receiveMessage = soc.recv(1024)
if receiveMessage:
# 如果有数据,则打印该数据
print(receiveMessage.decode('utf-8'))

else:
# 如果无数据,从列表中移除该连接,关闭连接
inputSockets.remove(soc)
soc.close()
print('与客户端:%s:%s 断开连接...' % address)

通过select调用可以让用户程序直接获取可用的socket连接,相比较于用程序直接轮询所有socket连接,socket模式在系统内核层面实现,效率极高,而且select基本上所有平台。

但是…

  • select单个线程可监视的fd数量存在限制,一般是1024
  • ‘If a file descriptor being monitored by select() is closed in another thread, the result is unspecified’,如果别的线程关闭了正在被select监听的fd,结果将是不可预测的…
  • select采用轮询的方法,效率极低

那,怎么办!

epoll模式

先来看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import socket
import select

# 创建server
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('', 8088))
server.listen(10)

# windows不支持
epoll = select.epoll()

# 将创建的套接字添加到epoll的事件监听中
epoll.register(server.fileno(), select.EPOLLIN | select.EPOLLET)

clients = {}
addresses = {}

while True:

# 用来检测套接字可读写的状态,处于可用状态的socket会通知epoll
epoll_list = epoll.poll()

# 对事件进行判断
for fd, event in epoll_list:

# 如果负责监听的socket被激活
if fd == server.fileno():
client, address = server.accept()
print('与客户端:%s:%s 建立连接...' % address)

# 将socket信息和address信息保存在字典中
clients[client.fileno()] = client
addresses[client.fileno()] = address

# 向epoll注册新接入的连接
epoll.register(client.fileno(), select.EPOLLIN | select.EPOLLET)

# 可接收数据的事件,处理对应连接
elif event == select.EPOLLIN:
message = clients[fd].recv(1024)

if message:
print(message.decode('utf-8'))
else:
# 从epoll中解除注册
epoll.unregister(fd)
clients[fd].close()
print('与客户端:%s:%s 断开连接...' % addresses[fd])

不同于select,epoll采用事件回调的方式。socket一开始会向epoll注册事件,如果socket变为可用状态,则会触发事件回调,被epoll获取。同时epoll也解决了select单个线程所能监视的fd数量有限的问题。

无论服务端有多少个连接,epoll关心的只是那些活跃的连接,所以epoll的效率较之select也要高出很多。(打个比方,比如说考勤,是每天把大家集中在一起点一下名,看谁没来,还是直接打卡签到快?)

总结

无论是select,还是epoll,I/O多路复用的关键维护了一张fd表。它把多个socket连接的阻塞,转移到单线程如何从众多连接中筛选出可用状态的fd上。至于如何使用这张fd表,是单线程还是交给其他的线程去处理,由具体的实际需要决定了。