前言 之前在文章《Python-gRPC实践(3)–使用Python实现gRPC服务》 介绍的实现gRPC
服务中使用了一套自定义的协议来传递错误,但这并不是一个优雅的解决方案,因为这种方案的兼容性很差,好在官方定义了一种解决方案,通过这种方案可以使不同的服务都能传递错误。
1.自定义的错误传递 在编写普通的HTTP/1.1接口时,我们都会定制一套业务相关的错误来与HTTP标注的错误区分开,比如通常都会返回这样一个结构体:
1 2 3 4 5 { "code" : "0" , "msg" : "success" , "data" : {} }
这个结构体包含了code
,msg
和data
三个字段,他们分别是错误码,错误信息,和要返回的结构。 客户端在收到响应后,会判断code
的值是什么,如果属于定义的成功状态码则通过data
提取数据,否则把msg
信息通过异常抛出来。
在使用gRPC
中更不例外,因为我们在使用gRPC
调用时,就像调用一个普通函数一样,不过gRPC
服务间是通过传递message数据来进行交互的,每个调用的请求message和响应message都已经被固定了,如果我们想返回一个错误信息,那么必定会跟响应结构体不一样,所以错误的信息的结构体一定要跟响应体匹配,否则只能另寻它路,比如在每个响应体嵌入错误信息的字段,如下:
1 2 3 4 5 6 7 message Demo { string a=1 ; int32 b=2 ; int32 err_code=3 ; string err_msg=4 ; }
然后服务端判断调用执行出错就把错误转换为对应的err_code
和err_msg
再塞入到message中传给客户端,而客户端每收到调用响应就判断err_code
是否有值,有则代表是异常请求,只把err_code
和err_msg
提取出来生成一个异常并抛给调用者,否则就正常返回数据。
采用这种方法可以兼容每一种调用,但是并不是十分的优雅,如果能通过别的协议容器把数据传给客户端,客户端通过对应的协议解析到错误信息并生产异常就好,在之前介绍的gRPC
服务中,就是采用gRPC.metadata
来传输数据。同时为了能自动处理服务端异常的捕获和客户端的异常生成,会分别在客户端和服务端设置一个顶层的拦截器,服务端的顶层拦截器代码如下(因为其它的拦截器可能会抛错,所以捕获错误的拦截器一定要放置在最顶层):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import loggingimport timefrom typing import Any, Callable, List, Tupleimport grpcfrom grpc_example_common.helper.context import context_proxyfrom .base import BaseInterceptorclass CustomerTopInterceptor (BaseInterceptor ): def intercept ( self, next_handler_method: Callable, request_proto_message: Any, context: grpc.ServicerContext, ) -> Any: return_initial_metadata: List[Tuple] = [("customer-user-agent" , "Python3" )] try : return next_handler_method(request_proto_message, context) except Exception as e: if self.metadata_dict.get("customer-user-agent" , "" ) == "Python3" : return_initial_metadata.append(("exc_name" , e.__class__.__name__)) return_initial_metadata.append(("exc_info" , str (e))) raise e finally : context.send_initial_metadata(return_initial_metadata)
该拦截器会捕获调用的异常,然后把异常的方法名和异常信息存在metedata中,这里之所以把值设置到metadata中,而不通过context.set_code
,context.set_details
来设置错误码和错误信息是有原因的。
首先是code
,gRPC
限制了只能设置它允许的code
,所以这会限制我们去自定义code
,同时我们也不应该把业务的错误码设置到响应的错误码中,所以不在这里使用context.set_code
;而对于set_details
,则是因为gRPC
服务端在捕获到异常后会解析对应的异常,然后把异常数据通过context.set_details
设置到details
中,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 def _call_behavior (rpc_event, state, behavior, argument, request_deserializer, send_response_callback=None ): from grpc import _create_servicer_context with _create_servicer_context(rpc_event, state, request_deserializer) as context: try : response_or_iterator = None if send_response_callback is not None : response_or_iterator = behavior(argument, context, send_response_callback) else : response_or_iterator = behavior(argument, context) return response_or_iterator, True except Exception as exception: with state.condition: if state.aborted: _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, b'RPC Aborted' ) elif exception not in state.rpc_errors: details = 'Exception calling application: {}' .format ( exception) _LOGGER.exception(details) _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, _common.encode(details)) return None , False
这就意味着我们即使在拦截器设置了details
,但是由于抛出来的异常并不属于gRPC
的异常,所以details
最终被异常信息覆盖了。
了解完了服务端的拦截器实现,接下来看看客户端的拦截器实现,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import inspectimport loggingfrom typing import Any, Callable, Dict, List, Optional, Typefrom .base import GRPC_RESPONSE, BaseInterceptor, ClientCallDetailsTypeclass CustomerTopInterceptor (BaseInterceptor ): def __init__ (self, exc_list: Optional[List[Type[Exception]]] = None ): self.exc_dict: Dict[str , Type[Exception]] = {} for key, exc in globals ()["__builtins__" ].items(): if inspect.isclass(exc) and issubclass (exc, Exception): self.exc_dict[key] = exc if exc_list: for exc in exc_list: if issubclass (exc, Exception): self.exc_dict[exc.__name__] = exc def intercept ( self, method: Callable, request_or_iterator: Any, call_details: ClientCallDetailsType, ) -> GRPC_RESPONSE: if call_details.metadata is not None : call_details.metadata.append(("customer-user-agent" , "Python3" )) response: GRPC_RESPONSE = method(call_details, request_or_iterator) metadata_dict: dict = {item.key: item.value for item in response.initial_metadata()} if metadata_dict.get("customer-user-agent" ) == "Python3" : exc_name: str = metadata_dict.get("exc_name" , "" ) exc_info: str = metadata_dict.get("exc_info" , "" ) exc: Optional[Type[Exception]] = self.exc_dict.get(exc_name) if exc: raise exc(exc_info) return response
可以看出客户端拦截器通过获取服务端返回的metada来判断是否有异常信息,如果有就提取出并抛出错误,否则就正常返回响应。这样一来只要客户端服务端都设置了正确的拦截器,客户端就能获得到服务端的错误信息并抛出异常,不过这种实现方式是依赖gRPC.metadata
传输数据的,而gRPC.metadata
的值必须是ASCII或者规范的字节,不然就不给传输甚至还会卡住请求,这就意味着我们需要对错误信息进行一些序列化。
2.基于官方协定的错误传输实现 由于上面的实现不是很优雅,于是就上网冲浪寻找一个官方的实现,后面终于在Github
中找到了官方的错误传输示例 ,其中官方的服务端示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 def create_greet_limit_exceed_error_status (name ): detail = any_pb2.Any() detail.Pack( error_details_pb2.QuotaFailure(violations=[ error_details_pb2.QuotaFailure.Violation( subject="name: %s" % name, description="Limit one greeting per person" , ) ],)) return status_pb2.Status( code=code_pb2.RESOURCE_EXHAUSTED, message='Request limit exceeded.' , details=[detail], )class LimitedGreeter (helloworld_pb2_grpc.GreeterServicer ): def __init__ (self ): self._lock = threading.RLock() self._greeted = set () def SayHello (self, request, context ): with self._lock: if request.name in self._greeted: rich_status = create_greet_limit_exceed_error_status( request.name) context.abort_with_status(rpc_status.to_status(rich_status)) else : self._greeted.add(request.name) return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
该示例代码中的SayHello
方法逻辑非常简单,它判断如果name
不存在,就把name
添加到集合中,并正常返回,如果已经存在,则先生成一个Status
对象,再通过to_status
方法生成 一个_Status
对象, 最后通过abort_with_stauts
方法把_Status
对象传进去,这样就把错误数据传输到了客户端。
其中abort_with_stauts
方法会使请求引发异常并以非正常状态终止,再把用户指定的Status
对象传给客户端,而to_status
的源码如下:
1 2 3 4 5 def to_status (status ): return _Status(code=code_to_grpc_status_code(status.code), details=status.message, trailing_metadata=((GRPC_DETAILS_METADATA_KEY, status.SerializeToString()),))
通过源码可以看出这个函数就是把status.code
转为gRPC
响应的code
,把status.message
转为gRPC
的details
,最后把status
转为合法的字符串,并通过GRPC_DETAILS_METADATA_KEY
把字符串设置到metadata中。
而对于客户端则比较简单,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def process (stub ): try : response = stub.SayHello(helloworld_pb2.HelloRequest(name='Alice' )) _LOGGER.info('Call success: %s' , response.message) except grpc.RpcError as rpc_error: _LOGGER.error('Call failure: %s' , rpc_error) status = rpc_status.from_call(rpc_error) for detail in status.details: if detail.Is(error_details_pb2.QuotaFailure.DESCRIPTOR): info = error_details_pb2.QuotaFailure() detail.Unpack(info) _LOGGER.error('Quota failure: %s' , info) else : raise RuntimeError('Unexpected failure: %s' % detail)
这段代码中,如果是正常响应,则打印响应体,而如果是异常,客户端会发现响应体的code
并不是正常的状态码,所以会抛出一个grpc.RpcError
异常,然后通过rpc_status.from_call
函数提取异常, 这个函数的逻辑非常简单,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def from_call (call ): if call.trailing_metadata() is None : return None for key, value in call.trailing_metadata(): if key == GRPC_DETAILS_METADATA_KEY: rich_status = status_pb2.Status.FromString(value) if call.code().value[0 ] != rich_status.code: raise ValueError( 'Code in Status proto (%s) doesn\'t match status code (%s)' % (code_to_grpc_status_code(rich_status.code), call.code())) if call.details() != rich_status.message: raise ValueError( 'Message in Status proto (%s) doesn\'t match status details (%s)' % (rich_status.message, call.details())) return rich_status return None
通过源码看出这个逻辑和自定义的错误传递一样,也是通过metadata
提取数据然后拼成一个异常对象。不过,需要注意的是from_call
的call
参数不仅支持grpc.RpcError
,它还支持客户端拦截器中得到的response
对象,因为call
参数在form_call
中用到了trailing_metadata
,code
和details
方法都是grpc.RpcError
和response
对象共同拥有的方法。
在简单的了解了gRPC
的错误传递示例后可以发现,官方的方法与自定义的错误传递很类似,只不过它定义了一个规范的Key,这样一来大家都会认为这个Key对应的值是一个Status
对象的序列化成的字符串(由于序列化了,就不用担心存在非ASCII字符的问题)。而这个Status
对象中包含了code
,message
和detail
三个字段,分别对应着上面所说的错误结构体:
1 2 3 4 5 { "code" : "0" , "msg" : "success" , "data" : {} }
中的code
,msg
和data
,不过需要注意的是detail
是一个数组,它可以存放多个自定义的Message
对象。
3.重新设计错误传递实现 通过官方的错误传输实现可以发现,这个例子需要服务端的业务逻辑主动通过context.abort_with_status
逻辑来主动把错误信息设置到metadata
中,同时也需要客户端捕获grpc.RpcError
异常再打印出来,这样对业务层来说是非常啰嗦的,于是就尝试把官方协定的错误传输实现与自定义的错误传递结合起来。
首先是定义一个内部统一的message
:
1 2 3 4 message Exec { string name = 1 ; string msg = 2 ; }
这个Message只用于内部业务服务,如果该服务端有开发给其它部门使用,且他们没有兼容这个message
,他们也可以通过code
和detail
知道大概是什么样的错误。
然后就开始折腾服务端的顶层拦截器,这个拦截器只要改造捕获异常部分的代码即可,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class CustomerTopInterceptor (BaseInterceptor ): def intercept ( self, next_handler_method: Callable, request_proto_message: Any, context: grpc.ServicerContext, ) -> Any: try : return next_handler_method(request_proto_message, context) except Exception as e: detail = any_pb2.Any() detail.Pack( Exec( name=e.__class__.__name__, msg=str (e) ) ) context.abort_with_status( rpc_status.to_status( status_pb2.Status( code=code_pb2.RESOURCE_EXHAUSTED, message=str (e), details=[detail], ) ) ) raise e
接着就折腾客户端的顶层拦截器,同样的它只需要改一下数据的获取就可以了,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class CustomerTopInterceptor (BaseInterceptor ): ... def intercept ( self, method: Callable, request_or_iterator: Any, call_details: ClientCallDetailsType, ) -> GRPC_RESPONSE: response: GRPC_RESPONSE = method(call_details, request_or_iterator) status: Optional[status_pb2.Status] = rpc_status.from_call(response) if status: for detail in status.details: if detail.Is(Exec.DESCRIPTOR): exec_instance: Exec = Exec() detail.Unpack(exec_instance) exec_class: Type[Exception] = self.exc_dict.get(exec_instance.name) or RuntimeError raise exec_class(exec_instance.msg) else : raise RuntimeError('Unexpected failure: %s' % detail) return response
这样一来,新的错误传递实现已经完成了,现在通过一个简单的demo来验证成果,demo代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 from concurrent import futuresfrom typing import Listimport grpcfrom grpc_example_common.interceptor.server_interceptor.base import BaseInterceptorfrom google.protobuf.empty_pb2 import Empty from grpc_example_common.protos.user import user_pb2 as user_messagefrom grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptorfrom grpc_example_common.protos.user import user_pb2_grpc as user_serviceclass UserService (user_service.UserServicer ): def delete_user (self, request: user_message.DeleteUserRequest, context: grpc.ServicerContext ) -> Empty: uid: str = request.uid if uid == "123" : return Empty() else : raise ValueError(f"Not found user:{uid} " )def main (host: str = "127.0.0.1" , port: str = "9000" ) -> None : interceptor_list: List[BaseInterceptor] = [CustomerTopInterceptor()] server: grpc.server = grpc.server( futures.ThreadPoolExecutor(max_workers=10 ), interceptors=interceptor_list, ) user_service.add_UserServicer_to_server(UserService(), server) server.add_insecure_port(f"{host} :{port} " ) server.start() try : server.wait_for_termination() except KeyboardInterrupt: server.stop(0 )if __name__ == "__main__" : main()import grpcfrom grpc_example_common.protos.user import user_pb2 as user_messagefrom grpc_example_common.protos.user import user_pb2_grpc as user_servicefrom grpc_example_common.interceptor.client_interceptor.customer_top import CustomerTopInterceptor channel: grpc.Channel = grpc.intercept_channel( grpc.insecure_channel("127.0.0.1:9000" ), CustomerTopInterceptor() ) user_stub: user_service.UserStub = user_service.UserStub(channel) user_stub.delete_user(user_message.DeleteUserRequest(uid="123" )) user_stub.delete_user(user_message.DeleteUserRequest(uid="456" ))
编写完demo后开始运行,运行后客户端抛出如下错误信息:
1 2 3 4 5 6 7 8 9 10 11 12 Traceback (most recent call last): File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/demo.py" , line 11, in <module> user_stub.delete_user(user_message.DeleteUserRequest(uid="456" )) File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc/_interceptor.py" , line 216, in __call__ response, ignored_call = self._with_call(request, File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc/_interceptor.py" , line 254, in _with_call call = self._interceptor.intercept_unary_unary(continuation, File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc_example_common/interceptor/client_interceptor/base.py" , line 74, in intercept_unary_unary return self.intercept(continuation, request, call_details) File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc_example_common/interceptor/client_interceptor/customer_top.py" , line 44, in intercept raise exec_class(exec_instance.msg) ValueError: Not found user:456
通过信息可以发现,重新设计的错误传递实现完美运行。