uvicorn源码分析

本文总阅读量

前记

Uvicorn是一个基于uvloophttptools的ASGI服务器, 性能比较强劲, 通过它可以与使用ASGI规范的Python应用程序进行交互。ASGI与WSGI很像, 只不过ASGI原生支持HTTP2.0和WebSocket, 同时更多的是支持PythonAsyncio生态的WEB应用程序。通过了解Uvicron,能知道一个稳定的Web服务器的工作方式以及能更好的去了解其他基于ASGI的WEB应用程序

Uvicron通过一个通用的协定接口与ASGI应用程序交互, 应用程序只要实现如下代码, 即可通过Uvicorn发送和接收信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def app(scope, receive, send):
# 一个最简单的ASGI应用程序
assert scope['type'] == 'http'
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', b'text/plain'],
]
})
await send({
'type': 'http.response.body',
'body': b'Hello, world!',
})


if __name__ == "__main__":
# uvicorn服务
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")

其中应用程序的scope代表有关传入连接信息的字典, receive是一个从服务器接收传入信息的通道, send是一个将消息发送到服务器的通道, 不过这不是本文的重点, 更多scope信息可以访问ASGI interface了解, 接下来将从例子的uvicorn.run开始, 通过源码分析uvicorn工作原理。

1.uvicorn主流程源码分析

分析源码之前, 首先是了解它的源码结构, uvicorn的源码结构如下:
├── lifespan
├── loops
├── protocols
├── middleware
├── supervisors
├── config.py
├── importer.py
├── init.py
├── logging.py
├── main.py
├── main.py
├── server.py
├── subprocess.py
├── _types.py
└── workers.py

uvicron做了很好的分类, 每个文件夹/文件都有自己的功能:

  • lifespan
    告诉基于ASGI的应用程序uvicorn即将启动和停止的消息, uvicorn在启动的时候会初始化,然后发送初始化协议并等待ASGI应用程序返回, 如果ASGI应用程序返回compleleuvicorn会继续运行, 返回failed则报错退出。
  • loops
    自动加载事件循环, 优先加载uvloop, 这将会获得极大的性能提升
  • protocols
    里面存放着读取连接数据和解析消息体的协议, 如HTTP和WebSockets, 可以把他认为是一个序列化器。
  • middleware
    存放着一些简单通用的ASGI中间件
  • supervisors
    uvicorn本身是以一个进程启动的, 这个文件夹存放着uvicorn的几种启动方式, 如多进程启动,监控文件变动自动重启的方式等。
  • config.py
    uvicorn的配置文件, 它不仅读取用户的配置, 还自动加载上面所述的lifespan, loops, protocols等等
  • importer.py
    uvicron中很多地方使用了动态加载配置和动态加载库, 这里是把这个方法进行统一封装
  • logging.py
    提供了根据日志等级渲染不同颜色的日志以及访问日志(但是很少人用)
  • main.py
    uvicorn的入口文件, 包括代码运行和命令行运行两种方式
  • server.py
    uvicorn的核心服务, 用于处理进出流量以及处理自身的服务状态
  • subprocess.py
    supervisors/multiprocess.py使用的, 可能是为了以后拓展需要, 才放在一级目录
  • workers.py
    其他工作模式的Uvicorn, 比如里面有个UvicornWorker, 就是用于gunicorn启动uvicorn

结构了解完, 接下来开始正式步入源码之旅, 这里直接忽略掉命令行的启动方式, 从uvicorn.run开始, 实际上命令行启动方式也是通过获取参数, 然后传入uvicorn方法中, 这个uvicorn.run方法会接受符合ASGI的app和kwargs参数, 然后生成对应的配置实例config, 再生成server, 接着依靠配置判断是否启动模式, 具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def run(app, **kwargs):
# 加载配置
config = Config(app, **kwargs)
# 加载server
server = Server(config=config)

if (config.reload or config.workers > 1) and not isinstance(app, str):
# 只有命令行模式才可以使用reload
logger = logging.getLogger("uvicorn.error")
logger.warning(
"You must pass the application as an import string to enable 'reload' or "
"'workers'."
)
sys.exit(1)

if config.should_reload:
# 启动reload逻辑
sock = config.bind_socket()
supervisor = ChangeReload(config, target=server.run, sockets=[sock])
supervisor.run()
elif config.workers > 1:
# 多进程方式启动
sock = config.bind_socket()
supervisor = Multiprocess(config, target=server.run, sockets=[sock])
supervisor.run()
else:
# 最普通的方法启动
server.run()

config很简单, 它负责装填配置, 然后调用configure_logging配置全局的logger, 此外还有一个load的方法, 将会在Server中调用, 接下来, 先忽略其中涉及到的模板, 到server.run之中, 看看普通模式下, 服务是怎么启动的。

这个run方法很简单, 就是设置事件循环, 然后通过事件循环调用serve来启动服务:

1
2
3
4
def run(self, sockets=None):
self.config.setup_event_loop()
loop = asyncio.get_event_loop()
loop.run_until_complete(self.serve(sockets=sockets))

serve是启动服务的最核心代码, 首先会执行config.load方法加载一些动态的配置, 如解析http的库, 解析websocket的库, 还有通过用户传过来的app来加载app, 并判断是使用WSGIASGI2或者是ASGI3, 并进行配置(uvicorn在这里是通过ASGI中间件的方式来支持), 最后根据配置启动对应的中间件。
接着会跳转到server.startup方法, 该方法首先会通过lifaspan.startup与用户传过来的app通信, 校验是否是合法的应用程序, 然后初始化变量, 先是初始化一个信号处理函数, 当收到信号时, 会把变量should_exit设置会True
然后会初始化一个名为create_protocol的变量, 它是继承于asyncio.Protocol, asyncio.Protocol主要用于从socket获取数据和写入数据, 同时也有一些TCP相关的调用, create_protocol的主要作用就是作为socket和应用程序的中间层, 负责把HTTP数据与ASGI数据互转, 如下图:
协议转换
接着根据用户传过来的变量方式来启动服务, 这些都是PythonAsyncio封装好的, 具体为以下几种:

  • 当用户传socket过来的时候: 基于该scoket和create_protocol创建服务, 如果是多进程且是Windows系统, 则要显示的共享socket。
  • 当用户传文件描述符的时候: 基于该文件描述符获取scoket, 并通过该socket和create_protocol创建服务。
  • 当用户传unix domain socket的时候: 基于unix domain socket和create_protocol创建服务。
  • 当用户传host和port参数的时候: 基于host和port和create_protocol创建服务。

创建完服务后, socket的处理就转给了应用程序了, 但是采用了事件循环的思路, 需要uvicorn使用while使程序一直跑, 防止主程序退出:

1
2
3
4
5
6
7
8
async def main_loop(self):
counter = 0
should_exit = await self.on_tick(counter)
while not should_exit:
counter += 1
counter = counter % 864000
await asyncio.sleep(0.1)
should_exit = await self.on_tick(counter)

每次循环执行的时候都会调用on_tick方法, 该方法主要是进行服务统计以及判断啥时候可以退出服务, 比如请求总数超过配置的限制数, 或者收到信号,把变量should_exit设置为True等等, 如果在循环中判断程序需要进行退出, 就会进入退出逻辑shutdown, 该逻辑比较简单, 注释和代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async def shutdown(self, sockets=None):
logger.info("Shutting down")

# 关闭socket, 不让有新的连接建立
for server in self.servers:
server.close()
for sock in sockets or []:
sock.close()
for server in self.servers:
await server.wait_closed()

# 关闭已经创建的连接, 并等待他们处理完毕
for connection in list(self.server_state.connections):
connection.shutdown()
await asyncio.sleep(0.1)

# 等待连接关闭或者用户强制关闭
if self.server_state.connections and not self.force_exit:
msg = "Waiting for connections to close. (CTRL+C to force quit)"
logger.info(msg)
while self.server_state.connections and not self.force_exit:
await asyncio.sleep(0.1)

# 等待后台任务完成或者用户强制关闭
if self.server_state.tasks and not self.force_exit:
msg = "Waiting for background tasks to complete. (CTRL+C to force quit)"
logger.info(msg)
while self.server_state.tasks and not self.force_exit:
await asyncio.sleep(0.1)

# 通过lifespan告诉ASGI应用程序即将关闭
if not self.force_exit:
await self.lifespan.shutdown()

至此整个主流程分析完毕, 下图是我整理后的一个流程图:

从图中可以很清晰的看清uvicorn与ASGI应用程序的关系, 接下来是上面部分没有详细讲过的小组件源码分析。

2.uvicorn.protocols源码分析

在了解了uvicorn的主流程后只大概的知道uvicorn是通过uvicorn.protocols与应用程序进行通信, 但是不明白他们具体是如何通信的, 接下来就开始了解uvicorn中最核心的uvicorn.protocols

从上面的分析中我们可以知道, 作为一个Web服务器, uvicorn是通过一个socket来接收发发送请求的。 而对于socket来说, 它只关心怎么创建连接,关闭连接以及如何传输内容, 它不会关心传输的字节流的上层协议是如何实现的。
好在Asyncio提供了几种简单的网络传输模型, 它们都是对于这些传输数据的抽象, 这些抽象会在如loop.create_serverloop.create_unix_server方法中使用, 通过这个抽象我们能很方便的使用TCP和UDP连接。

uvicorn在基于Asyncio创建服务器时, 把protocol抽象通过protocols参数传入loop.create_serverloop.create_unix_server后, 维护他们返回的server对象, 剩下的与ASGI应用程序的数据交互全由protocol对象处理。
uvicorn封装的对象继承于asyncio.Protocols, 它是针对TCP协议的封装, 它总共有6个方法,包括启动时的connection_made, 接收数据时的data_received, 断开时的eof_received以及丢失连接时的connection_lost, 然后还有当TCP连接出现堵塞时的暂停pause_writing和恢复resume_writing, 具体的方法传输的参数和使用方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Protocol(BaseProtocol):
def connection_made(self, transport):
"""
在建立连接时调用.

参数是表示管道连接的transport,
此时得到的transport需要设置为该类的transport, 方便后续connection_lost控制关闭管道.
"""

def data_received(self, data):
"""
通过该方法可以获取到客户端传输过来的数据
"""

def eof_received(self):
"""
当另一端调用write_eof()或等效函数时调用.

如果返回一个假值(包括None),则传输将关闭自身。
如果它返回true值,则关闭传输取决于协议.
"""

def connection_lost(self, exc):
"""
当连接丢失或关闭时调用, 根据exc判断是否要关闭trnasport.
参数是一个异常对象或None(后者表示接收到常规EOF或中止或关闭连接).
"""

def pause_writing(self):
"""
当transport缓冲区超过高水位(high-water mark)时调用,
此时应该能控制外部不再写入数据(通常是一个asyncio.Future),
同时应该通过transport.pause_reading来停止获取数据, 之后TCP就会通过拥塞机制使得客户端减缓发送数据的速度。
"""

def resume_writing(self):
"""
当transport缓冲区排放低于低水位线(low-water mark)时调用.
此时要释放标志, 使得外部可以继续写入数据,
同时通过transport.resume_reading来恢复获取数据,
之后TCP就会通过拥塞机制知道服务端的处理能力上来了, 使客户端加快发送速度。
"""

了解完asycnio.Protocol后, 可以正式了解uvicornasyncio.Protocol做了哪些修改来达到跟应用程序进行通信, 由于各个Protocol的封装差不多, 这里以httptools_impl.HttpToolsProtocol为例子进行说明。

首先是类的初始化, HttpToolsProtocol 在初始化时会加载config并配置到日志,对应的websocket协议处理,对应的http解析器以及serve创建的统计容器等, 其中需要注意的是, 在初始化时, 传入的变量是HttpToolsProtocol的实例化本身。
接着是Protocol的几大主要协议接口函数, 这里以源码和注释进行分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class HttpToolsProtocol(asyncio.Protocol):
def connection_made(self, transport):
# 添加实例本身到集合, 代表当前还有连接在处理
self.connections.add(self)

self.transport = transport
# 初始化流控制
self.flow = FlowControl(transport)
# 简单的初始化实例trsnaport的相关编列
self.server = get_local_addr(transport)
self.client = get_remote_addr(transport)
self.scheme = "https" if is_ssl(transport) else "http"

if self.logger.level <= TRACE_LOG_LEVEL:
prefix = "%s:%d - " % tuple(self.client) if self.client else ""
self.logger.log(TRACE_LOG_LEVEL, "%sConnection made", prefix)

def connection_lost(self, exc):
# 从集合删除实例本身, 代表当前连接已经处理玩了, 不需要进入统计容器
self.connections.discard(self)

if self.logger.level <= TRACE_LOG_LEVEL:
prefix = "%s:%d - " % tuple(self.client) if self.client else ""
self.logger.log(TRACE_LOG_LEVEL, "%sConnection lost", prefix)

# 设置cycle, 告诉他连接已经断开
if self.cycle and not self.cycle.response_complete:
self.cycle.disconnected = True
if self.cycle is not None:
self.cycle.message_event.set()
if self.flow is not None:
self.flow.resume_writing()

def _unset_keepalive_if_required(self):
"""取消keep alive timeout的任务,
一般来说, 在发送数据后服务端会等待客户端发送数据, 如果超过多少秒没有发送数据则可以判断该客户端已经断开了, 服务端可以主动关闭连接
而uvicorn通过timeout_keep_alive_task来实现
"""
if self.timeout_keep_alive_task is not None:
self.timeout_keep_alive_task.cancel()
self.timeout_keep_alive_task = None

def data_received(self, data):
self._unset_keepalive_if_required()

try:
# 接受字节数据, 并交由http解析器进行解析
self.parser.feed_data(data)
except httptools.HttpParserError as exc:
# 解析失败, 应该不是http协议的数据, 断开连接
msg = "Invalid HTTP request received."
self.logger.warning(msg, exc_info=exc)
self.transport.close()
except httptools.HttpParserUpgrade:
# 已经超过了解析器能解析的协议版本, 应该交由更新的协议解析器处理
self.handle_upgrade()

分析完了几个跟连接相关的主要方法后就会发现分析路线已经断了, 而该类中还有很多on_xxx的方法, 它们也没有被其他方法调用。
这是因为在初始化HTTP协议解析器的时候,uvicorn.protocol把自己的实例传入了HTTP解析器中, 解析器会边接收数据边按照url, header, body来顺序解析, 并在执行每种数据解析后, 会通过回调告诉传入的实例, uvicorn正是通过on_xxx方法来监听这些回调并处理解析完的HTTP数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class HttpToolsProtocol(asyncio.Protocol):
def on_url(self, url):
"""这是收到一个请求后的第一次解析, 可以认为是该请求体的初始化, 此时会根据url和连接数据进行初始化, 并存放在实例的scope中"""
method = self.parser.get_method()
parsed_url = httptools.parse_url(url)
raw_path = parsed_url.path
path = raw_path.decode("ascii")
if "%" in path:
path = urllib.parse.unquote(path)
self.url = url
self.expect_100_continue = False
self.headers = []
self.scope = {
"type": "http",
"asgi": {"version": self.config.asgi_version, "spec_version": "2.1"},
"http_version": "1.1",
"server": self.server,
"client": self.client,
"scheme": self.scheme,
"method": method.decode("ascii"),
"root_path": self.root_path,
"path": path,
"raw_path": raw_path,
"query_string": parsed_url.query if parsed_url.query else b"",
"headers": self.headers,
}

def on_header(self, name: bytes, value: bytes):
"""解析器在解析header时, 是按照header一行一行进行解析的, 所以每即系一行header都会调用一次on_header, 并把他们存在实例的headers中"""
name = name.lower()
if name == b"expect" and value.lower() == b"100-continue":
self.expect_100_continue = True
self.headers.append((name, value))

def on_headers_complete(self):
"""对于大部分前置web框架来说, 一般解析到header后就结束不再解析了, 会开始发送到正真处理的应用程序, uvicorn也是这样的"""
http_version = self.parser.get_http_version()
if http_version != "1.1":
self.scope["http_version"] = http_version
if self.parser.should_upgrade():
# 如果发现当前http版本更加高级(比如websocket), 则不再处理, 在另外一个逻辑会转到websocket处理
return

# Handle 503 responses when 'limit_concurrency' is exceeded.
if self.limit_concurrency is not None and (
len(self.connections) >= self.limit_concurrency
or len(self.tasks) >= self.limit_concurrency
):
# 当前并发数过高, 不再转发给后面的应用程序, 直接返回错误, 这里是一个具有ASGI标准函数签名的函数, 里面实现的功能是发送错误信息到socket
app = service_unavailable
message = "Exceeded concurrency limit."
self.logger.warning(message)
else:
app = self.app

# cycle相当于一个request的处理流程
# 普通的HTTP请求只对应一个cycle就可以了, 这里是兼容Pipelined HTTP请求
existing_cycle = self.cycle
self.cycle = RequestResponseCycle(
scope=self.scope,
transport=self.transport,
flow=self.flow,
logger=self.logger,
access_logger=self.access_logger,
access_log=self.access_log,
default_headers=self.default_headers,
message_event=asyncio.Event(),
expect_100_continue=self.expect_100_continue,
keep_alive=http_version != "1.0",
on_response=self.on_response_complete,
)
if existing_cycle is None or existing_cycle.response_complete:
# 如果上个请求已经处理完了, 则开始处理这个请求(通过run_asgi来运行)
task = self.loop.create_task(self.cycle.run_asgi(app))
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)
else:
# 如果上个请求没有处理完, 就先暂停读取数据, 并把该cycle放到pipeline暂存
self.flow.pause_reading()
self.pipeline.insert(0, (self.cycle, app))

def on_body(self, body: bytes):
"""读取到原生的body字节, 如果ASGI处理者还在运行, 且不是websocket, 则转给ASGI处理者
注: 一个请求可能会触发多次on_body"""
if self.parser.should_upgrade() or self.cycle.response_complete:
return
self.cycle.body += body
if len(self.cycle.body) > HIGH_WATER_LIMIT:
# 由于ASGI应用程序会根据调用者需要才来获取body(比如starlette的 await request.body()), 如果应用程序没有需要则会暂缓获取body数据
self.flow.pause_reading()
# 告诉ASGI应用程序, body已经获取结束(通常在cycle的more_body为False的时候, 才会检查message_event)
self.cycle.message_event.set()

def on_message_complete(self):
if self.parser.should_upgrade() or self.cycle.response_complete:
return
# 表示body已经读取结束了
self.cycle.more_body = False
self.cycle.message_event.set()

def on_response_complete(self):
"""返回响应时的回调"""
self.server_state.total_requests += 1

if self.transport.is_closing():
return

# 设置一个keep_alive的机制, 服务端返回响应后会设置一个倒计时future, 该future只有在上面data_received收到请求的时候才会取消
# 如果该future没有取消, 则会调用timeout_keep_alive_handler函数来关闭transport通道
self._unset_keepalive_if_required()

self.timeout_keep_alive_task = self.loop.call_later(
self.timeout_keep_alive, self.timeout_keep_alive_handler
)

# 恢复读取数据
self.flow.resume_reading()

if self.pipeline:
# 如果是pipeline请求, 则开始处理刚才暂存的cycle
cycle, app = self.pipeline.pop()
task = self.loop.create_task(cycle.run_asgi(app))
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)

在了解解析HTTP数据的时候, 经常会遇到一个cycle对象, 这个对象是基于ASGI负责读写数据转换的对象, 这个对象有sendreceive两个方法, 这两个方法的命名是站在ASGI应用程序的角度来命名的。
其中send通过传入的参数message获取到ASGI应用程序返回的数据, 并依据ASGI协议进行解析, 并拼接成HTTP协议的字节流, 当ASGI应用程序发送结束标记时, send会把拼接的字节流通过socket返回给客户端, 同时触发on_response_complete方法。
receive比较简单, 它只负责接收获取到已经解析完成的HTTP数据(早前面on_xxx时会把数据传给cycle), 然后发送到ASGI应用程序中。
这两个方法都是通过on_headers_complete中执行的run_asgi方法来调用的, 通过该方法, uvicorn会把数据的处理权转给ASGI应用程序, 如果ASGI应用程序处理异常, 则会返回HTTP状态码为500的响应给客户端并关闭transport。

分析完了cycle对象后, 再次回到protocol的data_received的方法中, 这里通过获取httptools.HttpParserUpgrade异常的方式得知当前可能是一个WebSocket请求, 于是进入handle_upgrade逻辑, 这个逻辑会检查是否加载了解析WebSocket解析器以及请求体是否满足WebSocket条件, 如果不满足就会返回一个响应体告诉客户端当前无法支持该HTTP请求的升级协议, 如果满足则会生成WebSocketProtocols来处理socket的数据, 并把它设置为当前的transport向关联, 不过这个WebsocketProtocols基本上是HTTP protocols和cycle两个对象的融合, 具体处理步骤也差不多, 这里就不多做描述了。

至此, 大体上的uvicorn.protocols源码就分析完了, 由于uvicorn是把asyncio.Protocol, 解析器, cycle三者结合在一起, 所以分析起来要经常跳转, 因此我把他们的流程转成如下的图:
流处理图

3.总结

至此, uvicorn的核心流程已经分析完了, 它先是通过server来启动一个服务, 并管理服务状态, 然后再通过protocol负责做双端的序列化, 使ASGI应用程序能够按照ASGI协议读写数据, 其中protocol还融合了HTTP解析器解析HTTP并通过它来解析数据。
当然, 除了上述主流程外, uvicorn还包括了中间件, 多进程启动以及监控文件变化重启服务等组件, 这些组件的代码量不大, 分析源码也说不出啥, 这里就简单略过。

查看评论