前记 接触开发以来发现很多连锁故障的场景的一个常用问题都是多端调用时,服务端正在消耗处理的时间过长或者网络传输异常导致服务端无法及时响应, 造成客户端一直等待,无法释放当前请求响应导致的, 而这种方法可以通过超时机制来进行解决。
超时机制, 是一个简单又方便的控制网络请求异常的一种方法, 它可以保证服务稳定(本质是快速失败), 良好的超时控制策略可以尽快的释放高延迟的请求,避免请求堆积, 而设计不合理的超时会导致整个服务架构出问题。
        
        1.常见超时机制的弊端 编写的服务端代码是不可能一直不会失败的, 因为它会进行网络通信, 而这个网络世界并不是完美的。 常见的客户端在进行网络请求的时候都会有一个超时机制, 以Python中一个著名的请求库httpx为例子, 它比常用的requests更优秀, 也支持通过timeout实现超时机制, 在使用时如下:
1 2 3 4 5 import  asyncioimport  httpx asyncio.run(httpx.AsyncClient().get(url="http://so1n.me" , timeout=9 ))
 
可以看到这个使用方法非常的简单, 也通俗易懂, 但是这个方法在使用起来会有一个弊端。 假设现在有一个方法demo, 它总的超时时间为9秒, 但是需要请求两次, 如果还是按照原来那么写, 会十分糟糕, 代码如下:
1 2 3 4 5 6 7 import  asyncioimport  httpxasync  def  demo () -> None :     await  httpx.AsyncClient().get(url="http://so1n.me" , timeout=9 )     await  httpx.AsyncClient().get(url="http://so1n.me" , timeout=9 )
 
这种情况下假设该方法的每个请求时长为8秒, 那么他的总请求时长为16秒, 已经超出要求的总的超时时长为9秒的要求的, 但每个请求都没有触发超时机制。 可以看出, 超时是简单易懂的, 但是在某些情况下它并不能很好的胜任工作。 
不过在超时无法胜任某些工作时, 我们可以换个思路, 超时的原本意思是, 在n秒后中断此次请求, 也就是在某个时刻时终止请求, 那么代码可以改写如下:
1 2 3 4 5 6 7 8 import  asyncioimport  httpxasync  def  demo (timeout: int  = 9  ) -> None :     deadline: float  = time.time() + 9      await  httpx.AsyncClient().get(url="http://so1n.me" , timeout=time.time() - deadline)     await  httpx.AsyncClient().get(url="http://so1n.me" , timeout=time.time() - deadline)
 
这段代码可以完美的工作, 假设第一个请求的时长为5秒, 那么第二次请求的超时参数的值会是4秒, 这是非常ok, 代码也依然保持简单。 不过目前还是有个缺点, 就是每次都要手写一遍, 然后显示传进去, 这个超时是不可传递的, 如果有一个抽象能方便的使用, 那是非常好的。
2.可传递的超时对象–deadline 从上面的例子可以看出, 我们正真需要的是在某片代码范围内(如上面就是在一个demo函数里面), 所有的函数调用共享一个截止时间, 当抵达截止时间时, 无论执行到那个函数调用, 都会触发超时异常。 通常如果要管理一个代码范围, 通常都是写一个函数调用, 并在外部使用超时控制, 代码则变成如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import  asyncioimport  httpximport  timeasync  def  sub_demo () -> None :     await  httpx.AsyncClient().get(url="http://so1n.me" )     await  httpx.AsyncClient().get(url="http://so1n.me" )async  def  demo () -> None :     await  asyncio.wait_for(sub_demo(), timeout=9 ) asyncio.run(demo())
 
但是, 这样的实现总觉得会差点意思, 每有一个共享截止时间的代码范围, 就需要写一个函数出来, 会觉得写出来的代码不是特别的优雅, 同时如果需要传的参数比较多, 那简直就是灾难了。好在Python提供了with语句, 凡是在with语句裹着的, 都属于该代码范围里面, 所以一个deadline抽象的使用会变为如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import  asyncioimport  httpximport  timeasync  def  demo (timeout: int  = 9  ) -> None :     with  Deadline(timeout=9 ):         await  httpx.AsyncClient().get(url="http://so1n.me" )         await  httpx.AsyncClient().get(url="http://so1n.me" )async  def  bad_demo (timeout: int  = 9  ) -> None :     with  Deadline(timeout=9 ) as  d:         await  httpx.AsyncClient().get(url="http://so1n.me" , timeout=d.timeout)         await  httpx.AsyncClient().get(url="http://so1n.me" , timeout=d.timeout)
 
可以看到请求的get调用能享用到demo作用域下的所有参数, 但是bad_demo的实现还是回到一开始的每次调用都要传参的问题。可以看到demo非常的优雅, 实现也非常方便, 只是这里deadline与get调用是没有任何交互的, 不清楚它是如何去终止这些超时请求, 也就是超时的机制变为隐形了。
在经过查阅资料后, 发现了Python协程的两个方法: -1.在event loop运行中可以通过asyncio.current_task来获取当前正在运行的协程。 -2.在对某个协程发起cancel时, 会递归到该协程的正在运行的子协程, 然后抛出Cancel的错误(认为demo是with捕获的协程, 两次请求是demo的子协程)
那么可以在通过with语句捕获当前的协程, 并存放在对应的内存区域中, 并启动通知event loop在n秒后执行取消捕获的协程, 然后就把控制权转给使用者。 这样当使用者的代码在指定时间内没完成时, 就会马上抛出超时异常, 以下是我的Deadline的抽象实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 class  Deadline (object  ):     def  __init__ (        self,         delay: Optional[float ],         loop: Optional[asyncio.AbstractEventLoop] = None ,         timeout_exc: Optional[Exception] = None ,      ):                  self._delay: Optional[float ] = delay                  self._loop = loop or  get_event_loop()                  self._timeout_exc: Exception = timeout_exc or  asyncio.TimeoutError()                  self._is_active: bool  = False                   self._deadline_future: asyncio.Future = asyncio.Future()                  self._with_scope_future: Optional[asyncio.Future] = None           if  self._delay is  not  None :                          self._end_timestamp: Optional[float ] = time.time() + self._delay             self._end_loop_time: Optional[float ] = self._loop.time() + self._delay                          self._loop.call_at(self._end_loop_time, self._set_deadline_future_result)         else :             self._end_timestamp = None              self._end_loop_time = None      def  _set_deadline_future_result (self ) -> None :                  self._deadline_future.set_result(True )         if  self._with_scope_future and  not  self._with_scope_future.cancelled():             self._with_scope_future.cancel()     def  __enter__ (self ) -> "Deadline":                  if  self._with_scope_future:                          raise  RuntimeError("`with` can only be called once" )         if  self._delay is  not  None :                                       main_task: Optional[asyncio.Task] = current_task(self._loop)             if  not  main_task:                 raise  RuntimeError("Can not found current task" )                          self._with_scope_future = main_task         return  self     def  __exit__ (        self,         exc_type: Optional[Type[BaseException]],         exc_val: Optional[BaseException],         exc_tb: Optional[TracebackType],      ) -> Optional[bool]:                  if  self._with_scope_future:             self._with_scope_future = None          else :             return  None          if  self._deadline_future.done():                          if  exc_type:                 if  isinstance (self._timeout_exc, IgnoreDeadlineTimeoutExc):                                          return  True                           raise  self._timeout_exc         else :             return  None 
 
可以看到, Deadline的实现十分简单, 但是还会遇到层层调用传递的情况, 如果觉得显示传递的很烦的话, 还可以使用contextvars模块进行封装从而可以隐式调用, 只需要改造__enter__和__exit__方法即可, 具体看完整版代码。
Note: contextvars模块使用具体见如何使用contextvars模块和源码分析 
 
3.服务间的传递 到了微服务时, 超时的影响更加严重, 因为在微服务架构里面, 一次请求可能要经过一个很长的链路,跨多个服务调用后才能返回结果, 如果能提前触发超时机制的话, 则可以省下一些不必要的后续调用, 减少机器的计算和网络开销, 防止服务雪崩等问题。 但是超时用不好时, 也会带来一些副作用, 我在编写rap框架时, 为了支持更快的请求和channel双通道机制, 使用了单连接复用的思路, 客户端在发送请求抵达服务端请求后, 就一直在等待服务端返回数据, 当服务端超时返回数据时, 客户端会对本次请求抛出超时异常, 但是不会关闭连接, 客户端伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41  future_dict = dict ()async  def  response ():          while  True :     	try :     		     	    msg_id, header, status_code, body = await  conn.read()      	except  ValueError:     	    return      	         if  msg_id in  future_dict:     		future_dict[msg_id].set_result(Response(msg_id, header, status_code, body))async  def  request ():          request = (msg_id, header, body)     try :                  future_dict[msg_id] = asyncio.Future()         try :                          await  conn.write(request)         except  Exception as  e:             raise  e         try :                          return  await  asyncio.wait_for(future_dict[msg_id], 9 )         except  asyncio.TimeoutError:             raise  asyncio.TimeoutError(f"msg_id:{msg_id}  request timeout" )     finally :                  future_dict.pop(msg_id, None )
 
可以发现在这种情况会忽略几个问题:
1.客户端已经丢弃请求了, 但服务端可能还在处理数据, 并在完成的时候返回数据, 但是这时候服务端返回的数据会被客户端丢弃的。  
2.如果客户端带有重试机制, 那么客户端会继续发送请求,此时服务端除了会同时多处理多个问题外, 在接口没做幂等性处理时还会出现脏数据的问题。 
3.如果客户端带有重试机制, 客户端会发送多个请求到服务端,服务端的压力会变大, 导致整个服务链路异常进而造成服务雪崩。  
 
可以发现, 客户端在这种情况下发生超时时, 除了自己抛异常外, 还要通知服务端进行超时处理, 而不是让服务端继续处理下去, 从而影响服务端性能, 但是通知服务端超时的请求也可能发生异常导致通知失败, 也会占用部分网络资源, 所以最好的方法是把截止时间跟随请求传到服务端(如果是一个超时参数, 则传递参数后, 服务端无法知道真实所剩的超时时间), 服务端通过截止时间来判断何时结束这个调用, 这种方法很简单, 在rap中就是这样处理的, 首先是客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 async  def  _base_request (self, request: Request, conn: Connection ) -> Response:          if  not  request.correlation_id:         request.correlation_id = str (async_get_snowflake_id())     resp_future_id: str  = f"{conn.sock_tuple} :{request.correlation_id} "      try :                  response_future: asyncio.Future[Response] = asyncio.Future()         self._resp_future_dict[resp_future_id] = response_future                  deadline: Optional[Deadline] = deadline_context.get()         if  self.app.through_deadline and  deadline:                          request.header["X-rap-deadline" ] = deadline.end_timestamp                  await  self.write_to_conn(request, conn)                  response: Response = await  as_first_completed(             [response_future],             not_cancel_future_list=[conn.conn_future],         )         response.state = request.state         return  response     finally :                  pop_future: Optional[asyncio.Future] = self._resp_future_dict.pop(resp_future_id, None )         if  pop_future:             safe_del_future(pop_future)
 
然后是服务端, 服务端也非常简单, 直接从Header获取值, 再应用到超时函数即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 try :          deadline_timestamp: int  = request.header.get("X-rap-deadline" , 0 )     if  deadline_timestamp:         timeout: int  = int (time.time() - deadline_timestamp)     else :                  timeout = self._run_timeout          result: Any = await  asyncio.wait_for(coroutine, timeout)except  asyncio.TimeoutError:          return  call_id, RpcRunTimeError(f"Call {func_model.func.__name__}  timeout" )except  Exception as  e:     return  call_id, e
 
这样即可完成跨进程跨服务的超时传递, 不过这种实现是有一个前提, 就是所有机器的时间得保持一致的(或者说只差一点点), 而微服务基本上都满足这种前提, 他们基本上都在同一个内网里面, 或者同个公司管理的跨区域机器里面。