前记 接触开发以来发现很多连锁故障的场景的一个常用问题都是多端调用时,服务端正在消耗处理的时间过长或者网络传输异常导致服务端无法及时响应, 造成客户端一直等待,无法释放当前请求响应导致的, 而这种方法可以通过超时机制来进行解决。
超时机制, 是一个简单又方便的控制网络请求异常的一种方法, 它可以保证服务稳定(本质是快速失败), 良好的超时控制策略可以尽快的释放高延迟的请求,避免请求堆积, 而设计不合理的超时会导致整个服务架构出问题。
1.常见超时机制的弊端 编写的服务端代码是不可能一直不会失败的, 因为它会进行网络通信, 而这个网络世界并不是完美的。 常见的客户端在进行网络请求的时候都会有一个超时机制, 以Python
中一个著名的请求库httpx
为例子, 它比常用的requests
更优秀, 也支持通过timeout实现超时机制, 在使用时如下:
1 2 3 4 5 import asyncioimport httpx asyncio.run(httpx.AsyncClient().get(url="http://so1n.me" , timeout=9 ))
可以看到这个使用方法非常的简单, 也通俗易懂, 但是这个方法在使用起来会有一个弊端。 假设现在有一个方法demo
, 它总的超时时间为9秒, 但是需要请求两次, 如果还是按照原来那么写, 会十分糟糕, 代码如下:
1 2 3 4 5 6 7 import asyncioimport httpxasync def demo () -> None : await httpx.AsyncClient().get(url="http://so1n.me" , timeout=9 ) await httpx.AsyncClient().get(url="http://so1n.me" , timeout=9 )
这种情况下假设该方法的每个请求时长为8秒, 那么他的总请求时长为16秒, 已经超出要求的总的超时时长为9秒的要求的, 但每个请求都没有触发超时机制。 可以看出, 超时是简单易懂的, 但是在某些情况下它并不能很好的胜任工作。
不过在超时无法胜任某些工作时, 我们可以换个思路, 超时的原本意思是, 在n秒后中断此次请求, 也就是在某个时刻时终止请求, 那么代码可以改写如下:
1 2 3 4 5 6 7 8 import asyncioimport httpxasync def demo (timeout: int = 9 ) -> None : deadline: float = time.time() + 9 await httpx.AsyncClient().get(url="http://so1n.me" , timeout=time.time() - deadline) await httpx.AsyncClient().get(url="http://so1n.me" , timeout=time.time() - deadline)
这段代码可以完美的工作, 假设第一个请求的时长为5秒, 那么第二次请求的超时参数的值会是4秒, 这是非常ok, 代码也依然保持简单。 不过目前还是有个缺点, 就是每次都要手写一遍, 然后显示传进去, 这个超时是不可传递的, 如果有一个抽象能方便的使用, 那是非常好的。
2.可传递的超时对象–deadline 从上面的例子可以看出, 我们正真需要的是在某片代码范围内(如上面就是在一个demo
函数里面), 所有的函数调用共享一个截止时间, 当抵达截止时间时, 无论执行到那个函数调用, 都会触发超时异常。 通常如果要管理一个代码范围, 通常都是写一个函数调用, 并在外部使用超时控制, 代码则变成如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import asyncioimport httpximport timeasync def sub_demo () -> None : await httpx.AsyncClient().get(url="http://so1n.me" ) await httpx.AsyncClient().get(url="http://so1n.me" )async def demo () -> None : await asyncio.wait_for(sub_demo(), timeout=9 ) asyncio.run(demo())
但是, 这样的实现总觉得会差点意思, 每有一个共享截止时间的代码范围, 就需要写一个函数出来, 会觉得写出来的代码不是特别的优雅, 同时如果需要传的参数比较多, 那简直就是灾难了。好在Python
提供了with
语句, 凡是在with
语句裹着的, 都属于该代码范围里面, 所以一个deadline抽象的使用会变为如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import asyncioimport httpximport timeasync def demo (timeout: int = 9 ) -> None : with Deadline(timeout=9 ): await httpx.AsyncClient().get(url="http://so1n.me" ) await httpx.AsyncClient().get(url="http://so1n.me" )async def bad_demo (timeout: int = 9 ) -> None : with Deadline(timeout=9 ) as d: await httpx.AsyncClient().get(url="http://so1n.me" , timeout=d.timeout) await httpx.AsyncClient().get(url="http://so1n.me" , timeout=d.timeout)
可以看到请求的get
调用能享用到demo
作用域下的所有参数, 但是bad_demo
的实现还是回到一开始的每次调用都要传参的问题。可以看到demo
非常的优雅, 实现也非常方便, 只是这里deadline
与get调用是没有任何交互的, 不清楚它是如何去终止这些超时请求, 也就是超时的机制变为隐形了。
在经过查阅资料后, 发现了Python
协程的两个方法: -1.在event loop
运行中可以通过asyncio.current_task
来获取当前正在运行的协程。 -2.在对某个协程发起cancel
时, 会递归到该协程的正在运行的子协程, 然后抛出Cancel
的错误(认为demo
是with
捕获的协程, 两次请求是demo
的子协程)
那么可以在通过with
语句捕获当前的协程, 并存放在对应的内存区域中, 并启动通知event loop
在n秒后执行取消捕获的协程, 然后就把控制权转给使用者。 这样当使用者的代码在指定时间内没完成时, 就会马上抛出超时异常, 以下是我的Deadline
的抽象实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 class Deadline (object ): def __init__ ( self, delay: Optional[float ], loop: Optional[asyncio.AbstractEventLoop] = None , timeout_exc: Optional[Exception] = None , ): self._delay: Optional[float ] = delay self._loop = loop or get_event_loop() self._timeout_exc: Exception = timeout_exc or asyncio.TimeoutError() self._is_active: bool = False self._deadline_future: asyncio.Future = asyncio.Future() self._with_scope_future: Optional[asyncio.Future] = None if self._delay is not None : self._end_timestamp: Optional[float ] = time.time() + self._delay self._end_loop_time: Optional[float ] = self._loop.time() + self._delay self._loop.call_at(self._end_loop_time, self._set_deadline_future_result) else : self._end_timestamp = None self._end_loop_time = None def _set_deadline_future_result (self ) -> None : self._deadline_future.set_result(True ) if self._with_scope_future and not self._with_scope_future.cancelled(): self._with_scope_future.cancel() def __enter__ (self ) -> "Deadline": if self._with_scope_future: raise RuntimeError("`with` can only be called once" ) if self._delay is not None : main_task: Optional[asyncio.Task] = current_task(self._loop) if not main_task: raise RuntimeError("Can not found current task" ) self._with_scope_future = main_task return self def __exit__ ( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> Optional[bool]: if self._with_scope_future: self._with_scope_future = None else : return None if self._deadline_future.done(): if exc_type: if isinstance (self._timeout_exc, IgnoreDeadlineTimeoutExc): return True raise self._timeout_exc else : return None
可以看到, Deadline
的实现十分简单, 但是还会遇到层层调用传递的情况, 如果觉得显示传递的很烦的话, 还可以使用contextvars
模块进行封装从而可以隐式调用, 只需要改造__enter__
和__exit__
方法即可, 具体看完整版代码。
Note: contextvars
模块使用具体见如何使用contextvars模块和源码分析
3.服务间的传递 到了微服务时, 超时的影响更加严重, 因为在微服务架构里面, 一次请求可能要经过一个很长的链路,跨多个服务调用后才能返回结果, 如果能提前触发超时机制的话, 则可以省下一些不必要的后续调用, 减少机器的计算和网络开销, 防止服务雪崩等问题。 但是超时用不好时, 也会带来一些副作用, 我在编写rap
框架时, 为了支持更快的请求和channel
双通道机制, 使用了单连接复用的思路, 客户端在发送请求抵达服务端请求后, 就一直在等待服务端返回数据, 当服务端超时返回数据时, 客户端会对本次请求抛出超时异常, 但是不会关闭连接, 客户端伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 future_dict = dict ()async def response (): while True : try : msg_id, header, status_code, body = await conn.read() except ValueError: return if msg_id in future_dict: future_dict[msg_id].set_result(Response(msg_id, header, status_code, body))async def request (): request = (msg_id, header, body) try : future_dict[msg_id] = asyncio.Future() try : await conn.write(request) except Exception as e: raise e try : return await asyncio.wait_for(future_dict[msg_id], 9 ) except asyncio.TimeoutError: raise asyncio.TimeoutError(f"msg_id:{msg_id} request timeout" ) finally : future_dict.pop(msg_id, None )
可以发现在这种情况会忽略几个问题:
1.客户端已经丢弃请求了, 但服务端可能还在处理数据, 并在完成的时候返回数据, 但是这时候服务端返回的数据会被客户端丢弃的。
2.如果客户端带有重试机制, 那么客户端会继续发送请求,此时服务端除了会同时多处理多个问题外, 在接口没做幂等性处理时还会出现脏数据的问题。
3.如果客户端带有重试机制, 客户端会发送多个请求到服务端,服务端的压力会变大, 导致整个服务链路异常进而造成服务雪崩。
可以发现, 客户端在这种情况下发生超时时, 除了自己抛异常外, 还要通知服务端进行超时处理, 而不是让服务端继续处理下去, 从而影响服务端性能, 但是通知服务端超时的请求也可能发生异常导致通知失败, 也会占用部分网络资源, 所以最好的方法是把截止时间跟随请求传到服务端(如果是一个超时参数, 则传递参数后, 服务端无法知道真实所剩的超时时间), 服务端通过截止时间来判断何时结束这个调用, 这种方法很简单, 在rap
中就是这样处理的, 首先是客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 async def _base_request (self, request: Request, conn: Connection ) -> Response: if not request.correlation_id: request.correlation_id = str (async_get_snowflake_id()) resp_future_id: str = f"{conn.sock_tuple} :{request.correlation_id} " try : response_future: asyncio.Future[Response] = asyncio.Future() self._resp_future_dict[resp_future_id] = response_future deadline: Optional[Deadline] = deadline_context.get() if self.app.through_deadline and deadline: request.header["X-rap-deadline" ] = deadline.end_timestamp await self.write_to_conn(request, conn) response: Response = await as_first_completed( [response_future], not_cancel_future_list=[conn.conn_future], ) response.state = request.state return response finally : pop_future: Optional[asyncio.Future] = self._resp_future_dict.pop(resp_future_id, None ) if pop_future: safe_del_future(pop_future)
然后是服务端, 服务端也非常简单, 直接从Header获取值, 再应用到超时函数即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 try : deadline_timestamp: int = request.header.get("X-rap-deadline" , 0 ) if deadline_timestamp: timeout: int = int (time.time() - deadline_timestamp) else : timeout = self._run_timeout result: Any = await asyncio.wait_for(coroutine, timeout)except asyncio.TimeoutError: return call_id, RpcRunTimeError(f"Call {func_model.func.__name__} timeout" )except Exception as e: return call_id, e
这样即可完成跨进程跨服务的超时传递, 不过这种实现是有一个前提, 就是所有机器的时间得保持一致的(或者说只差一点点), 而微服务基本上都满足这种前提, 他们基本上都在同一个内网里面, 或者同个公司管理的跨区域机器里面。