前言 就目前而言,在大多数场景中,用户侧的客户端和服务端还是通过HTTP进行交互,然后服务端内部各种服务再通过过各种协议进行交互,所以在采用了gRPC做内部服务的交互协议后,就需要一个网关来把gRPC的调用自动映射到HTTP应用上,方便客户端调用。
注:本文只针对一对一的gRPC服务。
之前原本想当做一个简单的实现,后面发现可以与我编写的pait 框架可以嵌到一起,这样也可以对多个Web框架提供Gateway功能,于是就把这个功能的实现嵌入到pait 框架中,所以下面有些代码来自于我编写的pait 框架,对应的gRPC网关文档使用说明见:Pait gRPC Gateway
1.网关的基础实现思路 网关的思路很简单,就是把HTTP请求转发到了对应的gRPC服务,在Go
生态中,已经出现了一个类似的框架–grpc-gateway ,它负责grpc服务自动的映射到HTTP服务中,同时它通过让HTTP服务和gRPC服务监听同一个端口来降低使用者的疑惑。
通过前面的文章我们了解到,gRPC采用的是HTTP/2做传输协议,而HTTP/2是对于HTTP/1.1的一个升级,这样的话服务端在接收socket的数据且转为HTTP数据后就可以判断Header中Upgrade
字段对应的值来把数据分别分发到HTTP服务和gRPC服务。
但是,Python
的gRPC是通过gRPC-C转译的,也就是它的底层是C实现的,所以Python
的gRPC无法传socket来启动服务,而是只能通过指定ip:port的形式来启动服务,如下:
1 2 3 4 5 6 import grpcfrom concurrent import futures grpc.server( futures.ThreadPoolExecutor(max_workers=10 ), ).add_insecure_port("127.0.0.1:9000" )
这样上述的做法就无法实现了,只能让HTTP服务和gRPC服务分别监听不同的端口,无法做到像Go grpc-gateway 那样,毕竟Go才是亲儿子。
最终只能以下面的方式实现: 图中客户端发起的请求会先发送到HTTP服务绑定的端口,HTTP服务收到请求后,会转发到gRPC客户端,交由gRPC客户端发送到gRPC服务端,这样一系列服务就串起来了。
图中的示例HTTP服务与gRPC客户端在同一个进程,而gRPC服务在同台机器的另外一个进程,但是他们也可以在同一个进程内。
2.如何编写转发路由 总体思路决定后,就开始细究路由的实现了,路由只负责转发,不做其它功能,通过前面文章Python-gRPC实践(1)–gRPC简介 的捉包可以知道,gRPC传输数据时,采用的是HTTP/2来传输请求,用的是POST方法,同时使用Body传输数据,那么我们可以采用尽量相似的方法来接收请求,再通过gRPC客户端传到对应的服务端,如下的Proto文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 syntax = "proto3"; package example; message LoginUserRequest { string uid = 1; string password = 2; } message LoginUserResult { string uid = 1; string user_name = 2; string token = 3; } service User { rpc login_user(LoginUserRequest) returns (LoginUserResult); }
接着,以这种方法来创建一个路由来映射这个Proto文件的请求,这个路由采用POST方法然后解析对应的数据并通过对应的gRPC客户端传递到服务端,为了尽量的跟gRPC一致,所以HTTP服务与客户端通过Json进行交互,映射的路由代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import grpcfrom starlette.applications import Starlettefrom starlette.routing import Routefrom starlette.requests import Requestfrom starlette.responses import JSONResponsefrom example.example_grpc.python_example_proto_code.example_proto.user import user_pb2from example.example_grpc.python_example_proto_code.example_proto.user import user_pb2_grpcasync def login_route (request: Request ) -> JSONResponse: request_dict: dict = await request.json() user_service: user_pb2_grpc.UserStub = request.app.state.user_service result: user_pb2.LoginUserResult = user_service.login_user( user_pb2.LoginUserRequest(uid=request_dict["uid" ], password=request_dict["password" ]) ) return JSONResponse( { "uid" : result.uid, "user_name" : result.user_name, "token" : result.token, } )def create_app () -> Starlette: app: Starlette = Starlette(routes=[Route("/api/login" , login_route, methods=["POST" ])]) def _before_server_start () -> None : app.state.user_service = user_pb2_grpc.UserStub(grpc.aio.insecure_channel("0.0.0.0:9000" )) app.add_event_handler("startup" , _before_server_start) return appif __name__ == "__main__" : import uvicorn uvicorn.run(create_app(), log_level="debug" )
这样一个简单的转发就编写完毕了,但是目前是手动编写的代码,意味着每新增一个接口,我们就需有编写一个对应的路由函数,同时如果Protobuf文件发生了变更,那么我们需要定位到对应的代码再修改,非常麻烦,特别是我们在定义Protobuf文件时,已经很像在定义一个接口了,结果还要再编写路由函数,效率非常低,所以一个网关最关键的是需要通过Protobuf文件或者通过Protobuf的产物来自动生成路由函数并映射到对应的服务,这样一来,其它使用者后续只要修改Protobuf文件即可。
3.提取路由需要的数据 为了自动生成路由,还需要一些数据,通过编写的转发路由可以发现,我们只需要收到用户的请求,解析数据,然后通过对应的gRPC客户端方法来发送数据即可,所以需要通过这个gRPC客户端的方法来找到所有需要的数据(尝试了多种方法,目前这种是最优的)。
以上面的Protobuf文件自动生成的Python
代码中的user_pb2_grpc.UserStub
为例子(可以通过pait/example/example_grpc 了解),它的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import grpcfrom example.example_grpc.python_example_proto_code.example_proto.user import user_pb2 as example__proto_dot_user_dot_user__pb2from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2class UserStub (object ): def __init__ (self, channel ): self.get_uid_by_token = channel.unary_unary( '/user.User/get_uid_by_token' , request_serializer=example__proto_dot_user_dot_user__pb2.GetUidByTokenRequest.SerializeToString, response_deserializer=example__proto_dot_user_dot_user__pb2.GetUidByTokenResult.FromString, ) self.logout_user = channel.unary_unary( '/user.User/logout_user' , request_serializer=example__proto_dot_user_dot_user__pb2.LogoutUserRequest.SerializeToString, response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, ) self.login_user = channel.unary_unary( '/user.User/login_user' , request_serializer=example__proto_dot_user_dot_user__pb2.LoginUserRequest.SerializeToString, response_deserializer=example__proto_dot_user_dot_user__pb2.LoginUserResult.FromString, ) self.create_user = channel.unary_unary( '/user.User/create_user' , request_serializer=example__proto_dot_user_dot_user__pb2.CreateUserRequest.SerializeToString, response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, ) self.delete_user = channel.unary_unary( '/user.User/delete_user' , request_serializer=example__proto_dot_user_dot_user__pb2.DeleteUserRequest.SerializeToString, response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, )
通过这份生成的代码可以发现,这个对象类中包括了每个调用的url,请求体和响应体,这样再加上上面说了gRPC是以POST方法进行请求的,那么构成一个最小路由的条件都已经满足了。 但是,要通过这份代码提取数据却比较麻烦,由于都是在__init__
方法后才初始化各个gRPC的调用方法,所以没办法通过dir(UserStub)
获取到对应的gRPC调用方法,只能通过UserStub(channel).__dict__
来获取gRPC的调用方法,不过channel
最好需要在对应的HTTP服务启动时再初始化,特别是对于uvicorn来说,如果grpc.aio.channel提前初始化,那么他们使用的不是同一个事件循环,这意味着就必须在Web实例启动后再注册路由,但由于Sanic
等框架是不支持实例启动后再注册路由的,所以这个方法行不通。此外,示例代码中的请求体,响应体都是调用对应的方法再加载到channel.unary_unary
中,之后我们没办法在运行时通过UserStub
的实例来直接获取。
幸运的是,由Proto文件生成的UserStub
对象是有规则的,所以可以在程序运行时获取到UserStub
的源码,然后通过一定的规则把数据提取出来,分析上面的源码,可以发现有如下几个规则:
1.self.xxx
中是gRPC调用的方法名,同时channel.unary_unary
代表了这是一对一的请求。
2.self.xxx
的下面第一行是gRPC方法对应的url
3.self.xxx
的下面第二行出现了gRPC方法的请求对象,这一行是以request_serializer=
开头,以.SerializeToString
结尾,可以通过正则提取中间的对象字符串,然后可以在运行时通过解析字符串从UserStub
所在的模块中提取到正确的请求对象。
4.self.xxx
的下面第三行则是gRPC方法的响应对象,处理方法跟请求对象类似。
发现了规则后就可以通过规则提取对应的数据,具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 import inspectimport refrom typing import Any, List, Generator, Optional, Typefrom types import ModuleTypefrom google.protobuf.message import Messagefrom example.example_grpc.python_example_proto_code.example_proto.user import user_pb2_grpcdef _gen_message (class_module: ModuleType, line: str , match_str: str ) -> Type[Message]: module_path_find_list = re.findall(match_str, line) if len (module_path_find_list) != 1 : raise ValueError("module path not found" ) module_path: str = module_path_find_list[0 ] module_path_list: List[str ] = module_path.split("." ) message_module: ModuleType = getattr (class_module, module_path_list[0 ]) message_model: Type[Message] = getattr (message_module, module_path_list[1 ]) if not issubclass (message_model, Message): raise RuntimeError("Can not found message" ) return message_modeldef parse (stub: Any ) -> Generator[tuple, None , None ]: line_list: List[str ] = inspect.getsource(stub).split("\n" ) class_module: Optional[ModuleType] = inspect.getmodule(stub) if not class_module: raise RuntimeError(f"Can not found {stub} module" ) for index, line in enumerate (line_list): if "self." not in line: continue if "channel.unary_unary" not in line: continue invoke_name: str = line.split("=" )[0 ].replace("self." , "" ).strip() method: str = line_list[index + 1 ].strip()[1 :-2 ] request: Type[Message] = _gen_message( class_module, line_list[index + 2 ], r"request_serializer=(.+).SerializeToString" ) response: Type[Message] = _gen_message( class_module, line_list[index + 3 ], r"response_deserializer=(.+).FromString" ) yield invoke_name, method, request, responseif __name__ == "__main__" : for i in parse(user_pb2_grpc.UserStub): print(i)
这段代码执行后就会输出以下结果,可以发现,这段代码正常的从UserStub
中提取到了我们想要的数据。
1 2 3 4 5 ('get_uid_by_token' , '/user.User/get_uid_by_token' , <class 'example_proto.user.user_pb2.GetUidByTokenRequest' >, <class 'example_proto.user.user_pb2.GetUidByTokenResult' >) ('logout_user' , '/user.User/logout_user' , <class 'example_proto.user.user_pb2.LogoutUserRequest' >, <class 'google.protobuf.empty_pb2.Empty' >) ('login_user' , '/user.User/login_user' , <class 'example_proto.user.user_pb2.LoginUserRequest' >, <class 'example_proto.user.user_pb2.LoginUserResult' >) ('create_user' , '/user.User/create_user' , <class 'example_proto.user.user_pb2.CreateUserRequest' >, <class 'google.protobuf.empty_pb2.Empty' >) ('delete_user' , '/user.User/delete_user' , <class 'example_proto.user.user_pb2.DeleteUserRequest' >, <class 'google.protobuf.empty_pb2.Empty' >)
不过这还不够,比如使用者可能想自定义这个请求映射的URL,或者是控制这个gRPC请求方法是否要映射等等,这些功能是无法通过Proto文件来控制的,如果写在另一个文件配置又非常麻烦,最后决定通过Proto文件的注释来实现这些功能,比如下面的Proto代码:
1 2 3 4 5 6 7 8 service User { // The interface should not be exposed for external use // pait: {"enable": false} rpc get_uid_by_token (GetUidByTokenRequest) returns (GetUidByTokenResult); // pait: {"tag": [["grpc-user", "grpc_user_service"], ["grpc-user-system", "grpc_user_service"]]} // pait: {"summary": "Create users through the system", "url": "/user/create"} rpc create_user(CreateUserRequest) returns (google.protobuf.Empty); }
这段代码有一个标准的pait:
开头,这样在解析时,就可以知道这一行是需要解析的文本,然后就会去解析后面跟着的一串Json字符串,就可以知道get_uid_by_token
设置了enable
为false
,而create_user
设置了url为/user/create
。不过,默认生成的代码中,客户端对象是没有携带Proto文件的注释信息的,只有他们对应的服务端对象才有携带,比如UserStub
没有携带注释信息,而UserStub
对应的User
对象则携带,所以需要从User
中提取数据。
此外,对于请求对象和响应对象也可以采用这样的思路,但是Python
的gRPC库通过Proto文件生成Python
代码时,Proto文件中为Message编写的注释是不会一起跟过来的,需要自己编写一个插件,或者通过mypy-proto
插件生成的文件来获取。由于这部分的代码比较长且繁杂,这里只做简单说明,具体源码见pait.util.groc_inspect.stub.py
4.自动映射路由 至此,url,请求方法,请求体和响应体都获取到了,现在可以自动映射路由了,还是以starlette
框架为例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 import grpcfrom starlette.applications import Starlettefrom starlette.requests import Requestfrom starlette.responses import JSONResponsefrom google.protobuf.message import Messagefrom google.protobuf.json_format import MessageToDictfrom typing import Any, Callablefrom example.example_grpc.python_example_proto_code.example_proto.user import user_pb2from example.example_grpc.python_example_proto_code.example_proto.user import user_pb2_grpcdef gen_route ( app: Starlette, stub: Any, url: str , invoke_name: str , grpc_request: Message, grpc_response: Message, ) -> None : """ :param app: Starlette实例 :param stub: gRPC客户端对应的类 :param url: url :param invoke_name: gRPC调用的方法名 :param grpc_request: gRPC请求体 :param grpc_response: gRPC响应体,可以发现它没有被使用 """ def _route (request: Request ) -> JSONResponse: resp_dict: dict = await request.json() func: Callable = getattr (stub(app.state.channel), invoke_name) return JSONResponse(MessageToDict(await func(grpc_request(**resp_dict)))) app.add_route(url, _route, methods=["POST" ])def create_app () -> Starlette: app: Starlette = Starlette() for parse_result_list in parse(user_pb2_grpc.UserStub): gen_route( app, user_pb2_grpc.UserStub, *parse_result_list, ) def _before_server_start () -> None : app.state.channel = grpc.aio.insecure_channel("0.0.0.0:9000" ) app.add_event_handler("startup" , _before_server_start) return appif __name__ == "__main__" : import uvicorn uvicorn.run(create_app(), log_level="debug" )
5.总结 至此,gRPC网关的最核心的实现已经编写完成了,不过可以发现gRPC的响应体并没有被使用,同时这个实现非常简单,不容易拓展,可以通过自定义Gateway Route路由函数 了解更多的拓展。