前记 上一篇分析了uvicorn
, 但是uvicorn
只是一个ASGI容器, 真正处理请求的是ASGI应用程序,而starlette
是最出名也是最标准的ASGI应用程序, 通过了解starlette
, 可以了解到每个组件都是ASGI APP的设计理念, 了解ASGI APP的兼容性, 能更完整的理解ASGI生态。
NOTE: 使用了几年的starlette
以来, 简单了翻过了几次源码, 觉得starlette
堪称工艺品, 设计很完美, 各种逻辑实现起来很简单(也可能是我一开始就使用了sanic框架), 从使用至今, 除了初始化中间件, 在中间件读取body以及官方示例文档比较少这些缺点外, 感觉不出有其他的槽点。
NOTE: 本文偏代码+注释比较多
1.starlette的应用 在之前的文章 了解过, Uvicron
通过一个通用的协定接口与ASGI应用程序交互, 应用程序只要实现如下代码, 即可通过Uvicorn
发送和接收信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 async def app (scope, receive, send ): assert scope['type' ] == 'http' await send({ 'type' : 'http.response.start' , 'status' : 200 , 'headers' : [ [b'content-type' , b'text/plain' ], ] }) await send({ 'type' : 'http.response.body' , 'body' : b'Hello, world!' , })if __name__ == "__main__" : import uvicorn uvicorn.run(app, host="127.0.0.1" , port=5000 , log_level="info" )
而使用uvicorn
启动starlette
的方式是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from starlette.applications import Starlettefrom starlette.middleware.gzip import GZipMiddleware app: Starlette = Starlette()@app.route("/" ) def demo_route () -> None : pass @app.websocket_route("/" ) def demo_websocket_route () -> None : pass @app.add_exception_handlers(404 ) def not_found_route () -> None : pass @app.on_event("startup" ) def startup_event_demo () -> None : pass @app.on_event("shutdown" ) def shutdown_event_demo () -> None : pass app.add_middleware(GZipMiddleware)if __name__ == "__main__" : import uvicorn uvicorn.run(app, host="127.0.0.1" , port=5000 )
这段代码Starlette
先执行了初始化, 然后注册路由,异常处理, 事件,中间件到自身, 然后传给uvicorn.run
, uvicorn.run
通过调用starlette
的__call__
的方法传递请求数据。
简单的了解完启动后, 先从starlette
初始化看是分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class Starlette : def __init__ ( self, debug: bool = False , routes: typing.Sequence[BaseRoute] = None , middleware: typing.Sequence[Middleware] = None , exception_handlers: typing.Dict[ typing.Union[int , typing.Type[Exception]], typing.Callable ] = None , on_startup: typing.Sequence[typing.Callable] = None , on_shutdown: typing.Sequence[typing.Callable] = None , lifespan: typing.Callable[["Starlette" ], typing.AsyncGenerator] = None , ) -> None : """ :param debug: 决定是否启用debug功能 :param route: 一个路由列表, 提供HTTP和WebSocket服务. :param middleware: 中间件列表, 应用于每个请求 :param exception_handler: 存放异常回调的字典, 键为HTTP状态码, 值为回调函数 :on_startup: 启动时调用的回调函数 :on_shutdown: 关闭时的回调函数 :lifespan: ASGI中的lifespan功能 """ assert lifespan is None or ( on_startup is None and on_shutdown is None ), "Use either 'lifespan' or 'on_startup'/'on_shutdown', not both." self._debug = debug self.state = State() self.router = Router( routes, on_startup=on_startup, on_shutdown=on_shutdown, lifespan=lifespan ) self.exception_handlers = ( {} if exception_handlers is None else dict (exception_handlers) ) self.user_middleware = [] if middleware is None else list (middleware) self.middleware_stack = self.build_middleware_stack()
通过代码可以看到初始化这里已经满足了大多数功能了, 不过还有一个构建中间件的函数, 需要进一步分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class Starlette : def build_middleware_stack (self ) -> ASGIApp: debug = self.debug error_handler = None exception_handlers = {} for key, value in self.exception_handlers.items(): if key in (500 , Exception): error_handler = value else : exception_handlers[key] = value middleware = ( [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug)] + self.user_middleware + [ Middleware( ExceptionMiddleware, handlers=exception_handlers, debug=debug ) ] ) app = self.router for cls, options in reversed (middleware): app = cls(app=app, **options) return app
构建完中间件后, 初始化就算完成了, 接着就会通过uvicorn.run
方法从而调用到__call__
方法:
1 2 3 4 class Starlette : async def __call__ (self, scope: Scope, receive: Receive, send: Send ) -> None : scope["app" ] = self await self.middleware_stack(scope, receive, send)
这个方法很简单, 就是通过scope, 把app设置到请求流程中, 方便后续调用, 然后通过调用middleware_stack
开始请求的处理。 通过这个方法和中间件的初始化可以看出, starlette
中的中间件本身也是一个ASGI APP(也可以看出route是一个ASGI APP, 处于调用栈的最后一层), 同时starlette
也把异常的处理也交给了中间件处理, 这在其他的Web应用框架很少见到, 可以看出starlette
的设计是每个组件都尽量是ASGI APP。
虽然starlette
中间件的设计是非常不错的, 但是它的这种初始化方式我不太喜欢, 因为在编写的时候IDE无法帮你传入的参数做校验, 比如上面示例的GZip
中间件, 你知道需要传minimum_size
参数, 但是你有可能打错, 只是没到运行时的时候, 压根不知道它是否有异常:
1 app.add_middleware(GZipMiddleware, minimum_size = 500 )
我在设计我的rpc框架rap 时也参考了startlette
的中间件设计, 但是在这一块进行了优化, 不过与本篇文章关系不大, 有兴趣可以参考:https://github.com/so1n/rap/blob/master/rap/server/plugin/middleware/base.py
2.中间件 上面说到, 在startlette
中, 中间件是一个ASGI APP, 所以在startlette
的所有中间件都必定是一个满足如下形式的类:
1 2 3 4 5 6 class BaseMiddleware : def __init__ (self, app: ASGIApp ) -> None : pass async def __call__ (self, scope: Scope, receive: Receive, send: Send ) -> None : pass
在starlette.middleware
中, 有很多的中间件实现, 他们都满足这一点, 不过本章节不会讲所有的中间件, 只会挑选几个有代表性的中间件从最靠近Route
到远进行分析。
2.1.异常处理中间件-ExceptionMiddleware 第一个就是ExceptionMiddleware中间件, 这个中间件用户是不会直接接触到的(所以没有放在starlette.middleware
里面), 而是通过下面的这个方法间接的接触到:
1 2 @app.app_exception_handlers(404 ) def not_found_route () -> None : pass
当用户使用这个方法时, startlette
会把回调函数挂在对应的字典里面, 这个字典以HTTP状态码为key, 回调函数为value。 当ExceptionMiddleware发现Route
请求处理异常时, 可以通过异常的响应HTTP状态码找到对应的回调函数, 并把请求和异常传给用户挂载的对应的回调函数, 最后把用户的回调函数结果抛回上一个ASGI APP。 此外ExceptionMiddleware还支持异常注册, 当Route
抛出的异常与注册的异常匹配时, 调用该异常注册的对应的回调函数。 该类的源码和注释如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 class ExceptionMiddleware : def __init__ ( self, app: ASGIApp, handlers: dict = None , debug: bool = False ) -> None : self.app = app self.debug = debug self._status_handlers = {} self._exception_handlers = { HTTPException: self.http_exception } if handlers is not None : for key, value in handlers.items(): self.add_exception_handler(key, value) def add_exception_handler ( self, exc_class_or_status_code: typing.Union[int , typing.Type[Exception]], handler: typing.Callable, ) -> None : if isinstance (exc_class_or_status_code, int ): self._status_handlers[exc_class_or_status_code] = handler else : assert issubclass (exc_class_or_status_code, Exception) self._exception_handlers[exc_class_or_status_code] = handler def _lookup_exception_handler ( self, exc: Exception ) -> typing.Optional[typing.Callable]: for cls in type (exc).__mro__: if cls in self._exception_handlers: return self._exception_handlers[cls] return None async def __call__ (self, scope: Scope, receive: Receive, send: Send ) -> None : if scope["type" ] != "http" : await self.app(scope, receive, send) return response_started = False async def sender (message: Message ) -> None : nonlocal response_started if message["type" ] == "http.response.start" : response_started = True await send(message) try : await self.app(scope, receive, sender) except Exception as exc: handler = None if isinstance (exc, HTTPException): handler = self._status_handlers.get(exc.status_code) if handler is None : handler = self._lookup_exception_handler(exc) if handler is None : raise exc from None if response_started: msg = "Caught handled exception, but response already started." raise RuntimeError(msg) from exc request = Request(scope, receive=receive) if asyncio.iscoroutinefunction(handler): response = await handler(request, exc) else : response = await run_in_threadpool(handler, request, exc) await response(scope, receive, sender)
2.2.用户中间件 接着就是用户中间件了, 这个也是我们接触最多的中间件, 在使用starlette.middleware
时, 我们都会继承于一个叫BaseHTTPMiddleware
的中间件, 然后基于如下代码进行拓展:
1 2 3 4 5 6 7 8 9 10 11 12 class DemoMiddleware (BaseHTTPMiddleware ): def __init__ ( self, app: ASGIApp, ) -> None : super (DemoMiddleware, self).__init__(app) async def dispatch (self, request: Request, call_next: RequestResponseEndpoint ) -> Response: response: Response = await call_next(request) return response
如果在请求之前进行预处理, 就在before块编写相关代码,如果要在请求之后进行处理的, 就在after块编写代码, 使用非常简单, 而且他们是处于同一个作用域的, 这就意味着该方法里面的变量不用通过上下文或者动态变量来传播(如果你接触了Django或者Flask的类中间件实现, 也就懂得了starlette这种实现的优雅)。
接下来就来看看它是怎么实现的, 代码非常简单, 大概60行左右, 不过我注释写了很多:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 class BaseHTTPMiddleware : def __init__ (self, app: ASGIApp, dispatch: DispatchFunction = None ) -> None : self.app = app self.dispatch_func = self.dispatch if dispatch is None else dispatch async def __call__ (self, scope: Scope, receive: Receive, send: Send ) -> None : """ ASGI 标准的函数签名函数, 代表着ASGI的请求会从这里进来 """ if scope["type" ] != "http" : await self.app(scope, receive, send) return request = Request(scope, receive=receive) response = await self.dispatch_func(request, self.call_next) await response(scope, receive, send) async def call_next (self, request: Request ) -> Response: loop = asyncio.get_event_loop() queue: "asyncio.Queue[typing.Optional[Message]]" = asyncio.Queue() scope = request.scope receive = request.receive send = queue.put async def coro () -> None : try : await self.app(scope, receive, send) finally : await queue.put(None ) task = loop.create_task(coro()) message = await queue.get() if message is None : task.result() raise RuntimeError("No response returned." ) assert message["type" ] == "http.response.start" async def body_stream () -> typing.AsyncGenerator[bytes, None ]: while True : message = await queue.get() if message is None : break assert message["type" ] == "http.response.body" yield message.get("body" , b"" ) task.result() response = StreamingResponse( status_code=message["status" ], content=body_stream() ) response.raw_headers = message["headers" ] return response async def dispatch ( self, request: Request, call_next: RequestResponseEndpoint ) -> Response: raise NotImplementedError()
2.3.ServerErrorMiddleware ServerErrorMiddleware跟ExceptionMiddleware很像(所以这一part也不做更多的说明), 整个逻辑基本上都是一致的, 不过ExceptionMiddleware负责的是把用户的路由异常进行捕获处理, ServerErrorMiddleware主要负责是做兜底, 确保返回的一定是合法的HTTP响应。
ServerErrorMiddleware的间接调用函数也跟ExceptionMiddleware一样, 不过只有注册的HTTP状态码为500时, 才会把回调注册到ServerErrorMiddleware中:
1 2 @app.exception_handlers(500 ) def not_found_route () -> None : pass
ServerErrorMiddleware是处于ASGI APP中的最顶级, 它负责异常兜底的工作, 它要做的事情很简单, 如果下一级ASGI APP处理发生异常, 就进入兜底逻辑:
1.如果启用debug, 则返回debug页面
2.如果有注册回调, 则执行注册回调
3.如果都没则返回500响应
3.Route 在starlette
中, Route
分为两部分, 一部分我把它称为Real App
的Router
, 它处于中间件的下一层级, 但它负责的是Starlette
除中间件外的所有事情, 主要包括路由查找匹配, APP启动关闭处理等, 另外一部分则是注册到Router
的路由。
3.1.Router Router
很简单, 他的主要责任就是装载路由和匹配路由, 以下是除装载路由外的源码和注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 class Router : def __init__ ( self, routes: typing.Sequence[BaseRoute] = None , redirect_slashes: bool = True , default: ASGIApp = None , on_startup: typing.Sequence[typing.Callable] = None , on_shutdown: typing.Sequence[typing.Callable] = None , lifespan: typing.Callable[[typing.Any], typing.AsyncGenerator] = None , ) -> None : self.routes = [] if routes is None else list (routes) self.redirect_slashes = redirect_slashes self.default = self.not_found if default is None else default self.on_startup = [] if on_startup is None else list (on_startup) self.on_shutdown = [] if on_shutdown is None else list (on_shutdown) async def default_lifespan (app: typing.Any ) -> typing.AsyncGenerator: await self.startup() yield await self.shutdown() self.lifespan_context = default_lifespan if lifespan is None else lifespan async def not_found (self, scope: Scope, receive: Receive, send: Send ) -> None : """匹配不到路由执行的逻辑""" if scope["type" ] == "websocket" : websocket_close = WebSocketClose() await websocket_close(scope, receive, send) return if "app" in scope: raise HTTPException(status_code=404 ) else : response = PlainTextResponse("Not Found" , status_code=404 ) await response(scope, receive, send) async def lifespan (self, scope: Scope, receive: Receive, send: Send ) -> None : """ Handle ASGI lifespan messages, which allows us to manage application startup and shutdown events. """ first = True app = scope.get("app" ) await receive() try : if inspect.isasyncgenfunction(self.lifespan_context): async for item in self.lifespan_context(app): assert first, "Lifespan context yielded multiple times." first = False await send({"type" : "lifespan.startup.complete" }) await receive() else : for item in self.lifespan_context(app): assert first, "Lifespan context yielded multiple times." first = False await send({"type" : "lifespan.startup.complete" }) await receive() except BaseException: if first: exc_text = traceback.format_exc() await send({"type" : "lifespan.startup.failed" , "message" : exc_text}) raise else : await send({"type" : "lifespan.shutdown.complete" }) async def __call__ (self, scope: Scope, receive: Receive, send: Send ) -> None : """ The main entry point to the Router class. """ assert scope["type" ] in ("http" , "websocket" , "lifespan" ) if "router" not in scope: scope["router" ] = self if scope["type" ] == "lifespan" : await self.lifespan(scope, receive, send) return partial = None for route in self.routes: match, child_scope = route.matches(scope) if match == Match.FULL: scope.update(child_scope) await route.handle(scope, receive, send) return elif match == Match.PARTIAL and partial is None : partial = route partial_scope = child_scope if partial is not None : scope.update(partial_scope) await partial.handle(scope, receive, send) return if scope["type" ] == "http" and self.redirect_slashes and scope["path" ] != "/" : redirect_scope = dict (scope) if scope["path" ].endswith("/" ): redirect_scope["path" ] = redirect_scope["path" ].rstrip("/" ) else : redirect_scope["path" ] = redirect_scope["path" ] + "/" for route in self.routes: match, child_scope = route.matches(redirect_scope) if match != Match.NONE: redirect_url = URL(scope=redirect_scope) response = RedirectResponse(url=str (redirect_url)) await response(scope, receive, send) return await self.default(scope, receive, send)
可以看出Router
的代码非常的简单, 主要的代码都集中在__call__
中, 但是在这里出现了多次遍历查询路由且每个路由都是执行一遍正则表达式来判断是否匹配。可能会有人觉得这样的执行速度会很慢, 我曾经也觉得这样的路由查找很慢, 然后就实现了一个路由树来代替它详见route_trie.py , 然而在我实现后做了一次性能测试, 发现在路由没超过50个的情况下, 循环匹配性能是优于路由树的, 在没超过100条的情况下, 两者是相当的, 而在正常情况下, 我们指定的路由都不会超过100个, 所以不用去担心这部分路由的匹配性能, 如果还是很担心, 那么可以使用Mount
来对路由进行分组, 使匹配的次数减少。
3.2.其他Route Moute
是继承于BaseRoute
, 其它的Route
, HostRoute
, WebsocketRoute
也是一样继承于BaseRoute
, 它们提供的方法都差不多, 只是具体实现略有差别而已(主要是初始化,路由匹配和反向查找略有区别), 我们先来看看BaseRoute
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class BaseRoute : def matches (self, scope: Scope ) -> typing.Tuple[Match, Scope]: raise NotImplementedError() def url_path_for (self, name: str , **path_params: str ) -> URLPath: raise NotImplementedError() async def handle (self, scope: Scope, receive: Receive, send: Send ) -> None : raise NotImplementedError() async def __call__ (self, scope: Scope, receive: Receive, send: Send ) -> None : """ A route may be used in isolation as a stand-alone ASGI app. This is a somewhat contrived case, as they'll almost always be used within a Router, but could be useful for some tooling and minimal apps. """ match, child_scope = self.matches(scope) if match == Match.NONE: if scope["type" ] == "http" : response = PlainTextResponse("Not Found" , status_code=404 ) await response(scope, receive, send) elif scope["type" ] == "websocket" : websocket_close = WebSocketClose() await websocket_close(scope, receive, send) return scope.update(child_scope) await self.handle(scope, receive, send)
可以看到BaseRoute
提供的功能不多, 其他的路由则是基于BaseRoute
进行拓展:
Route: 标准的HTTP路由, 负责通过HTTP URL和HTTP Method进行路由匹配, 然后提供调用HTTP路由的方法
WebSocketRoute: 标准的WebSocketRoute, 根据HTTP URL进行路由匹配, 然后通过starlette.websocket
的WebSocket生成session再传入对应的函数
Mount: 一个路由的套娃封装, 他的匹配方法是URL的前缀匹配, 把请求转发给符合规则的下一级ASGI APP, 当他的下一级ASGI APP是Router
时, 他的调用链可能会像这样Router->Mount->Router->Mount->Router->Route
, 通过使用Mount可以对路由进行分组, 同时也能加快匹配速度, 推荐使用。 不过, 它还支持把请求分发给其他ASGI APP, 也可以做到如Starlette->ASGI Middleware->Mount->Other Starlette->...
Host: 它会根据用户请求的Host分发到对应的ASGI APP, 可以选择Route
, Mount
, 中间件等等ASGI APP
4.其它组件 从上面可以看到, starlette
中的组件基本上都设计成ASGI APP, 可以任意的兼容, 这种设计是非常棒的, 虽然会牺牲一点点性能, 但是它的兼容性非常的强, 而其他的组件也都或多或少的设计得像ASGI APP一样, 在介绍其他组件之前, 先看看整个starlette
的整个项目结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 ├── middleware ├── applications.py ├── authentication.py ├── background.py ├── concurrency.py ├── config.py ├── convertors.py ├── datastructures.py ├── endpoints.py ├── exceptions.py ├── formparsers.py ├── graphql.py ├── __init__.py ├── py.typed ├── requests.py ├── responses.py ├── routing.py ├── schemas.py ├── staticfiles.py ├── status.py ├── templating.py ├── testclient.py ├── types.py └── websockets.py
上面的文件有很多, 有些比较简单就直接略过。
4.1.Request Request
非常的简单, 它继承于HttpConnection
, 这个类主要是通过ASGI协议传过来的Scope进行解析, 提取如url, method等信息, 而Request
增加了读取请求数据和返回数据(HTTP1.1支持服务端push数据给客户端)的功能, 其中, 读取数据都依赖于一个核心函数–stram
,它的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 async def stream (self ) -> typing.AsyncGenerator[bytes, None ]: if hasattr (self, "_body" ): yield self._body yield b"" return if self._stream_consumed: raise RuntimeError("Stream consumed" ) self._stream_consumed = True while True : message = await self._receive() if message["type" ] == "http.request" : body = message.get("body" , b"" ) if body: yield body if not message.get("more_body" , False ): break elif message["type" ] == "http.disconnect" : self._is_disconnected = True raise ClientDisconnect() yield b""
这个实现非常简单, 但是却有一个小bug, 如果有了解Nginx
或者其他Web服务的都会知道, 一般的中间服务器是不会处理body数据的, 只做传递。ASGI也是如此, uvicorn
在处理完url和header后就开始调用ASGI APP, 并把send
和receive
对象传递下去, 这两个对象会在经过多个ASGI APP后,抵达路由这个ASGI APP, 并在函数里供用户使用,, 所以Request接收的receive
对象是uvicorn
生成的。 而receive
的数据源是源自于是一个asyncio.Queue
队列, 从中间件的分析可以知道, 每个ASGI APP都依据scope
, receive
来生成一个Request
对象, 意味着每层ASGI APP的Request
对象是不一致的, 如果在中间件调用Request
对象读取Body的话, 就会提前消费通过receive
消费了队列的数据, 导致后续的ASGI APP无法通过Request
对象读取Body数据, 该问题示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import asynciofrom starlette.applications import Starlettefrom starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpointfrom starlette.requests import Requestfrom starlette.responses import JSONResponse, Response app: Starlette = Starlette()class DemoMiddleware (BaseHTTPMiddleware ): async def dispatch ( self, request: Request, call_next: RequestResponseEndpoint ) -> Response: print(request, await request.body()) return await call_next(request) app.add_middleware(DemoMiddleware)@app.route("/" ) async def demo (request: Request ) -> JSONResponse: try : await asyncio.wait_for(request.body(), 1 ) return JSONResponse({"result" : True }) except asyncio.TimeoutError: return JSONResponse({"result" : False })if __name__ == "__main__" : import uvicorn uvicorn.run(app)
运行后执行请求查看结果:
1 2 -> curl http://127.0.0.1:8000 {"result" :false }
可以看到执行的结果是false
, 意味着执行request.body
超时了, 因为此时receive
队列已经空了, 是拿不到数据的, 如果不加超时的话这个请求就会一直卡主。 那么要怎么去解决问题呢, 先看看Request
获取是如何获取body的, 因为用户可以同时获取多次body, 但一直都是相同的数据, 它的实现思路是获取数据后, 把数据缓存到一个变量里面, 我们也可以采取这个思路, 由于数据都是通过receive
获取的, 那么可以在在读取数据后, 构造一个receive
函数, 该函数返回类似于ASGI的通信协议的数据, 并且有完整的body数据(满足Request.stream获取body的构造), 代码如下:
1 2 3 4 5 6 7 async def proxy_get_body (request: Request ) -> bytes: async def receive () -> Message: return {"type" : "http.request" , "body" : body} body = await request.body() request._receive = receive return body
之后任意层级的ASGI APP如果需要获取Body数据的话, 就可以调用该函数来获取Body数据, 同时又不影响后续的ASGI APP获取Body数据。
4.2.TestClient 在基于TestCLient
的测试用例运行时, 没有流量转发, 而是通过请求调用到路由函数, 并根据返回数据转化为一个响应对象。 同时, 它还能会自动运行on_startup
和on_shutdown
挂载的函数以及挂载的中间件, 我在一开始接触时, 我很好奇它是怎么实现的, 因为大多数的测试用例框架都很难做到直接调用到路由函数, 同时又满足于框架的其他中间件, on_startup
和on_shutdown
的功能(特别是Python的gRPC自带的测试用例封装…)。
在了解TestClient
的运行原理之前, 先看看TestClient
的使用用例如下:
1 2 3 4 5 6 7 from starlette.testclient import TestClientfrom requests import Response app: Starlette = Starlette()with TestClient(app) as client: response: Response = client.get("/" )
这段代码中, 分为几步走:
1:初始化一个app对象
2:把app对象传入TestClient
中, 并通过with
语法启动一个上下文
3:通过返回的client进行调用, 最后返回一个requests.Response
对象。
其中第一点非常简单, 我们也分析过了, 对于第二点, 很难明白为什么要with
上下文, 在官方文档说明是可以这样直接运行:
1 response: Response = TestClient(app).get("/" )
但是没办法执行on_startup
和on_shutdown
这两个事件挂载的函数, 所以初步判定with
语法与它们有关, 而至于第三步则很难猜透starlette
是怎么实现的, 但是返回的是requests.Respnose
的对象, 那么一定跟requests
这个框架有一些关联, 具体需要分析源码才能知道。
接下来就开始带着问题分析源码, 首先是类和__init__
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class TestClient (requests.Session ): __test__ = False def __init__ ( self, app: typing.Union[ASGI2App, ASGI3App], base_url: str = "http://testserver" , raise_server_exceptions: bool = True , root_path: str = "" , ) -> None : super (TestClient, self).__init__() if _is_asgi3(app): app = typing.cast(ASGI3App, app) asgi_app = app else : app = typing.cast(ASGI2App, app) asgi_app = _WrapASGI2(app) adapter = _ASGIAdapter( asgi_app, raise_server_exceptions=raise_server_exceptions, root_path=root_path, ) self.mount("http://" , adapter) self.mount("https://" , adapter) self.mount("ws://" , adapter) self.mount("wss://" , adapter) self.headers.update({"user-agent" : "testclient" }) self.app = asgi_app self.base_url = base_url
从这个可以看出, TestClient继承于requests.Session的方法, 证明可以在编写测试用例时, 直接调用到requests.Session的相关的方法。然后在__init__
方法中实例化了一个adapter
, 这里是使用了requests
的adapter机制, 通过adpater机制, 可以拦截请求的数据和响应的数据。_ASGIdapter
的代码比较多, 但是它的实现逻辑很简单, 它重载了Adapter
的send
方法, 当执行到send
方法时, 它会变成执行app(scope, receive, send)
, 其中receive
是负责把请求的数据转换为ASGI协议,供下一级ASGI APP调用。而send
(位于Adapter.send
里面的闭包函数)则获取ASGI APP返回的数据并存放到字典中, 当ASGI APP执行完毕的时候, Adapter
的send
方法会根据执行是否异常以及存放数据的字典转化为一个request.Response
的实例返回给用户。
通过_ASGIdapter
了解了starlette
是如何解决第三个问题的, 接下来是with
语法相关的__enter__
, __exit__
:
1 2 3 4 5 6 7 8 9 10 11 12 class TestClient (requests.Session ): def __enter__ (self ) -> "TestClient": loop = asyncio.get_event_loop() self.send_queue = asyncio.Queue() self.receive_queue = asyncio.Queue() self.task = loop.create_task(self.lifespan()) loop.run_until_complete(self.wait_startup()) return self def __exit__ (self, *args: typing.Any ) -> None : loop = asyncio.get_event_loop() loop.run_until_complete(self.wait_shutdown())
可以看出, 在使用进入上下文和退出上下文时, 自动调用了lifespan
方法, 然后通过lifespan
机制来实现on_startup
和on_shutdown
功能, 具体的源码和注释如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class TestClient (requests.Session ): async def lifespan (self ) -> None : scope = {"type" : "lifespan" } try : await self.app(scope, self.receive_queue.get, self.send_queue.put) finally : await self.send_queue.put(None ) async def wait_startup (self ) -> None : await self.receive_queue.put({"type" : "lifespan.startup" }) message = await self.send_queue.get() if message is None : self.task.result() assert message["type" ] in ( "lifespan.startup.complete" , "lifespan.startup.failed" , ) if message["type" ] == "lifespan.startup.failed" : message = await self.send_queue.get() if message is None : self.task.result() async def wait_shutdown (self ) -> None : await self.receive_queue.put({"type" : "lifespan.shutdown" }) message = await self.send_queue.get() if message is None : self.task.result() assert message["type" ] == "lifespan.shutdown.complete" await self.task
5.总结 至此, starlette
的几个重要的功能代码都分析完了, starlette
是一个非常棒的库, 它的设计思路也是非常的棒, 建议大家自己读一遍starlette
的源代码, 对以后自己写框架是有帮助的。