前记 gRPC
已经是一个大多数开发者使用微服务时选择的通信协议,大多数公司的内部服务都会通过gRPC
来通信,但是服务端和客户端使用的通信协议还是HTTP
,这就意味着需要在客户端和内部服务之间架起一个可以转换HTTP
与gRPC
协议的网关。pait 的gRPC Gateway
模块就是实现了这样的一个功能,gRPC Gateway
模块通过pait
能快速的把Python
Web框架和gRPC
连接起来,并自动处理和转发请求。
目前只支持Flask
, Starlite
, Sanic
, Tornado
1.一个简单的例子 在介绍如何使用Pait
快速构建gRPC Json
网关之前先以一个简单的示例项目为例子来介绍没用网关前的局限性。
例子项目代码见附录一链接
在这个例子中存在一个客户端和三个后端服务,其中gRPC
服务有两个,一个是负责用户的创建、注销、登录、登出,token校验五个功能的User
服务;另外一个是负责书本的信息获取和书本评论和点赞等功能的book
服务。而剩下的后端服务是使用Flask
框架构建的API服务,它负责暴露出HTTP接口供客户端调用,当被客户端调用时会把请求进行处理并通过gRPC
客户端转发给另外两个gRPC
服务,他们的关系图如下: 通过关系图可以发现客户端只与Flask
应用通过HTTP通信,Flask
应用通过gRPC
客户端与其它机器的gRPC
服务通过gRPC
进行通信,客户端无法直接访问到gRPC
服务所在的机器node2
和node3
。
这个设计简单又实用,挺不错的,但是在编写代码时却有点烦恼,以用户服务为例子,编写gRPC
服务的第一步是编写好Protobuf文件
,其中用户服务的Protobuf
文件描述的Service
如下:
1 2 3 4 5 6 7 8 service User { rpc get_uid_by_token (GetUidByTokenRequest) returns (GetUidByTokenResult) ; rpc logout_user (LogoutUserRequest) returns (google.protobuf.Empty) ; rpc login_user(LoginUserRequest) returns (LoginUserResult) ; rpc create_user(CreateUserRequest) returns (google.protobuf.Empty) ; rpc delete_user(DeleteUserRequest) returns (google.protobuf.Empty) ; }
第二步是根据Protobuf
文件生成的接口代码来编写对应的代码逻辑,然后把代码部署在node2
机器上运行。
接下来就是麻烦的第三步了,首先是根据Protobuf
文件生成客户端代码,然后编写调用gRPC
客户端的路由函数,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 from flask import Response, requestfrom grpc_example_common.protos.user import user_pb2 as user_messagefrom app_service.utils import g, get_uid_by_token, make_responsedef create_user () -> Response: request_dict: dict = request.json g.user_grpc_service.create_user( uid=request_dict["uid" ], user_name=request_dict["user_name" ], password=request_dict["password" ] ) return make_response()def delete_user () -> Response: request_dict: dict = request.json g.user_grpc_service.delete_user(uid=request_dict["uid" ]) return make_response()def login_route () -> Response: request_dict: dict = request.json login_result: user_message.LoginUserResult = g.user_grpc_service.login_user( uid=request_dict["uid" ], password=request_dict["password" ] ) return make_response({"token" : login_result.token, "uid" : login_result.uid})def logout_route () -> Response: request_dict: dict = request.json if get_uid_by_token() == request_dict["uid" ]: token: str = request.headers.get("token" , "" ) g.user_grpc_service.logout_user(uid=request_dict["uid" ], token=token) return make_response() else : raise RuntimeError("Uid ERROR" )
可以看到示例代码中的几个路由函数都是重复的获取请求参数,再把参数逐一的传给gRPC
客户端,通过gRPC
客户端调用得到结果后对结果反序列化再返回给客户端。
当路由函数编写完成后就需要把路由函数注册到Flask
应用中,代码如下:
1 2 3 4 5 6 7 8 9 from flask.blueprints import Blueprintfrom app_service import user_route user_bp: Blueprint = Blueprint("user_bp" , __name__, url_prefix="/api/user" ) user_bp.add_url_rule("/create" , view_func=user_route.create_user, methods=["POST" ]) user_bp.add_url_rule("/delete" , view_func=user_route.delete_user, methods=["POST" ]) user_bp.add_url_rule("/login" , view_func=user_route.login_route, methods=["POST" ]) user_bp.add_url_rule("/logout" , view_func=user_route.logout_route, methods=["POST" ])
在把代码中的blueprint
注册到Flask
应用后,api
服务也编写完成了,接着就可以部署到node1
机器上并供客户端调用了。
可以看到这一切都非常简单,但是手动编写的重复代码比较多,通过示例代码可以看出路由函数名和url名都差别不大,每个路由代码逻辑也很像。 而且当想要修改gRPC
服务的调用名称,会发现除了修改Protobuf
文件外,api
服务的代码也要跟着手动修改,这太麻烦了,也容易出错。
同时可以发现在上述例子中编写的转发路由代码跟Protobuf
很像,这意味着也可以通过Protobuf
文件生成对应的路由代码,这也是pait 的实现思路,同时pait 参照了google.api.http 来补充Protobuf
缺少的HTTP信息,参照protoc-gen-validate 补充了请求体的信息,使Protobuf
文件能表示OpenAPI
的所有字段数据。
2.使用Pait构建gRPC Json网关 了解完后,现在开始以User
服务为例构建gRPC Json
网关,主要涉及到API服务和Protobuf文件的修改。
完整代码见附录二
2.1.修改Protobuf文件 使用Pait
构建gRPC Json网关的第一步是在gRPC
公有包中更改Protobuf
文件, gRPC
公有包项目结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 . ├── grpc_example_common │ ├── helper │ ├── interceptor │ │ ├── client_interceptor │ │ └── server_interceptor │ └── protos │ ├── book │ ├── common │ └── user └── protos └── grpc_example_common └── protos ├── book ├── common └── user # 存放用户相关的Protobuf 文件
更改Protobuf
文件的第一步是通过api.proto 和p2p_validate.proto 下载Protobuf文件到./protos/grpc_example_common/protos/common
目录中,其中api.proto
提供的是对gRPC
接口(也就是service.rpc)的描述,p2p_validate.proto
提供的是对Message
的描述,下载完成后./protos/grpc_example_common/protos/common
目录存放的Protobuf
文件有如下3个:
1 2 3 4 5 6 7 8 9 10 11 . ├── grpc_example_common └── protos └── grpc_example_common └── protos ├── book ├── common │ ├── api.proto │ ├── exce.proto │ └── p2p_validate.proto └── user
第二步是更改对应的Protobuf文件,以User
服务为例子,首先是引入api.proto
和p2p_validate.proto
:
1 2 import "grpc_example_common/protos/common/api.proto" ;import "grpc_example_common/protos/common/p2p_validate.proto" ;
如果是使用Pycharm
且出现如下提示: 那么可以通过点击Add import path to plugin settings
解决,如果还没办法解决而弹出一个项目文件结构的窗KPI,则点击窗口中proto
对应的文件即可解决。
在完成头文件的引入后,就可以修改Protobuf的其他代码了,首先是修改service
的代码,为service
中的每一个rpc
方法附上对应的OpenAPI信息,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 service User { rpc get_uid_by_token (GetUidByTokenRequest) returns (GetUidByTokenResult) { option (pait.api.http) = { not_enable: true , }; }; rpc logout_user (LogoutUserRequest) returns (google.protobuf.Empty) { option (pait.api.http) = { summary: "User exit from the system" , any: {url: "/user/logout" }, tag: [{name: "grpc-user" , desc: "grpc_user_service" }, {name: "user-action" , desc: "User Operating Interface" }], additional_bindings: { desc: "This interface performs a logical delete, not a physical delete" , summary: "Like delete_user" , delete: {url: "/user/logout" }, tag: [ {name: "grpc-user" , desc: "grpc_user_service" }, {name: "grpc-user-system" , desc: "grpc_user_service" } ] } }; }; }
接着再修改gRPC
函数对应的Message
,以CreateUserRequest
和LogoutUserRequest
为例子,修改如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 message CreateUserRequest { string uid = 1 [ (p2p_validate.rules).string .miss_default = true , (p2p_validate.rules).string .example = "10086" , (p2p_validate.rules).string .title = "UID" , (p2p_validate.rules).string .description = "user union id" ]; string user_name = 2 [ (p2p_validate.rules).string .description = "user name" , (p2p_validate.rules).string .min_length = 1 , (p2p_validate.rules).string .max_length = 10 , (p2p_validate.rules).string .example = "so1n" ]; string password = 3 [ (p2p_validate.rules).string .description = "user password" , (p2p_validate.rules).string .alias = "pw" , (p2p_validate.rules).string .min_length = 6 , (p2p_validate.rules).string .max_length = 18 , (p2p_validate.rules).string .example = "123456" , (p2p_validate.rules).string .pydantic_type = "SecretStr" ]; }message LogoutUserRequest { string uid = 1 [ (p2p_validate.rules).string .example = "10086" , (p2p_validate.rules).string .title = "UID" , (p2p_validate.rules).string .description = "user union id" ]; string token = 2 [ (p2p_validate.rules).string .description = "user token" , (p2p_validate.rules).string .enable = false ]; }
修改完成后记得通过Protobuf
文件生成对应的Python
代码并打包,再传到代码仓库中,具体流程见文章:Python-gRPC实践(3)–使用Python实现gRPC服务
2.2.修改Flask应用 Protobuf文件修改完后可以开始修改Flask
服务,Flask
应用的项目结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 ├── app .py ├── app_service │ ├── __init__ .py │ ├── manager_book_route .py │ ├── route .py │ ├── social_book_route .py │ ├── user_route .py │ └── utils .py ├── grpc_service │ ├── book_service .py │ ├── __init__ .py │ └── user_service .py └── gunicorn .conf .py
为了区分两种不同的调用,会在app_service
文件夹新建一个名为user_gateway_route.py
的文件,并编写如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from typing import Typeimport grpcfrom flask import Flask, jsonify, Responsefrom pydantic import BaseModel, Fieldfrom pait.app.flask.grpc_route import GrpcGatewayRoutefrom pait.app import set_app_attributefrom pait.model.response import PaitBaseResponseModel, PaitJsonResponseModelfrom pait.util.grpc_inspect.stub import GrpcModelfrom protobuf_to_pydantic import msg_to_pydantic_modelfrom grpc_example_common.protos.user import user_pb2_grpcdef gen_response_model_handle (grpc_model: GrpcModel ) -> Type[PaitBaseResponseModel]: class CustomerJsonResponseModel (PaitJsonResponseModel ): class CustomerJsonResponseRespModel (BaseModel ): code: int = Field(0 , description="api code" ) msg: str = Field("success" , description="api status msg" ) data: msg_to_pydantic_model(grpc_model.response) = Field(description="api response data" ) name: str = grpc_model.response.DESCRIPTOR.name response_data: Type[BaseModel] = CustomerJsonResponseRespModel return CustomerJsonResponseModeldef add_grpc_gateway_route (app: Flask ) -> None : def _make_response (resp_dict: dict ) -> Response: return jsonify({"code" : 0 , "msg" : "" , "data" : resp_dict}) grpc_gateway_route: GrpcGatewayRoute = GrpcGatewayRoute( app, user_pb2_grpc.UserStub, prefix="/api/gateway" , title="UserGrpc" , gen_response_model_handle=gen_response_model_handle, make_response=_make_response, ) grpc_gateway_route.init_channel(grpc.intercept_channel(grpc.insecure_channel("0.0.0.0:9001" )))
这样一来Pait
就能把User
的服务映射到对应的Flask
应用实例了, 但是User服务的部分接口并没有要求用户验证,需要我们先在Flask
实例进行校验后才可以调用gRPC
服务,而对于logout_user
方法则需要token参数。 对于这两个需求,可以通过对GrpcGatewayRoute
的生成路由方法进行改写来达到我们的目的,改写代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class CustomerGrpcGatewayRoute (GrpcGatewayRoute ): def gen_route (self, grpc_model: GrpcModel, request_pydantic_model_class: Type[BaseModel] ) -> Callable: if grpc_model.method in ("/user.User/login_user" , "/user.User/create_user" ): return super ().gen_route(grpc_model, request_pydantic_model_class) else : def _route ( request_pydantic_model: request_pydantic_model_class, token: str = Header.i(description="User Token" ), req_id: str = Header.i(alias="X-Request-Id" , default_factory=lambda : str (uuid4( ) ) ), ) -> Any: func: Callable = self.get_grpc_func(grpc_model.method) request_dict: dict = request_pydantic_model.dict () if grpc_model.method == "/user.User/logout_user" : request_dict["token" ] = token else : result: user_pb2.GetUidByTokenResult = user_pb2_grpc.UserStub(self.channel).get_uid_by_token( user_pb2.GetUidByTokenRequest(token=token) ) if not result.uid: raise RuntimeError(f"Not found user by token:{token} " ) request_msg: Message = self.get_msg_from_dict(grpc_model.request, request_dict) grpc_msg: Message = func(request_msg, metadata=[("req_id" , req_id)]) return self._make_response(self.get_dict_from_msg(grpc_msg)) return _route
这样一来业务逻辑就跟原本的逻辑一样了,可以进行最后一步操作–往Flask
应用注入对应的路由,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def create_app () -> Flask: app: Flask = Flask(__name__) app.json_encoder = CustomJSONEncoder app.register_blueprint(manager_book_bp) app.register_blueprint(social_book_bp) app.register_blueprint(user_bp) book_grpc_service: BookGrpcService = BookGrpcService("0.0.0.0" , 9000 ) book_grpc_service.channel_ready_future(timeout=3 ) user_grpc_service: UserGrpcService = UserGrpcService("0.0.0.0" , 9001 ) user_grpc_service.channel_ready_future(timeout=3 ) ContextMiddleware(app=app, book_grpc_service=book_grpc_service, user_grpc_service=user_grpc_service) add_grpc_gateway_route(app) add_doc_route(app) app.errorhandler(Exception)(api_exception) return appif __name__ == "__main__" : config.init_config(apply_func_list=[apply_block_http_method_set({"OPTIONS" , "HEAD" })]) create_app().run("localhost" , port=8000 )
代码修改完毕后,分别先启动User和Book服务,再启动Flask应用,并在浏览器输入http://127.0.0.1:8000/swagger
即可看到通过Pait Json
网关生成的接口的接口文档页面:
在检查文档展示的接口与Protobuf文件描述的是一致后,可以通过接口文档页面来尝试生成的gRPC Json
网关是否可以正常使用,如下动图,其中左上图为User
服务,左下图为Flask
应用,而右半边的图是Swagger
页面:
除了动图的操作外,还可以尝试修改uid
等字段的长度在执行,会发现Flask
应用由于我们传过来的值不满足校验规则而抛出错误。
附录 示例代码只是为了演示,并无任何实际意义,也不适用于生产环境:
附录一,简单的gRPC示例项目代码 api服务:https://github.com/so1n/grpc-example-api-backend-service 用户服务: https://github.com/so1n/grpc-example-user-grpc-service 书籍管理服务:https://github.com/so1n/grpc-example-book-grpc-service gRPC
公有包(包括gRPC
调用封装和protobuf文件):https://github.com/so1n/grpc-example-common
附录二,使用Pait快速构建gRPC Json网关代码 api服务:https://github.com/so1n/grpc-example-api-backend-service gRPC
公有包:https://github.com/so1n/grpc-example-common/tree/pait-example
其他服务只需要把gRPC
公有包依赖更新到pait-example
分支即可
服务三,使用文档 Pait Json
网关文档: https://so1n.me/pait-zh-doc/7_gRPC_gateway/ protobuf_to_pydantic文档