前记 Uvicorn是一个基于uvloop和httptools的ASGI服务器, 性能比较强劲, 通过它可以与使用ASGI规范的Python应用程序进行交互。ASGI与WSGI很像, 只不过ASGI原生支持HTTP2.0和WebSocket, 同时更多的是支持Python的Asyncio生态的WEB应用程序。通过了解Uvicron,能知道一个稳定的Web服务器的工作方式以及能更好的去了解其他基于ASGI的WEB应用程序
Uvicron通过一个通用的协定接口与ASGI应用程序交互, 应用程序只要实现如下代码, 即可通过Uvicorn发送和接收信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 async def app (scope, receive, send ): assert scope['type' ] == 'http' await send({ 'type' : 'http.response.start' , 'status' : 200 , 'headers' : [ [b'content-type' , b'text/plain' ], ] }) await send({ 'type' : 'http.response.body' , 'body' : b'Hello, world!' , })if __name__ == "__main__" : import uvicorn uvicorn.run(app, host="127.0.0.1" , port=5000 , log_level="info" )
其中应用程序的scope代表有关传入连接信息的字典, receive是一个从服务器接收传入信息的通道, send是一个将消息发送到服务器的通道, 不过这不是本文的重点, 更多scope信息可以访问ASGI interface 了解, 接下来将从例子的uvicorn.run开始, 通过源码分析uvicorn工作原理。
1.uvicorn主流程源码分析 分析源码之前, 首先是了解它的源码结构, uvicorn的源码结构如下: ├── lifespan ├── loops ├── protocols ├── middleware ├── supervisors ├── config.py ├── importer.py ├── init .py ├── logging.py ├── main .py ├── main.py ├── server.py ├── subprocess.py ├── _types.py └── workers.py
uvicron做了很好的分类, 每个文件夹/文件都有自己的功能:
lifespan 告诉基于ASGI的应用程序uvicorn即将启动和停止的消息, uvicorn在启动的时候会初始化,然后发送初始化协议并等待ASGI应用程序返回, 如果ASGI应用程序返回complele则uvicorn会继续运行, 返回failed则报错退出。
loops 自动加载事件循环, 优先加载uvloop, 这将会获得极大的性能提升
protocols 里面存放着读取连接数据和解析消息体的协议, 如HTTP和WebSockets, 可以把他认为是一个序列化器。
middleware 存放着一些简单通用的ASGI中间件
supervisors uvicorn本身是以一个进程启动的, 这个文件夹存放着uvicorn的几种启动方式, 如多进程启动,监控文件变动自动重启的方式等。
config.py uvicorn的配置文件, 它不仅读取用户的配置, 还自动加载上面所述的lifespan, loops, protocols等等
importer.py uvicron中很多地方使用了动态加载配置和动态加载库, 这里是把这个方法进行统一封装
logging.py 提供了根据日志等级渲染不同颜色的日志以及访问日志(但是很少人用)
main.py uvicorn的入口文件, 包括代码运行和命令行运行两种方式
server.py uvicorn的核心服务, 用于处理进出流量以及处理自身的服务状态
subprocess.py 给supervisors/multiprocess.py使用的, 可能是为了以后拓展需要, 才放在一级目录
workers.py 其他工作模式的Uvicorn, 比如里面有个UvicornWorker, 就是用于gunicorn启动uvicorn
结构了解完, 接下来开始正式步入源码之旅, 这里直接忽略掉命令行的启动方式, 从uvicorn.run开始, 实际上命令行启动方式也是通过获取参数, 然后传入uvicorn方法中, 这个uvicorn.run方法会接受符合ASGI的app和kwargs参数, 然后生成对应的配置实例config, 再生成server, 接着依靠配置判断是否启动模式, 具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def run (app, **kwargs ): config = Config(app, **kwargs) server = Server(config=config) if (config.reload or config.workers > 1 ) and not isinstance (app, str ): logger = logging.getLogger("uvicorn.error" ) logger.warning( "You must pass the application as an import string to enable 'reload' or " "'workers'." ) sys.exit(1 ) if config.should_reload: sock = config.bind_socket() supervisor = ChangeReload(config, target=server.run, sockets=[sock]) supervisor.run() elif config.workers > 1 : sock = config.bind_socket() supervisor = Multiprocess(config, target=server.run, sockets=[sock]) supervisor.run() else : server.run()
config很简单, 它负责装填配置, 然后调用configure_logging配置全局的logger, 此外还有一个load的方法, 将会在Server中调用, 接下来, 先忽略其中涉及到的模板, 到server.run之中, 看看普通模式下, 服务是怎么启动的。
这个run方法很简单, 就是设置事件循环, 然后通过事件循环调用serve来启动服务:
1 2 3 4 def run (self, sockets=None ): self.config.setup_event_loop() loop = asyncio.get_event_loop() loop.run_until_complete(self.serve(sockets=sockets))
serve是启动服务的最核心代码, 首先会执行config.load方法加载一些动态的配置, 如解析http的库, 解析websocket的库, 还有通过用户传过来的app来加载app, 并判断是使用WSGI,ASGI2或者是ASGI3, 并进行配置(uvicorn在这里是通过ASGI中间件的方式来支持), 最后根据配置启动对应的中间件。 接着会跳转到server.startup方法, 该方法首先会通过lifaspan.startup与用户传过来的app通信, 校验是否是合法的应用程序, 然后初始化变量, 先是初始化一个信号处理函数, 当收到信号时, 会把变量should_exit设置会True。 然后会初始化一个名为create_protocol的变量, 它是继承于asyncio.Protocol, asyncio.Protocol主要用于从socket获取数据和写入数据, 同时也有一些TCP相关的调用, create_protocol的主要作用就是作为socket和应用程序的中间层, 负责把HTTP数据与ASGI数据互转, 如下图:协议转换 接着根据用户传过来的变量方式来启动服务, 这些都是Python的Asyncio封装好的, 具体为以下几种:
当用户传socket过来的时候: 基于该scoket和create_protocol创建服务, 如果是多进程且是Windows系统, 则要显示的共享socket。
当用户传文件描述符的时候: 基于该文件描述符获取scoket, 并通过该socket和create_protocol创建服务。
当用户传unix domain socket的时候: 基于unix domain socket和create_protocol创建服务。
当用户传host和port参数的时候: 基于host和port和create_protocol创建服务。
创建完服务后, socket的处理就转给了应用程序了, 但是采用了事件循环的思路, 需要uvicorn使用while使程序一直跑, 防止主程序退出:
1 2 3 4 5 6 7 8 async def main_loop (self ): counter = 0 should_exit = await self.on_tick(counter) while not should_exit: counter += 1 counter = counter % 864000 await asyncio.sleep(0.1 ) should_exit = await self.on_tick(counter)
每次循环执行的时候都会调用on_tick方法, 该方法主要是进行服务统计以及判断啥时候可以退出服务, 比如请求总数超过配置的限制数, 或者收到信号,把变量should_exit设置为True等等, 如果在循环中判断程序需要进行退出, 就会进入退出逻辑shutdown, 该逻辑比较简单, 注释和代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 async def shutdown (self, sockets=None ): logger.info("Shutting down" ) for server in self.servers: server.close() for sock in sockets or []: sock.close() for server in self.servers: await server.wait_closed() for connection in list (self.server_state.connections): connection.shutdown() await asyncio.sleep(0.1 ) if self.server_state.connections and not self.force_exit: msg = "Waiting for connections to close. (CTRL+C to force quit)" logger.info(msg) while self.server_state.connections and not self.force_exit: await asyncio.sleep(0.1 ) if self.server_state.tasks and not self.force_exit: msg = "Waiting for background tasks to complete. (CTRL+C to force quit)" logger.info(msg) while self.server_state.tasks and not self.force_exit: await asyncio.sleep(0.1 ) if not self.force_exit: await self.lifespan.shutdown()
至此整个主流程分析完毕, 下图是我整理后的一个流程图: 从图中可以很清晰的看清uvicorn与ASGI应用程序的关系, 接下来是上面部分没有详细讲过的小组件源码分析。
2.uvicorn.protocols源码分析 在了解了uvicorn的主流程后只大概的知道uvicorn是通过uvicorn.protocols与应用程序进行通信, 但是不明白他们具体是如何通信的, 接下来就开始了解uvicorn中最核心的uvicorn.protocols。
从上面的分析中我们可以知道, 作为一个Web服务器, uvicorn是通过一个socket来接收发发送请求的。 而对于socket来说, 它只关心怎么创建连接,关闭连接以及如何传输内容, 它不会关心传输的字节流的上层协议是如何实现的。 好在Asyncio提供了几种简单的网络传输模型, 它们都是对于这些传输数据的抽象, 这些抽象会在如loop.create_server和loop.create_unix_server方法中使用, 通过这个抽象我们能很方便的使用TCP和UDP连接。
uvicorn在基于Asyncio创建服务器时, 把protocol抽象通过protocols参数传入loop.create_server和loop.create_unix_server后, 维护他们返回的server对象, 剩下的与ASGI应用程序的数据交互全由protocol对象处理。uvicorn封装的对象继承于asyncio.Protocols, 它是针对TCP协议的封装, 它总共有6个方法,包括启动时的connection_made, 接收数据时的data_received, 断开时的eof_received以及丢失连接时的connection_lost, 然后还有当TCP连接出现堵塞时的暂停pause_writing和恢复resume_writing, 具体的方法传输的参数和使用方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class Protocol (BaseProtocol ): def connection_made (self, transport ): """ 在建立连接时调用. 参数是表示管道连接的transport, 此时得到的transport需要设置为该类的transport, 方便后续connection_lost控制关闭管道. """ def data_received (self, data ): """ 通过该方法可以获取到客户端传输过来的数据 """ def eof_received (self ): """ 当另一端调用write_eof()或等效函数时调用. 如果返回一个假值(包括None),则传输将关闭自身。 如果它返回true值,则关闭传输取决于协议. """ def connection_lost (self, exc ): """ 当连接丢失或关闭时调用, 根据exc判断是否要关闭trnasport. 参数是一个异常对象或None(后者表示接收到常规EOF或中止或关闭连接). """ def pause_writing (self ): """ 当transport缓冲区超过高水位(high-water mark)时调用, 此时应该能控制外部不再写入数据(通常是一个asyncio.Future), 同时应该通过transport.pause_reading来停止获取数据, 之后TCP就会通过拥塞机制使得客户端减缓发送数据的速度。 """ def resume_writing (self ): """ 当transport缓冲区排放低于低水位线(low-water mark)时调用. 此时要释放标志, 使得外部可以继续写入数据, 同时通过transport.resume_reading来恢复获取数据, 之后TCP就会通过拥塞机制知道服务端的处理能力上来了, 使客户端加快发送速度。 """
了解完asycnio.Protocol后, 可以正式了解uvicorn对asyncio.Protocol做了哪些修改来达到跟应用程序进行通信, 由于各个Protocol的封装差不多, 这里以httptools_impl.HttpToolsProtocol为例子进行说明。
首先是类的初始化, HttpToolsProtocol 在初始化时会加载config并配置到日志,对应的websocket协议处理,对应的http解析器以及serve创建的统计容器等, 其中需要注意的是, 在初始化时, 传入的变量是HttpToolsProtocol的实例化本身。 接着是Protocol的几大主要协议接口函数, 这里以源码和注释进行分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 class HttpToolsProtocol (asyncio.Protocol ): def connection_made (self, transport ): self.connections.add(self) self.transport = transport self.flow = FlowControl(transport) self.server = get_local_addr(transport) self.client = get_remote_addr(transport) self.scheme = "https" if is_ssl(transport) else "http" if self.logger.level <= TRACE_LOG_LEVEL: prefix = "%s:%d - " % tuple (self.client) if self.client else "" self.logger.log(TRACE_LOG_LEVEL, "%sConnection made" , prefix) def connection_lost (self, exc ): self.connections.discard(self) if self.logger.level <= TRACE_LOG_LEVEL: prefix = "%s:%d - " % tuple (self.client) if self.client else "" self.logger.log(TRACE_LOG_LEVEL, "%sConnection lost" , prefix) if self.cycle and not self.cycle.response_complete: self.cycle.disconnected = True if self.cycle is not None : self.cycle.message_event.set () if self.flow is not None : self.flow.resume_writing() def _unset_keepalive_if_required (self ): """取消keep alive timeout的任务, 一般来说, 在发送数据后服务端会等待客户端发送数据, 如果超过多少秒没有发送数据则可以判断该客户端已经断开了, 服务端可以主动关闭连接 而uvicorn通过timeout_keep_alive_task来实现 """ if self.timeout_keep_alive_task is not None : self.timeout_keep_alive_task.cancel() self.timeout_keep_alive_task = None def data_received (self, data ): self._unset_keepalive_if_required() try : self.parser.feed_data(data) except httptools.HttpParserError as exc: msg = "Invalid HTTP request received." self.logger.warning(msg, exc_info=exc) self.transport.close() except httptools.HttpParserUpgrade: self.handle_upgrade()
分析完了几个跟连接相关的主要方法后就会发现分析路线已经断了, 而该类中还有很多on_xxx的方法, 它们也没有被其他方法调用。 这是因为在初始化HTTP协议解析器的时候,uvicorn.protocol把自己的实例传入了HTTP解析器中, 解析器会边接收数据边按照url, header, body来顺序解析, 并在执行每种数据解析后, 会通过回调告诉传入的实例, uvicorn正是通过on_xxx方法来监听这些回调并处理解析完的HTTP数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 class HttpToolsProtocol (asyncio.Protocol ): def on_url (self, url ): """这是收到一个请求后的第一次解析, 可以认为是该请求体的初始化, 此时会根据url和连接数据进行初始化, 并存放在实例的scope中""" method = self.parser.get_method() parsed_url = httptools.parse_url(url) raw_path = parsed_url.path path = raw_path.decode("ascii" ) if "%" in path: path = urllib.parse.unquote(path) self.url = url self.expect_100_continue = False self.headers = [] self.scope = { "type" : "http" , "asgi" : {"version" : self.config.asgi_version, "spec_version" : "2.1" }, "http_version" : "1.1" , "server" : self.server, "client" : self.client, "scheme" : self.scheme, "method" : method.decode("ascii" ), "root_path" : self.root_path, "path" : path, "raw_path" : raw_path, "query_string" : parsed_url.query if parsed_url.query else b"" , "headers" : self.headers, } def on_header (self, name: bytes , value: bytes ): """解析器在解析header时, 是按照header一行一行进行解析的, 所以每即系一行header都会调用一次on_header, 并把他们存在实例的headers中""" name = name.lower() if name == b"expect" and value.lower() == b"100-continue" : self.expect_100_continue = True self.headers.append((name, value)) def on_headers_complete (self ): """对于大部分前置web框架来说, 一般解析到header后就结束不再解析了, 会开始发送到正真处理的应用程序, uvicorn也是这样的""" http_version = self.parser.get_http_version() if http_version != "1.1" : self.scope["http_version" ] = http_version if self.parser.should_upgrade(): return if self.limit_concurrency is not None and ( len (self.connections) >= self.limit_concurrency or len (self.tasks) >= self.limit_concurrency ): app = service_unavailable message = "Exceeded concurrency limit." self.logger.warning(message) else : app = self.app existing_cycle = self.cycle self.cycle = RequestResponseCycle( scope=self.scope, transport=self.transport, flow=self.flow, logger=self.logger, access_logger=self.access_logger, access_log=self.access_log, default_headers=self.default_headers, message_event=asyncio.Event(), expect_100_continue=self.expect_100_continue, keep_alive=http_version != "1.0" , on_response=self.on_response_complete, ) if existing_cycle is None or existing_cycle.response_complete: task = self.loop.create_task(self.cycle.run_asgi(app)) task.add_done_callback(self.tasks.discard) self.tasks.add(task) else : self.flow.pause_reading() self.pipeline.insert(0 , (self.cycle, app)) def on_body (self, body: bytes ): """读取到原生的body字节, 如果ASGI处理者还在运行, 且不是websocket, 则转给ASGI处理者 注: 一个请求可能会触发多次on_body""" if self.parser.should_upgrade() or self.cycle.response_complete: return self.cycle.body += body if len (self.cycle.body) > HIGH_WATER_LIMIT: self.flow.pause_reading() self.cycle.message_event.set () def on_message_complete (self ): if self.parser.should_upgrade() or self.cycle.response_complete: return self.cycle.more_body = False self.cycle.message_event.set () def on_response_complete (self ): """返回响应时的回调""" self.server_state.total_requests += 1 if self.transport.is_closing(): return self._unset_keepalive_if_required() self.timeout_keep_alive_task = self.loop.call_later( self.timeout_keep_alive, self.timeout_keep_alive_handler ) self.flow.resume_reading() if self.pipeline: cycle, app = self.pipeline.pop() task = self.loop.create_task(cycle.run_asgi(app)) task.add_done_callback(self.tasks.discard) self.tasks.add(task)
在了解解析HTTP数据的时候, 经常会遇到一个cycle对象, 这个对象是基于ASGI负责读写数据转换的对象, 这个对象有send和receive两个方法, 这两个方法的命名是站在ASGI应用程序的角度来命名的。 其中send通过传入的参数message获取到ASGI应用程序返回的数据, 并依据ASGI协议进行解析, 并拼接成HTTP协议的字节流, 当ASGI应用程序发送结束标记时, send会把拼接的字节流通过socket返回给客户端, 同时触发on_response_complete方法。 而receive比较简单, 它只负责接收获取到已经解析完成的HTTP数据(早前面on_xxx时会把数据传给cycle), 然后发送到ASGI应用程序中。 这两个方法都是通过on_headers_complete中执行的run_asgi方法来调用的, 通过该方法, uvicorn会把数据的处理权转给ASGI应用程序, 如果ASGI应用程序处理异常, 则会返回HTTP状态码为500的响应给客户端并关闭transport。
分析完了cycle对象后, 再次回到protocol的data_received的方法中, 这里通过获取httptools.HttpParserUpgrade异常的方式得知当前可能是一个WebSocket请求, 于是进入handle_upgrade逻辑, 这个逻辑会检查是否加载了解析WebSocket解析器以及请求体是否满足WebSocket条件, 如果不满足就会返回一个响应体告诉客户端当前无法支持该HTTP请求的升级协议, 如果满足则会生成WebSocketProtocols来处理socket的数据, 并把它设置为当前的transport向关联, 不过这个WebsocketProtocols基本上是HTTP protocols和cycle两个对象的融合, 具体处理步骤也差不多, 这里就不多做描述了。
至此, 大体上的uvicorn.protocols源码就分析完了, 由于uvicorn是把asyncio.Protocol, 解析器, cycle三者结合在一起, 所以分析起来要经常跳转, 因此我把他们的流程转成如下的图:流处理图
3.总结 至此, uvicorn的核心流程已经分析完了, 它先是通过server来启动一个服务, 并管理服务状态, 然后再通过protocol负责做双端的序列化, 使ASGI应用程序能够按照ASGI协议读写数据, 其中protocol还融合了HTTP解析器解析HTTP并通过它来解析数据。 当然, 除了上述主流程外, uvicorn还包括了中间件, 多进程启动以及监控文件变化重启服务等组件, 这些组件的代码量不大, 分析源码也说不出啥, 这里就简单略过。