前记 Uvicorn
是一个基于uvloop
和httptools
的ASGI服务器, 性能比较强劲, 通过它可以与使用ASGI规范的Python
应用程序进行交互。ASGI与WSGI很像, 只不过ASGI原生支持HTTP2.0和WebSocket, 同时更多的是支持Python
的Asyncio
生态的WEB应用程序。通过了解Uvicron
,能知道一个稳定的Web服务器的工作方式以及能更好的去了解其他基于ASGI的WEB应用程序
Uvicron
通过一个通用的协定接口与ASGI应用程序交互, 应用程序只要实现如下代码, 即可通过Uvicorn
发送和接收信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 async def app (scope, receive, send ): assert scope['type' ] == 'http' await send({ 'type' : 'http.response.start' , 'status' : 200 , 'headers' : [ [b'content-type' , b'text/plain' ], ] }) await send({ 'type' : 'http.response.body' , 'body' : b'Hello, world!' , })if __name__ == "__main__" : import uvicorn uvicorn.run(app, host="127.0.0.1" , port=5000 , log_level="info" )
其中应用程序的scope
代表有关传入连接信息的字典, receive
是一个从服务器接收传入信息的通道, send
是一个将消息发送到服务器的通道, 不过这不是本文的重点, 更多scope
信息可以访问ASGI interface 了解, 接下来将从例子的uvicorn.run
开始, 通过源码分析uvicorn
工作原理。
1.uvicorn主流程源码分析 分析源码之前, 首先是了解它的源码结构, uvicorn
的源码结构如下: ├── lifespan ├── loops ├── protocols ├── middleware ├── supervisors ├── config.py ├── importer.py ├── init .py ├── logging.py ├── main .py ├── main.py ├── server.py ├── subprocess.py ├── _types.py └── workers.py
uvicron
做了很好的分类, 每个文件夹/文件都有自己的功能:
lifespan 告诉基于ASGI的应用程序uvicorn
即将启动和停止的消息, uvicorn
在启动的时候会初始化,然后发送初始化协议并等待ASGI应用程序返回, 如果ASGI应用程序返回complele
则uvicorn
会继续运行, 返回failed
则报错退出。
loops 自动加载事件循环, 优先加载uvloop
, 这将会获得极大的性能提升
protocols 里面存放着读取连接数据和解析消息体的协议, 如HTTP和WebSockets, 可以把他认为是一个序列化器。
middleware 存放着一些简单通用的ASGI中间件
supervisors uvicorn
本身是以一个进程启动的, 这个文件夹存放着uvicorn
的几种启动方式, 如多进程启动,监控文件变动自动重启的方式等。
config.py uvicorn
的配置文件, 它不仅读取用户的配置, 还自动加载上面所述的lifespan
, loops
, protocols
等等
importer.py uvicron
中很多地方使用了动态加载配置和动态加载库, 这里是把这个方法进行统一封装
logging.py 提供了根据日志等级渲染不同颜色的日志以及访问日志(但是很少人用)
main.py uvicorn
的入口文件, 包括代码运行和命令行运行两种方式
server.py uvicorn
的核心服务, 用于处理进出流量以及处理自身的服务状态
subprocess.py 给supervisors/multiprocess.py
使用的, 可能是为了以后拓展需要, 才放在一级目录
workers.py 其他工作模式的Uvicorn
, 比如里面有个UvicornWorker
, 就是用于gunicorn
启动uvicorn
结构了解完, 接下来开始正式步入源码之旅, 这里直接忽略掉命令行的启动方式, 从uvicorn.run
开始, 实际上命令行启动方式也是通过获取参数, 然后传入uvicorn
方法中, 这个uvicorn.run
方法会接受符合ASGI的app和kwargs
参数, 然后生成对应的配置实例config
, 再生成server
, 接着依靠配置判断是否启动模式, 具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def run (app, **kwargs ): config = Config(app, **kwargs) server = Server(config=config) if (config.reload or config.workers > 1 ) and not isinstance (app, str ): logger = logging.getLogger("uvicorn.error" ) logger.warning( "You must pass the application as an import string to enable 'reload' or " "'workers'." ) sys.exit(1 ) if config.should_reload: sock = config.bind_socket() supervisor = ChangeReload(config, target=server.run, sockets=[sock]) supervisor.run() elif config.workers > 1 : sock = config.bind_socket() supervisor = Multiprocess(config, target=server.run, sockets=[sock]) supervisor.run() else : server.run()
config
很简单, 它负责装填配置, 然后调用configure_logging
配置全局的logger
, 此外还有一个load
的方法, 将会在Server
中调用, 接下来, 先忽略其中涉及到的模板, 到server.run
之中, 看看普通模式下, 服务是怎么启动的。
这个run
方法很简单, 就是设置事件循环, 然后通过事件循环调用serve
来启动服务:
1 2 3 4 def run (self, sockets=None ): self.config.setup_event_loop() loop = asyncio.get_event_loop() loop.run_until_complete(self.serve(sockets=sockets))
serve
是启动服务的最核心代码, 首先会执行config.load
方法加载一些动态的配置, 如解析http的库, 解析websocket的库, 还有通过用户传过来的app来加载app, 并判断是使用WSGI
,ASGI2
或者是ASGI3
, 并进行配置(uvicorn
在这里是通过ASGI中间件的方式来支持), 最后根据配置启动对应的中间件。 接着会跳转到server.startup
方法, 该方法首先会通过lifaspan.startup
与用户传过来的app
通信, 校验是否是合法的应用程序, 然后初始化变量, 先是初始化一个信号处理函数, 当收到信号时, 会把变量should_exit
设置会True
。 然后会初始化一个名为create_protocol
的变量, 它是继承于asyncio.Protocol
, asyncio.Protocol
主要用于从socket获取数据和写入数据, 同时也有一些TCP相关的调用, create_protocol
的主要作用就是作为socket和应用程序的中间层, 负责把HTTP数据与ASGI数据互转, 如下图:协议转换 接着根据用户传过来的变量方式来启动服务, 这些都是Python
的Asyncio
封装好的, 具体为以下几种:
当用户传socket过来的时候: 基于该scoket和create_protocol
创建服务, 如果是多进程且是Windows系统, 则要显示的共享socket。
当用户传文件描述符的时候: 基于该文件描述符获取scoket, 并通过该socket和create_protocol
创建服务。
当用户传unix domain socket的时候: 基于unix domain socket和create_protocol
创建服务。
当用户传host和port参数的时候: 基于host和port和create_protocol
创建服务。
创建完服务后, socket的处理就转给了应用程序了, 但是采用了事件循环的思路, 需要uvicorn
使用while
使程序一直跑, 防止主程序退出:
1 2 3 4 5 6 7 8 async def main_loop (self ): counter = 0 should_exit = await self.on_tick(counter) while not should_exit: counter += 1 counter = counter % 864000 await asyncio.sleep(0.1 ) should_exit = await self.on_tick(counter)
每次循环执行的时候都会调用on_tick
方法, 该方法主要是进行服务统计以及判断啥时候可以退出服务, 比如请求总数超过配置的限制数, 或者收到信号,把变量should_exit
设置为True
等等, 如果在循环中判断程序需要进行退出, 就会进入退出逻辑shutdown
, 该逻辑比较简单, 注释和代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 async def shutdown (self, sockets=None ): logger.info("Shutting down" ) for server in self.servers: server.close() for sock in sockets or []: sock.close() for server in self.servers: await server.wait_closed() for connection in list (self.server_state.connections): connection.shutdown() await asyncio.sleep(0.1 ) if self.server_state.connections and not self.force_exit: msg = "Waiting for connections to close. (CTRL+C to force quit)" logger.info(msg) while self.server_state.connections and not self.force_exit: await asyncio.sleep(0.1 ) if self.server_state.tasks and not self.force_exit: msg = "Waiting for background tasks to complete. (CTRL+C to force quit)" logger.info(msg) while self.server_state.tasks and not self.force_exit: await asyncio.sleep(0.1 ) if not self.force_exit: await self.lifespan.shutdown()
至此整个主流程分析完毕, 下图是我整理后的一个流程图: 从图中可以很清晰的看清uvicorn
与ASGI应用程序的关系, 接下来是上面部分没有详细讲过的小组件源码分析。
2.uvicorn.protocols源码分析 在了解了uvicorn
的主流程后只大概的知道uvicorn
是通过uvicorn.protocols
与应用程序进行通信, 但是不明白他们具体是如何通信的, 接下来就开始了解uvicorn
中最核心的uvicorn.protocols
。
从上面的分析中我们可以知道, 作为一个Web服务器, uvicorn
是通过一个socket来接收发发送请求的。 而对于socket来说, 它只关心怎么创建连接,关闭连接以及如何传输内容, 它不会关心传输的字节流的上层协议是如何实现的。 好在Asyncio
提供了几种简单的网络传输模型, 它们都是对于这些传输数据的抽象, 这些抽象会在如loop.create_server
和loop.create_unix_server
方法中使用, 通过这个抽象我们能很方便的使用TCP和UDP连接。
uvicorn
在基于Asyncio
创建服务器时, 把protocol
抽象通过protocols参数传入loop.create_server
和loop.create_unix_server
后, 维护他们返回的server
对象, 剩下的与ASGI应用程序的数据交互全由protocol
对象处理。uvicorn
封装的对象继承于asyncio.Protocols
, 它是针对TCP协议的封装, 它总共有6个方法,包括启动时的connection_made
, 接收数据时的data_received
, 断开时的eof_received
以及丢失连接时的connection_lost
, 然后还有当TCP连接出现堵塞时的暂停pause_writing
和恢复resume_writing
, 具体的方法传输的参数和使用方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class Protocol (BaseProtocol ): def connection_made (self, transport ): """ 在建立连接时调用. 参数是表示管道连接的transport, 此时得到的transport需要设置为该类的transport, 方便后续connection_lost控制关闭管道. """ def data_received (self, data ): """ 通过该方法可以获取到客户端传输过来的数据 """ def eof_received (self ): """ 当另一端调用write_eof()或等效函数时调用. 如果返回一个假值(包括None),则传输将关闭自身。 如果它返回true值,则关闭传输取决于协议. """ def connection_lost (self, exc ): """ 当连接丢失或关闭时调用, 根据exc判断是否要关闭trnasport. 参数是一个异常对象或None(后者表示接收到常规EOF或中止或关闭连接). """ def pause_writing (self ): """ 当transport缓冲区超过高水位(high-water mark)时调用, 此时应该能控制外部不再写入数据(通常是一个asyncio.Future), 同时应该通过transport.pause_reading来停止获取数据, 之后TCP就会通过拥塞机制使得客户端减缓发送数据的速度。 """ def resume_writing (self ): """ 当transport缓冲区排放低于低水位线(low-water mark)时调用. 此时要释放标志, 使得外部可以继续写入数据, 同时通过transport.resume_reading来恢复获取数据, 之后TCP就会通过拥塞机制知道服务端的处理能力上来了, 使客户端加快发送速度。 """
了解完asycnio.Protocol
后, 可以正式了解uvicorn
对asyncio.Protocol
做了哪些修改来达到跟应用程序进行通信, 由于各个Protocol
的封装差不多, 这里以httptools_impl.HttpToolsProtocol
为例子进行说明。
首先是类的初始化, HttpToolsProtocol
在初始化时会加载config
并配置到日志,对应的websocket协议处理,对应的http解析器以及serve
创建的统计容器等, 其中需要注意的是, 在初始化时, 传入的变量是HttpToolsProtocol
的实例化本身。 接着是Protocol
的几大主要协议接口函数, 这里以源码和注释进行分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 class HttpToolsProtocol (asyncio.Protocol ): def connection_made (self, transport ): self.connections.add(self) self.transport = transport self.flow = FlowControl(transport) self.server = get_local_addr(transport) self.client = get_remote_addr(transport) self.scheme = "https" if is_ssl(transport) else "http" if self.logger.level <= TRACE_LOG_LEVEL: prefix = "%s:%d - " % tuple (self.client) if self.client else "" self.logger.log(TRACE_LOG_LEVEL, "%sConnection made" , prefix) def connection_lost (self, exc ): self.connections.discard(self) if self.logger.level <= TRACE_LOG_LEVEL: prefix = "%s:%d - " % tuple (self.client) if self.client else "" self.logger.log(TRACE_LOG_LEVEL, "%sConnection lost" , prefix) if self.cycle and not self.cycle.response_complete: self.cycle.disconnected = True if self.cycle is not None : self.cycle.message_event.set () if self.flow is not None : self.flow.resume_writing() def _unset_keepalive_if_required (self ): """取消keep alive timeout的任务, 一般来说, 在发送数据后服务端会等待客户端发送数据, 如果超过多少秒没有发送数据则可以判断该客户端已经断开了, 服务端可以主动关闭连接 而uvicorn通过timeout_keep_alive_task来实现 """ if self.timeout_keep_alive_task is not None : self.timeout_keep_alive_task.cancel() self.timeout_keep_alive_task = None def data_received (self, data ): self._unset_keepalive_if_required() try : self.parser.feed_data(data) except httptools.HttpParserError as exc: msg = "Invalid HTTP request received." self.logger.warning(msg, exc_info=exc) self.transport.close() except httptools.HttpParserUpgrade: self.handle_upgrade()
分析完了几个跟连接相关的主要方法后就会发现分析路线已经断了, 而该类中还有很多on_xxx
的方法, 它们也没有被其他方法调用。 这是因为在初始化HTTP协议解析器的时候,uvicorn.protocol
把自己的实例传入了HTTP解析器中, 解析器会边接收数据边按照url, header, body来顺序解析, 并在执行每种数据解析后, 会通过回调告诉传入的实例, uvicorn
正是通过on_xxx
方法来监听这些回调并处理解析完的HTTP数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 class HttpToolsProtocol (asyncio.Protocol ): def on_url (self, url ): """这是收到一个请求后的第一次解析, 可以认为是该请求体的初始化, 此时会根据url和连接数据进行初始化, 并存放在实例的scope中""" method = self.parser.get_method() parsed_url = httptools.parse_url(url) raw_path = parsed_url.path path = raw_path.decode("ascii" ) if "%" in path: path = urllib.parse.unquote(path) self.url = url self.expect_100_continue = False self.headers = [] self.scope = { "type" : "http" , "asgi" : {"version" : self.config.asgi_version, "spec_version" : "2.1" }, "http_version" : "1.1" , "server" : self.server, "client" : self.client, "scheme" : self.scheme, "method" : method.decode("ascii" ), "root_path" : self.root_path, "path" : path, "raw_path" : raw_path, "query_string" : parsed_url.query if parsed_url.query else b"" , "headers" : self.headers, } def on_header (self, name: bytes , value: bytes ): """解析器在解析header时, 是按照header一行一行进行解析的, 所以每即系一行header都会调用一次on_header, 并把他们存在实例的headers中""" name = name.lower() if name == b"expect" and value.lower() == b"100-continue" : self.expect_100_continue = True self.headers.append((name, value)) def on_headers_complete (self ): """对于大部分前置web框架来说, 一般解析到header后就结束不再解析了, 会开始发送到正真处理的应用程序, uvicorn也是这样的""" http_version = self.parser.get_http_version() if http_version != "1.1" : self.scope["http_version" ] = http_version if self.parser.should_upgrade(): return if self.limit_concurrency is not None and ( len (self.connections) >= self.limit_concurrency or len (self.tasks) >= self.limit_concurrency ): app = service_unavailable message = "Exceeded concurrency limit." self.logger.warning(message) else : app = self.app existing_cycle = self.cycle self.cycle = RequestResponseCycle( scope=self.scope, transport=self.transport, flow=self.flow, logger=self.logger, access_logger=self.access_logger, access_log=self.access_log, default_headers=self.default_headers, message_event=asyncio.Event(), expect_100_continue=self.expect_100_continue, keep_alive=http_version != "1.0" , on_response=self.on_response_complete, ) if existing_cycle is None or existing_cycle.response_complete: task = self.loop.create_task(self.cycle.run_asgi(app)) task.add_done_callback(self.tasks.discard) self.tasks.add(task) else : self.flow.pause_reading() self.pipeline.insert(0 , (self.cycle, app)) def on_body (self, body: bytes ): """读取到原生的body字节, 如果ASGI处理者还在运行, 且不是websocket, 则转给ASGI处理者 注: 一个请求可能会触发多次on_body""" if self.parser.should_upgrade() or self.cycle.response_complete: return self.cycle.body += body if len (self.cycle.body) > HIGH_WATER_LIMIT: self.flow.pause_reading() self.cycle.message_event.set () def on_message_complete (self ): if self.parser.should_upgrade() or self.cycle.response_complete: return self.cycle.more_body = False self.cycle.message_event.set () def on_response_complete (self ): """返回响应时的回调""" self.server_state.total_requests += 1 if self.transport.is_closing(): return self._unset_keepalive_if_required() self.timeout_keep_alive_task = self.loop.call_later( self.timeout_keep_alive, self.timeout_keep_alive_handler ) self.flow.resume_reading() if self.pipeline: cycle, app = self.pipeline.pop() task = self.loop.create_task(cycle.run_asgi(app)) task.add_done_callback(self.tasks.discard) self.tasks.add(task)
在了解解析HTTP数据的时候, 经常会遇到一个cycle对象, 这个对象是基于ASGI负责读写数据转换的对象, 这个对象有send
和receive
两个方法, 这两个方法的命名是站在ASGI应用程序的角度来命名的。 其中send
通过传入的参数message
获取到ASGI应用程序返回的数据, 并依据ASGI协议进行解析, 并拼接成HTTP协议的字节流, 当ASGI应用程序发送结束标记时, send
会把拼接的字节流通过socket返回给客户端, 同时触发on_response_complete
方法。 而receive
比较简单, 它只负责接收获取到已经解析完成的HTTP数据(早前面on_xxx时会把数据传给cycle), 然后发送到ASGI应用程序中。 这两个方法都是通过on_headers_complete
中执行的run_asgi
方法来调用的, 通过该方法, uvicorn
会把数据的处理权转给ASGI应用程序, 如果ASGI应用程序处理异常, 则会返回HTTP状态码为500的响应给客户端并关闭transport。
分析完了cycle对象后, 再次回到protocol的data_received
的方法中, 这里通过获取httptools.HttpParserUpgrade
异常的方式得知当前可能是一个WebSocket请求, 于是进入handle_upgrade
逻辑, 这个逻辑会检查是否加载了解析WebSocket解析器以及请求体是否满足WebSocket条件, 如果不满足就会返回一个响应体告诉客户端当前无法支持该HTTP请求的升级协议, 如果满足则会生成WebSocketProtocols来处理socket的数据, 并把它设置为当前的transport向关联, 不过这个WebsocketProtocols基本上是HTTP protocols和cycle两个对象的融合, 具体处理步骤也差不多, 这里就不多做描述了。
至此, 大体上的uvicorn.protocols
源码就分析完了, 由于uvicorn
是把asyncio.Protocol
, 解析器, cycle
三者结合在一起, 所以分析起来要经常跳转, 因此我把他们的流程转成如下的图:流处理图
3.总结 至此, uvicorn
的核心流程已经分析完了, 它先是通过server
来启动一个服务, 并管理服务状态, 然后再通过protocol
负责做双端的序列化, 使ASGI应用程序能够按照ASGI协议读写数据, 其中protocol
还融合了HTTP解析器解析HTTP并通过它来解析数据。 当然, 除了上述主流程外, uvicorn
还包括了中间件, 多进程启动以及监控文件变化重启服务等组件, 这些组件的代码量不大, 分析源码也说不出啥, 这里就简单略过。