Preface
这是tornado源码分析的第一部分,主要讲ioloop的相关实现。
基本流程
直接看源码往往会让人无从入手,我们最简单的例子开始。
import tornado
import tornado.ioloop
import tornado.web
class SimpleHandler(tornado.web.RequestHandler):
def get(self):
self.write("Test")
if __name__ == "__main__":
app = tornado.web.Application([
('/', SimpleHandler)
])
app.listen(2333)
tornado.ioloop.IOLoop.current().start()
从这个简单的例子里,我们干了这几件事:
- 定义了一个
Handler
- 用
Handler
实例化了一个Application
Application
监听端口。- 开始
ioloop
其中最重要的就是IOLoop
,tornado
所有的膜法基本都集中在这个类中。
IOLoop
初见IOLoop这个类我是很懵逼的,Epoll
呢?Select
呢?除了current
这个用来获取单例的静态方法外,start
和其他一些Handler相关的方法都是NotImplemented
,下面的instance
函数也只是做了一下单例检查。
@staticmethod
def instance():
"""Returns a global `IOLoop` instance.
Most applications have a single, global `IOLoop` running on the
main thread. Use this method to get this instance from
another thread. In most other cases, it is better to use `current()`
to get the current thread's `IOLoop`.
"""
if not hasattr(IOLoop, "_instance"):
with IOLoop._instance_lock:
if not hasattr(IOLoop, "_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
return IOLoop._instance
子类看不出门道,那就回到父类看。
在Configurable
这个类中,我们发现__new__
方法被改写了,实例的创建逻辑被修改。
def __new__(cls, *args, **kwargs):
base = cls.configurable_base()
init_kwargs = {}
if cls is base:
impl = cls.configured_class()
if base.__impl_kwargs:
init_kwargs.update(base.__impl_kwargs)
else:
impl = cls
init_kwargs.update(kwargs)
instance = super(Configurable, cls).__new__(impl)
# initialize vs __init__ chosen for compatibility with AsyncHTTPClient
# singleton magic. If we get rid of that we can switch to __init__
# here too.
instance.initialize(*args, **init_kwargs)
return instance
在IOLoop
创建实例时创建出的实际是IOLoop.configured_class()
,而在configured_class
中,又取了IOLoop
的configurable_default
。
# IOLoop.configurable_default
@classmethod
def configurable_default(cls):
if hasattr(select, "epoll"):
from tornado.platform.epoll import EPollIOLoop
return EPollIOLoop
if hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
from tornado.platform.kqueue import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select import SelectIOLoop
return SelectIOLoop
可以看到,在configurable_default
中,tornado按照平台差异,取了不同的异步实现方式。IOLoop
实际创建的实例其实也是这几个之中的一个。
以Linux为例,看一下EPollIOLoop
的实现,
class EPollIOLoop(PollIOLoop):
def initialize(self, **kwargs):
super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
其中EPollIOLoop
只修改了PollIOLoop
中initialize
的参数,实际逻辑在PollIOLoop
中。
OK, 到这里,我们整理一下。
current
创建/获取了一个IOLoop
对象。IOLoop
重写了__new__
方法,从而使实际构建的实例是平台相关的。- 实例本身不包含逻辑,只是改写了
PollIOLoop
的初始化参数。
绕了一大圈,总算找到了实际的实现逻辑。
PollIOLoop
大致过了一遍这个类,是否觉得豁然开朗?不管是add_handler
、update_handler
,remove_handler
这几个事件相关的函数,还是start
这个入口函数,都在这里面实现了。
从这几个事件注册相关的函数开始,我们看看tornado
的魔法究竟是如何实现的。
add_handler
def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)
add_handler
中,tornado
将文件对象与文件描述符(fd
)分开,建立了fd
到handler之间的映射,并且在_impl
(此处是select.epoll()
)建注册了fd
上的事件,update_handler
和remove_handler
做的也是如其名的功能,不再赘述。
start
The Golden Key
整个程序的主循环。
start
中前半部分都在处理timeout
和Waker
相关的逻辑,在这里还没有直接遇到,先跳过。
直接看While True
里面的东西。
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
# Depending on python version and IOLoop implementation,
# different exception types may be thrown and there are
# two ways EINTR might be signaled:
# * e.errno == errno.EINTR
# * e.args is like (errno.EINTR, 'Interrupted system call')
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise
可以看到,主循环中通过poll
方法得到最新的事件,通过事件对应的fd得到相应的handler,再用handler处理这个事件。
整个事件循环的流程大致如下。
add_handler
-> poll event
-> handle event
至此,整个torando的ioloop
已经较为清晰的展现在我们面前,但是Application
等相关handler如何被注入到这个ioloop
中的,这些会在之后的笔记中写出。
作业
用epoll写一个简单的返回helloworld的非阻塞异步http服务器。
import select
import socket
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(1)
server.setblocking(0)
event_imp = select.epoll()
event_imp.register(server.fileno(), select.EPOLLIN)
try:
connections = {}
requests = {}
responses = {}
while True:
event_pair = event_imp.poll(1)
for fileno, event in event_pair:
if fileno == server.fileno():
connection, address = server.accept()
connection.setblocking(0)
event_imp.register(connection.fileno(), select.EPOLLIN)
connections[connection.fileno()] = connection
requests[connection.fileno()] = b''
responses[connection.fileno()] = response
print(connections)
elif event & select.EPOLLIN:
requests[fileno] += connections[fileno].recv(1024)
event_imp.modify(fileno, select.EPOLLOUT)
print("Get request from %s : %s" % (connection, requests[fileno]))
elif event & select.EPOLLOUT:
sent = connections[fileno].send(responses[fileno])
responses[fileno] = responses[fileno][sent:]
if len(responses[fileno]) == 0:
event_imp.modify(fileno, 0)
connections[fileno].shutdown(socket.SHUT_RDWR)
elif event & select.EPOLLHUP:
event_imp.unregister(fileno)
connections[fileno].close()
del connections[fileno]
finally:
event_imp.unregister(server.fileno())
event_imp.close()
server.close()
可以看到,我们在一开始将监听TCP连接的socket在epoll
上注册了EPOLLIN
事件,并在新连接可用时为新连接注册了EPOLLIN
事件,当新连接的内容接受完毕时,我们将连接注册的事件改为了EPOLLOUT
,并在该fd可写时发送了HTTP报文。
这其实就是Tornado IOLoop最简化版的实现。