2016-2-23 更新
文中的 AsyncFileWrapper 原意是提供一個在異步事件循環中
操作本地文件的高效方法,但是經各位提醒我才知道,大部分
操作系統其實並不支持本地文件的異步操作,目前在異步事件
循環中訪問本地文件的推薦方法是將訪問操作放到另一個
線程/進程去做(見 run_in_executor )。當然,
AsyncFileWrapper 還是可以用在外設(像串口)和管道上的。
另: Python 3 目前推薦使用 async/await 語法(詳情見我的
另一篇文章),本文中的代碼,除了最後的 some_func 函
數以外,在新語法下也是適用的。
緣起
最近想换换口味,在用asyncio
写一个小东西,过程中碰到各种概念上、实践上的问题,悄悄
记在这里XD.
所谓的“异步”
回归到最初的定义的话,“异步”是指不同硬件之间可以工作在
不同的时钟信号下——试想一下要求所有硬件工作在相同时钟信
号下的系统该有多脆弱。所以同步总线通常出现在与系统本身
工作时钟接近的硬件接口上,而异步总线正好相反,用来连接
远远达不到系统工作频率的硬件。
这些概念投射到同根同源的软件上的话,由于软件命令最终都
是由硬件去执行的,所以软件的操作也有快有慢,“异步操作”
的重点就是协调“快”的操作和“慢”的操作。然后众所周知,最
慢的操作是IO.
与PC硬件里琳琅满目的总线速度不同,软件里一般只有两种速
度:指令执行速度和IO速度——像Erlang计算reduction的时候
也只考虑指令执行时间和IO时间,所有的指令和IO类型都是一
视同仁的。这是为神马呢?大概是因为人脑跟不上电脑吧……
“正确”的异步操作
网上到处都是这样的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 | import socket
import multiprocessing
def handler(conn, addr):
message = conn.recv(1024)
while message:
conn.send(message)
conn.close()
def server(host, port):
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind((host, port))
listener.listen(10)
while True:
conn, addr = listener.accept()
process = multiprocessing.Process(target=handler, args=(conn, addr))
process.start()
|
这是异步操作没错,用另一个进程来处理请求,只是粒度略大,
因为这只是将快的进程(handler进程)和慢的进程(server进
程)分开了而已,无论在handler还是server里都要等待IO. 而
且我在#python上贴出
这个程序的时候马上有一堆人出来说服务器不应该这样写之类
的XD.
于是武林中就有了下面这种模式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 | import socket
import selectors
def handler(conn):
message = conn.recv(1024)
conn.send(message)
def server(host, port):
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind((host, port))
listener.listen(10)
selector = selectors.DefaultSelector()
selector.register(listener, selectors.EVENT_READ)
while True:
event_list = selector.select()
for key, events in event_list:
conn = key.fileobj
if conn == listener:
new_conn, addr = conn.accept()
selector.register(new_conn, selectors.EVENT_READ)
else:
handler(conn)
|
我将这个模式称为“在忙完指令之后等待IO”,坊间说的“异步IO”
一般也是特指这种两段式模式:一段等待IO,一段执行指令。别
看上面的程序很简单,很大一部分高性能的服务器或者框架(像
Tornado、Twisted和Erlang)都是基于这个模式的,因为它的资
源消耗比基于多进程/多线程的服务器实在是少太多了(参考Nginx
和Apache的对比),所以扩展性(scalability)也好太多了。
asyncio模块
asyncio是在Python 3.4中添加的新模块,实现了上面的“忙完指
令之后等待IO”模式。
这世上已经有好多异步框架和库了,Guido老爷子为什么要推行这
样一个新模块?他在PEP
里说的原因是,这些第三方异步代码相互之间不兼容不能移植
blahblah……我倒觉得是Twisted的camelCaseNaming不合老爷子胃
口而已……
还是上面的echo server例子,用外星科技实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 | import asyncio
class EchoProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def connection_lost(self, exc):
self.transport.close()
def data_received(self, data):
self.transport.write(data)
def server(host, port):
loop = asyncio.get_event_loop()
srv = loop.create_server(EchoProtocol, host, port)
asyncio.async(srv)
loop.run_forever()
|
熟悉Twisted的同学应该会有deja vu的感觉,我们提供一个接收
事件的对象(实际上是产生对象的factory),然后事件就源源不
断地自动出现了。
但是这看起来和之前selector的例子不像啊?提示:真正的循环
在BaseEventLoop的_run_once方法里:)
实践中使用asyncio的要点
好了上面的例子很美好,但其实只展示了asyncio一小部分的威力,
下面来一些私人干货。
在事件处理方法中创建TCP连接
由于实际上asyncio是在事件循环中调用asyncio.Protocol类
(或者子类)的data_received等方法的,这些事件处理方法
如果阻塞的话,会将整个事件循环也阻塞住,失去了所有“异
步IO”模式带来的好处,所以所有的事件处理方法——包括
data_received、connection_made、connection_lost等——都
不能调用任何可能阻塞的函数,包括socket对象的recv方法、
文件对象的read方法等,当然socket的connect方法由于域名
解析和网络延迟等也是会阻塞的……那我们要怎么从事件处理
方法里做connect操作呢?
答案是asyncio提供了一系列异步操作的、不会阻塞的接口,
当然也包括“创建TCP连接”。这些接口全部以coroutine的形
式提供,调用时要使用yield from
语法。例如可以这样搭
建一个简单的代理服务器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | HOST = 'www.google.com'
PORT = '80'
@asyncio.coroutine
def new_connection(host, port):
loop = asyncio.get_event_loop()
client_transport, client_proto = yield from \
loop.create_connection(ClientProtocol, host, port)
return client_transport, client_proto
class ClientProtocol(asyncio.Protocol):
....
class ServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
self.client_task = asyncio.Task(new_connection(HOST, PORT))
self.client_task.add_done_callback(self.client_connect_done)
....
def client_connect_done(self, future):
client_transport, client_proto = future.result()
....
def server(host, port):
loop = asyncio.get_event_loop()
srv = loop.create_server(ProxyProtocol, host, port)
asyncio.async(srv)
loop.run_forever()
|
ServerProtocol在收到一个新连接(connection_made)的时候用
asyncio.Task调度一个创建新连接的异步函数,这个函数会由asyncio
的事件循环在connection_made返回后择机执行,执行完成后事
件循环再去调用通过add_done_callback注册的处理函数
(client_connect_done),满满的javascript既视感啊有木有。
为什么yield from不能直接写在connection_made里,而需要另外
封装一个函数呢?因为asyncio的事件循环认定了Protocol对象的
事件处理方法是普通函数,如果yield from直接出现在
connection_made中的话,事件循环调用connection_made的时候
只会返回一个generator,connection_made的函数体完全不会被
执行,所以在事件处理方法中只能通过调度Task(或者使用
asyncio.async(...),效果一样)的方式执行异步操作。
替换事件循环使用的selector
Python 3.4中还有一个和asyncio配套的新模块:selectors.
这个模块将select、epoll、kqueue等等系统级异步IO接口抽象
成“selector”类型,规定了统一的对外接口,于是程序只管使
用selector的接口就行了,不用管底层的实现到底是select还
是epoll.
asyncio中用的就是selector模块,
asyncio.selector_events.BaseSelectorEventLoop类的构造
函数有一个selector参数,通常使用默认值就可以了,但是
当然我们也可以把它给换成我们自己的类。比方说如果我们希
望事件循环能支持ZeroMQ的socket,
可以把selector的底层实现换成zmq.Poller():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 | class ZmqSelector(selectors._BaseSelectorImpl):
def __init__(self, poller=None):
super().__init__()
if poller is not None:
self._zmq_poller = poller
else:
self._zmq_poller = zmq.Poller()
def _fileobj_lookup(self, fileobj):
if isinstance(fileobj, zmq.Socket):
return fileobj
else:
return super()._fileobj_lookup(fileobj)
def register(self, fileobj, events, data=None):
key = super().register(fileobj, events, data)
flags = 0
if events & selectors.EVENT_READ:
flags |= zmq.POLLIN
if events & selectors.EVENT_WRITE:
flags |= zmq.POLLOUT
self._zmq_poller.register(fileobj, flags)
return key
def unregister(self, fileobj):
key = super().unregister(fileobj)
self._zmq_poller.unregister(fileobj)
return key
def select(self, timeout=None):
if timeout is not None:
poll_timeout = max(0, math.ceil(timeout * 1e3))
else:
poll_timeout = None
select_ready = []
try:
zmq_events = self._zmq_poller.poll(poll_timeout)
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
return select_ready
else:
raise e
for sock, ev in zmq_events:
key = self._key_from_fd(sock)
if key is not None:
events = 0
if ev & zmq.POLLIN:
events |= selectors.EVENT_READ
if ev & zmq.POLLOUT:
events |= selectors.EVENT_WRITE
if ev & zmq.POLLERR:
events = selectors.EVENT_READ | selectors.EVENT_WRITE
select_ready.append((key, events & key.events))
return select_ready
def install_zmq_event_loop():
event_loop = asyncio.SelectorEventLoop(ZmqSelector())
asyncio.set_event_loop(event_loop)
|
将大文件的读写拆成小块
在事件循环里做任何耗时的操作都是不对的,尤其是IO,
即便是可以随时读写的本地文件,内存里装不下的话还是
会启动硬盘马达让你等个半天。最简单的方法是将大文件
的读写拆分成小块,例如每次只读一页的内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 | class AsyncFileWrapper(object):
DEFAULT_BLOCK_SIZE = 4096
def __init__(self, loop=None, filename=None,
fileobj=None, mode='rb'):
if (filename is None and fileobj is None) or \
(filename is not None and fileobj is not None):
raise RuntimeError('Confilicting arguments')
if filename is not None:
if 'b' not in mode:
raise RuntimeError('Only binary mode is supported')
fileobj = open(filename, mode=mode)
elif 'b' not in fileobj.mode:
raise RuntimeError('Only binary mode is supported')
self.fileobj = fileobj
if loop is None:
loop = asyncio.get_event_loop()
self.loop = loop
self.rbuffer = bytearray()
def read_ready(self, future, n, total):
res = self.fileobj.read1(n)
if not res: # EOF
future.set_result(bytes(self.rbuffer))
return
self.rbuffer.extend(res)
if total > 0:
more_to_go = total - len(self.rbuffer)
if more_to_go <= 0: # enough
res, self.rbuffer = self.rbuffer[:n], self.rbuffer[n:]
future.set_result(bytes(res))
else:
self.loop.call_soon(self.read_ready, future, more_to_go, total)
else: # < 0
self.loop.call_soon(self.read_ready, future, self.DEFAULT_BLOCK_SIZE, total)
@asyncio.coroutine
def read(self, n=-1):
future = asyncio.Future(loop=self.loop)
if n == 0:
future.set_result(b'')
return future
elif n < 0:
self.loop.call_soon(self.read_ready, future, self.DEFAULT_BLOCK_SIZE, n)
else:
self.loop.call_soon(self.read_ready, future, n, n)
return future
def write(self, data):
# XXX: big data?
return self.fileobj.write(data)
def close(self):
self.fileobj
|
上面这个类通过不断使用asyncio事件循环的call_soon方法,
重复执行读取小块文件内容的代码(read_ready方法),使得
循环内的其他代码有更多执行机会,典型的以吞吐量换响应速
度。可以在coroutine中这样使用此类:
| @asyncio.coroutine
def some_func(...):
...
afile = AsyncFileWrapper(filename='some_file.txt')
content = yield from afile.read()
...
|
Errata
AsyncFileWrapper
多谢Robber Phex评论指正,AsyncFileWrapper wrap起来的文
件对象其实还是工作在同步状态下的,需要指定O_NONBLOCK.
另外这个类实际工作时会出现调用close方法之后read_ready方
法又被事件循环回调的情况,所以close方法中还要做额外的清
理工作。
以上修改都在这个gist里.