前言 世界上没有百分之百不会挂的服务,只能人为的去增加服务的可用性,为了能让服务的可用性增加,需要为服务添加服务治理的功能,而在gRPC中,可以通过拦截器实现一些服务治理的功能。
1.什么是拦截器 在Python
中,很少有框架把自己的功能称为拦截器,反而都称为中间件或者是钩子,gRPC的拦截器功能与Python
Web框架常用的中间件基本一样,它主要的功能是可以在调用函数之前,之后已经发生异常时能够捕获到对应的数据,比如如下代码:
1 2 3 4 5 6 7 8 def demo_rpc_method (): try : pass except Exception: pass finally :
这段代码提现了拦截器的主要特点:
1.裹住了一个RPC调用逻辑
2.能够在裹住的逻辑调用之前,之后以及调用异常的时候自定义数据
对于Python
开发人员来说,看到这思路后脑子里的第一个想法就是可以通过Python
的装饰器轻松的实现gRPC
拦截器的逻辑。
所以在官方尚未敲定拦截器实现的时候,大家也都是使用装饰器来实现拦截器的,比如下面这段代码(源码见grpc_example_common/helper/grpc_wrapper.py ):
1 2 3 4 5 6 7 8 9 10 11 12 13 def grpc_client_func_wrapper (*args: Any, **kwargs: Any ) -> Callable: def wrapper (func: Callable ) -> Callable: @wraps(func ) def _wrapper (*_args: Any, **_kwargs: Any ) -> Any: if "metadata" not in _kwargs: _kwargs["metadata" ] = [] return func(*_args, **_kwargs) return _wrapper return wrapper
这是我目前还在用的一个通过装饰器实现的拦截器,因为目前gRPC的客户端在调用方法没定义metadata
参数时,gRPC
客户端拦截器是没办法去更改metadata
参数的,强行修改还会报错,而meatdata
参数类似于HTTP的Header参数,很多通过拦截器实现的功能如链路追踪,身份校验等功能的实现都需要通过拦截器更改或写入metadata
参数来实现,所以这个装饰器一直保留着。
既然有了装饰器,那为什么gRPC最终还是选择拦截器这个方案呢,原因是装饰器虽然很灵活,但是本身有很多缺点:
0.装饰器只能用于方法上,如果遇到一个无法调用到指定的方法的请求,那么这个请求就无法没监控到。
1.需要显式的为每个方法添加装饰器。
2.过多的装饰器会变得非常混乱。
3.上篇文章提到的由于装饰器导致需要延后声明才能使方法mock生效。
Note: 可以通过grpc_example_common/helper/grpc_wrapper.py 函数自动应用装饰器,使用方法见grpc_service/book_service.py
2.如何使用拦截器 由于装饰器自己带有一些缺点,所以gRPC
最后自己定义了一个拦截器方案,不过Python
实现gRPC拦截器API非常复杂,按照官方文档的指引,我们需要为每种请求类型定义一个拦截器,但是他们的代码却是相同的,这对开发人员来说非常的不友好,不过好在有一个grpc-interceptor 包进行了统一,使用起来非常方便,只用继承他提供的类,再覆盖intercept方法就可以同时为一对一,多对一,一对多的请求套上拦截器,比如我定义了一个可以把错误类型在不同基于Python
实现的gRPC服务传播的拦截器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 class ServerCustomerTopInterceptor (BaseInterceptor ): def intercept ( self, next_handler_method: Callable, request_proto_message: Any, context: grpc.ServicerContext, ) -> Any: start_time: float = time.time() return_initial_metadata: List[Tuple] = [("customer-user-agent" , "Python3" )] try : return next_handler_method(request_proto_message, context) except Exception as e: if self.metadata_dict.get("customer-user-agent" , "" ) == "Python3" : return_initial_metadata.append(("exc_name" , e.__class__.__name__)) return_initial_metadata.append(("exc_info" , str (e))) logging.exception(f"{context_proxy.method} request exc:{e.__class__.__name__} error:{e} " ) raise e finally : context.send_initial_metadata(return_initial_metadata) logging.info( f"Got Request. method:{self.method} , code:{context.code()} , detail:{context.details()} , duration:{time.time() - start_time} " )class ClientCustomerTopInterceptor (BaseInterceptor ): def __init__ (self, exc_list: Optional[List[Type[Exception]]] = None ): self.exc_dict: Dict[str , Type[Exception]] = {} for key, exc in globals ()["__builtins__" ].items(): if inspect.isclass(exc) and issubclass (exc, Exception): self.exc_dict[key] = exc if exc_list: for exc in exc_list: if issubclass (exc, Exception): self.exc_dict[exc.__name__] = exc def intercept ( self, method: Callable, request_or_iterator: Any, call_details: ClientCallDetailsType, ) -> GRPC_RESPONSE: if call_details.metadata is not None : call_details.metadata.append(("customer-user-agent" , "Python3" )) call_details.metadata.append(("request_id" , context_proxy.request_id)) response: GRPC_RESPONSE = method(call_details, request_or_iterator) metadata_dict: dict = {item.key: item.value for item in response.initial_metadata()} if metadata_dict.get("customer-user-agent" ) == "Python3" : exc_name: str = metadata_dict.get("exc_name" , "" ) exc_info: str = metadata_dict.get("exc_info" , "" ) exc: Optional[Type[Exception]] = self.exc_dict.get(exc_name) if exc: raise exc(exc_info) return response
可以看到源码中的拦截器都有调用之前,调用中,调用后三大逻辑,与我们前面说的一样,给予开发者极强的自定义能力,这样一来就可以通过拦截器来实现一些服务治理的功能,对于如何实现可见RPC框架编写实践系列
实现完拦截器后就需要应用到gRPC服务中,由于grpc-interceptor 是对官方拦截器的简单安装,所以可以像官方拦截器一样应用到服务中,服务端应用拦截器可参考grpc-example-book-grpc-service项目 在grpc.Server
初始化时通过interceptors
参数把拦截器列表传进去,简要代码如下:
1 2 3 4 5 6 7 8 9 10 def main ( host: str = "0.0.0.0" , port: str = "9000" , ssl_port: Optional[str ] = None ) -> None : interceptor_list: List[BaseInterceptor] = [CustomerTopInterceptor()] server: grpc.server = grpc.server( futures.ThreadPoolExecutor(max_workers=10 ), interceptors=interceptor_list, ) manager_service.add_BookManagerServicer_to_server(ManagerService(), server) social_service.add_BookSocialServicer_to_server(SocialService(), server)
而对于客户端可以参考grpc-example-api-backend-service项目 在通过生成channel
时,把拦截器通过参数传进去
1 2 3 4 5 6 7 class BookGrpcService (BookSocialGrpcServiceMixin, BookManagerGrpcServiceMixin ): def __init__ (self, host: str , port: int ) -> None : self.channel: grpc.Channel = grpc.intercept_channel( grpc.insecure_channel(f"{host} :{port} " ), CustomerTopInterceptor() ) BookSocialGrpcServiceMixin.__init__(self, self.channel) BookManagerGrpcServiceMixin.__init__(self, self.channel)
拦截器编写完成后应该去测试它,但是大多数都拦截器都依赖于一些特定的服务,如Prometheus监控依赖Prometheus,链路追踪可能依赖到的jaeger等,所以很多时候都会在测试环境或者预发布环境进行测试,但是对于其它无服务依赖的拦截器则可以使用grpc_interceptor
包实现的测试模块来进行测试,如官方的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from grpc_interceptor import ExceptionToStatusInterceptorfrom grpc_interceptor.exceptions import NotFoundfrom grpc_interceptor.testing import dummy_client, DummyRequest, raisesdef test_exception (): special_cases = {"error" : raises(NotFound())} interceptors = [ExceptionToStatusInterceptor()] with dummy_client(special_cases=special_cases, interceptors=interceptors) as client: assert client.Execute(DummyRequest(input ="foo" )).output == "foo" with pytest.raises(grpc.RpcError) as e: client.Execute(DummyRequest(input ="error" )) assert e.value.code() == grpc.StatusCode.NOT_FOUND
通过该例子可以方便的实现一个正常请求和错误请求来测试拦截器的返回,但是对于一些内在逻辑最好通过pytest-mock或者是官方的unitest-mock来判断是否有调用到,调用几次,调用时的内容是否符合标准等等。