tornado 源码分析(1)

Preface

这是tornado源码分析的第一部分,主要讲ioloop的相关实现。

基本流程

直接看源码往往会让人无从入手,我们最简单的例子开始。

import tornado
import tornado.ioloop
import tornado.web


class SimpleHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Test")


if __name__ == "__main__":
    app = tornado.web.Application([
        ('/', SimpleHandler)
    ])
    app.listen(2333)
    tornado.ioloop.IOLoop.current().start()

从这个简单的例子里,我们干了这几件事:

  • 定义了一个Handler
  • Handler实例化了一个Application
  • Application监听端口。
  • 开始ioloop

其中最重要的就是IOLooptornado所有的膜法基本都集中在这个类中。

IOLoop

初见IOLoop这个类我是很懵逼的,Epoll呢?Select呢?除了current这个用来获取单例的静态方法外,start和其他一些Handler相关的方法都是NotImplemented,下面的instance函数也只是做了一下单例检查。

@staticmethod
def instance():
    """Returns a global `IOLoop` instance.

    Most applications have a single, global `IOLoop` running on the
    main thread.  Use this method to get this instance from
    another thread.  In most other cases, it is better to use `current()`
    to get the current thread's `IOLoop`.
    """
    if not hasattr(IOLoop, "_instance"):
        with IOLoop._instance_lock:
            if not hasattr(IOLoop, "_instance"):
                # New instance after double check
                IOLoop._instance = IOLoop()
    return IOLoop._instance

子类看不出门道,那就回到父类看。

Configurable这个类中,我们发现__new__方法被改写了,实例的创建逻辑被修改。

def __new__(cls, *args, **kwargs):
    base = cls.configurable_base()
    init_kwargs = {}
    if cls is base:
        impl = cls.configured_class()
        if base.__impl_kwargs:
            init_kwargs.update(base.__impl_kwargs)
    else:
        impl = cls
    init_kwargs.update(kwargs)
    instance = super(Configurable, cls).__new__(impl)
    # initialize vs __init__ chosen for compatibility with AsyncHTTPClient
    # singleton magic.  If we get rid of that we can switch to __init__
    # here too.
    instance.initialize(*args, **init_kwargs)
    return instance

IOLoop创建实例时创建出的实际是IOLoop.configured_class(),而在configured_class中,又取了IOLoopconfigurable_default

# IOLoop.configurable_default
@classmethod
def configurable_default(cls):
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
        return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
    return SelectIOLoop

可以看到,在configurable_default中,tornado按照平台差异,取了不同的异步实现方式。IOLoop实际创建的实例其实也是这几个之中的一个。

以Linux为例,看一下EPollIOLoop的实现,

class EPollIOLoop(PollIOLoop):
    def initialize(self, **kwargs):
        super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)

其中EPollIOLoop只修改了PollIOLoopinitialize的参数,实际逻辑在PollIOLoop中。

OK, 到这里,我们整理一下。

  • current创建/获取了一个IOLoop对象。
  • IOLoop重写了__new__方法,从而使实际构建的实例是平台相关的。
  • 实例本身不包含逻辑,只是改写了PollIOLoop的初始化参数。

绕了一大圈,总算找到了实际的实现逻辑。

PollIOLoop

大致过了一遍这个类,是否觉得豁然开朗?不管是add_handlerupdate_handler,remove_handler这几个事件相关的函数,还是start这个入口函数,都在这里面实现了。

从这几个事件注册相关的函数开始,我们看看tornado的魔法究竟是如何实现的。

add_handler

def add_handler(self, fd, handler, events):
    fd, obj = self.split_fd(fd)
    self._handlers[fd] = (obj, stack_context.wrap(handler))
    self._impl.register(fd, events | self.ERROR)

add_handler中,tornado将文件对象与文件描述符fd)分开,建立了fd到handler之间的映射,并且在_impl(此处是select.epoll())建注册了fd上的事件,update_handlerremove_handler做的也是如其名的功能,不再赘述。

start

The Golden Key

整个程序的主循环。

start中前半部分都在处理timeoutWaker相关的逻辑,在这里还没有直接遇到,先跳过。

直接看While True里面的东西。

try:
    event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
    # Depending on python version and IOLoop implementation,
    # different exception types may be thrown and there are
    # two ways EINTR might be signaled:
    # * e.errno == errno.EINTR
    # * e.args is like (errno.EINTR, 'Interrupted system call')
    if errno_from_exception(e) == errno.EINTR:
        continue
    else:
        raise

可以看到,主循环中通过poll方法得到最新的事件,通过事件对应的fd得到相应的handler,再用handler处理这个事件。

整个事件循环的流程大致如下。

add_handler -> poll event -> handle event

至此,整个torando的ioloop已经较为清晰的展现在我们面前,但是Application等相关handler如何被注入到这个ioloop中的,这些会在之后的笔记中写出。

作业

用epoll写一个简单的返回helloworld的非阻塞异步http服务器。

import select
import socket

response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(1)
server.setblocking(0)

event_imp = select.epoll()
event_imp.register(server.fileno(), select.EPOLLIN)

try:
    connections = {}
    requests = {}
    responses = {}

    while True:
        event_pair = event_imp.poll(1)
        for fileno, event in event_pair:
            if fileno == server.fileno():
                connection, address = server.accept()
                connection.setblocking(0)
                event_imp.register(connection.fileno(), select.EPOLLIN)
                connections[connection.fileno()] = connection
                requests[connection.fileno()] = b''
                responses[connection.fileno()] = response
                print(connections)
            elif event & select.EPOLLIN:
                requests[fileno] += connections[fileno].recv(1024)
                event_imp.modify(fileno, select.EPOLLOUT)
                print("Get request from %s : %s" % (connection, requests[fileno]))
            elif event & select.EPOLLOUT:
                sent = connections[fileno].send(responses[fileno])
                responses[fileno] = responses[fileno][sent:]
                if len(responses[fileno]) == 0:
                    event_imp.modify(fileno, 0)
                connections[fileno].shutdown(socket.SHUT_RDWR)
            elif event & select.EPOLLHUP:
                event_imp.unregister(fileno)
                connections[fileno].close()
                del connections[fileno]
finally:
    event_imp.unregister(server.fileno())
    event_imp.close()
    server.close()

可以看到,我们在一开始将监听TCP连接的socket在epoll上注册了EPOLLIN事件,并在新连接可用时为新连接注册了EPOLLIN事件,当新连接的内容接受完毕时,我们将连接注册的事件改为了EPOLLOUT,并在该fd可写时发送了HTTP报文。

这其实就是Tornado IOLoop最简化版的实现。