Python-gRPC实践(6)--编写gRPC网关

本文总阅读量

前言

就目前而言,在大多数场景中,用户侧的客户端和服务端还是通过HTTP进行交互,然后服务端内部各种服务再通过过各种协议进行交互,所以在采用了gRPC做内部服务的交互协议后,就需要一个网关来把gRPC的调用自动映射到HTTP应用上,方便客户端调用。

注:本文只针对一对一的gRPC服务。

之前原本想当做一个简单的实现,后面发现可以与我编写的pait框架可以嵌到一起,这样也可以对多个Web框架提供Gateway功能,于是就把这个功能的实现嵌入到pait框架中,所以下面有些代码来自于我编写的pait框架,对应的gRPC网关文档使用说明见:Pait gRPC Gateway

1.网关的基础实现思路

网关的思路很简单,就是把HTTP请求转发到了对应的gRPC服务,在Go生态中,已经出现了一个类似的框架–grpc-gateway,它负责grpc服务自动的映射到HTTP服务中,同时它通过让HTTP服务和gRPC服务监听同一个端口来降低使用者的疑惑。

通过前面的文章我们了解到,gRPC采用的是HTTP/2做传输协议,而HTTP/2是对于HTTP/1.1的一个升级,这样的话服务端在接收socket的数据且转为HTTP数据后就可以判断Header中Upgrade字段对应的值来把数据分别分发到HTTP服务和gRPC服务。

但是,Python的gRPC是通过gRPC-C转译的,也就是它的底层是C实现的,所以Python的gRPC无法传socket来启动服务,而是只能通过指定ip:port的形式来启动服务,如下:

1
2
3
4
5
6
import grpc
from concurrent import futures

grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
).add_insecure_port("127.0.0.1:9000")

这样上述的做法就无法实现了,只能让HTTP服务和gRPC服务分别监听不同的端口,无法做到像Go grpc-gateway那样,毕竟Go才是亲儿子。

最终只能以下面的方式实现:

图中客户端发起的请求会先发送到HTTP服务绑定的端口,HTTP服务收到请求后,会转发到gRPC客户端,交由gRPC客户端发送到gRPC服务端,这样一系列服务就串起来了。

图中的示例HTTP服务与gRPC客户端在同一个进程,而gRPC服务在同台机器的另外一个进程,但是他们也可以在同一个进程内。

2.如何编写转发路由

总体思路决定后,就开始细究路由的实现了,路由只负责转发,不做其它功能,通过前面文章Python-gRPC实践(1)–gRPC简介的捉包可以知道,gRPC传输数据时,采用的是HTTP/2来传输请求,用的是POST方法,同时使用Body传输数据,那么我们可以采用尽量相似的方法来接收请求,再通过gRPC客户端传到对应的服务端,如下的Proto文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
syntax = "proto3";
package example;


message LoginUserRequest {
string uid = 1;
string password = 2;
}
message LoginUserResult {
string uid = 1;
string user_name = 2;
string token = 3;
}


service User {
rpc login_user(LoginUserRequest) returns (LoginUserResult);
}

接着,以这种方法来创建一个路由来映射这个Proto文件的请求,这个路由采用POST方法然后解析对应的数据并通过对应的gRPC客户端传递到服务端,为了尽量的跟gRPC一致,所以HTTP服务与客户端通过Json进行交互,映射的路由代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import grpc
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.requests import Request
from starlette.responses import JSONResponse

# pait项目的演示代码,由Proto文件生产的
from example.example_grpc.python_example_proto_code.example_proto.user import user_pb2
from example.example_grpc.python_example_proto_code.example_proto.user import user_pb2_grpc


async def login_route(request: Request) -> JSONResponse:
# 接收请求
request_dict: dict = await request.json()
user_service: user_pb2_grpc.UserStub = request.app.state.user_service
# 发送请求到对应的gRPC服务端
result: user_pb2.LoginUserResult = user_service.login_user(
user_pb2.LoginUserRequest(uid=request_dict["uid"], password=request_dict["password"])
)
# 返回对应的响应
return JSONResponse(
{
"uid": result.uid,
"user_name": result.user_name,
"token": result.token,
}
)


def create_app() -> Starlette:
# 绑定路由
app: Starlette = Starlette(routes=[Route("/api/login", login_route, methods=["POST"])])

def _before_server_start() -> None:
# 启动时绑定gRPC客户端到对应的实例
app.state.user_service = user_pb2_grpc.UserStub(grpc.aio.insecure_channel("0.0.0.0:9000"))

app.add_event_handler("startup", _before_server_start)
return app


if __name__ == "__main__":
import uvicorn # type: ignore

uvicorn.run(create_app(), log_level="debug")

这样一个简单的转发就编写完毕了,但是目前是手动编写的代码,意味着每新增一个接口,我们就需有编写一个对应的路由函数,同时如果Protobuf文件发生了变更,那么我们需要定位到对应的代码再修改,非常麻烦,特别是我们在定义Protobuf文件时,已经很像在定义一个接口了,结果还要再编写路由函数,效率非常低,所以一个网关最关键的是需要通过Protobuf文件或者通过Protobuf的产物来自动生成路由函数并映射到对应的服务,这样一来,其它使用者后续只要修改Protobuf文件即可。

3.提取路由需要的数据

为了自动生成路由,还需要一些数据,通过编写的转发路由可以发现,我们只需要收到用户的请求,解析数据,然后通过对应的gRPC客户端方法来发送数据即可,所以需要通过这个gRPC客户端的方法来找到所有需要的数据(尝试了多种方法,目前这种是最优的)。

以上面的Protobuf文件自动生成的Python代码中的user_pb2_grpc.UserStub为例子(可以通过pait/example/example_grpc了解),它的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import grpc

from example.example_grpc.python_example_proto_code.example_proto.user import user_pb2 as example__proto_dot_user_dot_user__pb2
from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2


class UserStub(object):
def __init__(self, channel):
self.get_uid_by_token = channel.unary_unary(
'/user.User/get_uid_by_token',
request_serializer=example__proto_dot_user_dot_user__pb2.GetUidByTokenRequest.SerializeToString,
response_deserializer=example__proto_dot_user_dot_user__pb2.GetUidByTokenResult.FromString,
)
self.logout_user = channel.unary_unary(
'/user.User/logout_user',
request_serializer=example__proto_dot_user_dot_user__pb2.LogoutUserRequest.SerializeToString,
response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
)
self.login_user = channel.unary_unary(
'/user.User/login_user',
request_serializer=example__proto_dot_user_dot_user__pb2.LoginUserRequest.SerializeToString,
response_deserializer=example__proto_dot_user_dot_user__pb2.LoginUserResult.FromString,
)
self.create_user = channel.unary_unary(
'/user.User/create_user',
request_serializer=example__proto_dot_user_dot_user__pb2.CreateUserRequest.SerializeToString,
response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
)
self.delete_user = channel.unary_unary(
'/user.User/delete_user',
request_serializer=example__proto_dot_user_dot_user__pb2.DeleteUserRequest.SerializeToString,
response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
)

通过这份生成的代码可以发现,这个对象类中包括了每个调用的url,请求体和响应体,这样再加上上面说了gRPC是以POST方法进行请求的,那么构成一个最小路由的条件都已经满足了。
但是,要通过这份代码提取数据却比较麻烦,由于都是在__init__方法后才初始化各个gRPC的调用方法,所以没办法通过dir(UserStub)获取到对应的gRPC调用方法,只能通过UserStub(channel).__dict__来获取gRPC的调用方法,不过channel最好需要在对应的HTTP服务启动时再初始化,特别是对于uvicorn来说,如果grpc.aio.channel提前初始化,那么他们使用的不是同一个事件循环,这意味着就必须在Web实例启动后再注册路由,但由于Sanic等框架是不支持实例启动后再注册路由的,所以这个方法行不通。此外,示例代码中的请求体,响应体都是调用对应的方法再加载到channel.unary_unary中,之后我们没办法在运行时通过UserStub的实例来直接获取。

幸运的是,由Proto文件生成的UserStub对象是有规则的,所以可以在程序运行时获取到UserStub的源码,然后通过一定的规则把数据提取出来,分析上面的源码,可以发现有如下几个规则:

  • 1.self.xxx中是gRPC调用的方法名,同时channel.unary_unary代表了这是一对一的请求。
  • 2.self.xxx的下面第一行是gRPC方法对应的url
  • 3.self.xxx的下面第二行出现了gRPC方法的请求对象,这一行是以request_serializer=开头,以.SerializeToString结尾,可以通过正则提取中间的对象字符串,然后可以在运行时通过解析字符串从UserStub所在的模块中提取到正确的请求对象。
  • 4.self.xxx的下面第三行则是gRPC方法的响应对象,处理方法跟请求对象类似。

发现了规则后就可以通过规则提取对应的数据,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import inspect
import re
from typing import Any, List, Generator, Optional, Type
from types import ModuleType
from google.protobuf.message import Message

from example.example_grpc.python_example_proto_code.example_proto.user import user_pb2_grpc


def _gen_message(class_module: ModuleType, line: str, match_str: str) -> Type[Message]:
# 通过正则提取对应的字符串
module_path_find_list = re.findall(match_str, line)
if len(module_path_find_list) != 1:
raise ValueError("module path not found")
module_path: str = module_path_find_list[0]
module_path_list: List[str] = module_path.split(".")
# 通过UserStub模块获取到消息体对应的模块
message_module: ModuleType = getattr(class_module, module_path_list[0])
# 从模块提取到对应的对象
message_model: Type[Message] = getattr(message_module, module_path_list[1])

if not issubclass(message_model, Message):
raise RuntimeError("Can not found message")
return message_model


def parse(stub: Any) -> Generator[tuple, None, None]:
# 提取对应对象的源码,以`\n`切分成每一行
line_list: List[str] = inspect.getsource(stub).split("\n")

# 获取对象的模块,这样就能知道这个模块引用了什么包,方便后面提取请求和响应对象
class_module: Optional[ModuleType] = inspect.getmodule(stub)
if not class_module:
raise RuntimeError(f"Can not found {stub} module")
# 通过enumerate,可以获取到行数对应的索引
for index, line in enumerate(line_list):
# 不是self.所在的行,就跳过
if "self." not in line:
continue
# 只映射一对一的gRPC请求,其它gRPC请求则忽略
if "channel.unary_unary" not in line:
continue

# 通过这一行提取gRPC调用端方法名
invoke_name: str = line.split("=")[0].replace("self.", "").strip()
# 从下一行提取url
method: str = line_list[index + 1].strip()[1:-2]
# 从下两行提取请求对象
request: Type[Message] = _gen_message(
class_module, line_list[index + 2], r"request_serializer=(.+).SerializeToString"
)
# 从下三行提取响应对象
response: Type[Message] = _gen_message(
class_module, line_list[index + 3], r"response_deserializer=(.+).FromString"
)
# 返回提取的数据
yield invoke_name, method, request, response


if __name__ == "__main__":
for i in parse(user_pb2_grpc.UserStub):
print(i)


这段代码执行后就会输出以下结果,可以发现,这段代码正常的从UserStub中提取到了我们想要的数据。

1
2
3
4
5
('get_uid_by_token', '/user.User/get_uid_by_token', <class 'example_proto.user.user_pb2.GetUidByTokenRequest'>, <class 'example_proto.user.user_pb2.GetUidByTokenResult'>)
('logout_user', '/user.User/logout_user', <class 'example_proto.user.user_pb2.LogoutUserRequest'>, <class 'google.protobuf.empty_pb2.Empty'>)
('login_user', '/user.User/login_user', <class 'example_proto.user.user_pb2.LoginUserRequest'>, <class 'example_proto.user.user_pb2.LoginUserResult'>)
('create_user', '/user.User/create_user', <class 'example_proto.user.user_pb2.CreateUserRequest'>, <class 'google.protobuf.empty_pb2.Empty'>)
('delete_user', '/user.User/delete_user', <class 'example_proto.user.user_pb2.DeleteUserRequest'>, <class 'google.protobuf.empty_pb2.Empty'>)

不过这还不够,比如使用者可能想自定义这个请求映射的URL,或者是控制这个gRPC请求方法是否要映射等等,这些功能是无法通过Proto文件来控制的,如果写在另一个文件配置又非常麻烦,最后决定通过Proto文件的注释来实现这些功能,比如下面的Proto代码:

1
2
3
4
5
6
7
8
service User {
// The interface should not be exposed for external use
// pait: {"enable": false}
rpc get_uid_by_token (GetUidByTokenRequest) returns (GetUidByTokenResult);
// pait: {"tag": [["grpc-user", "grpc_user_service"], ["grpc-user-system", "grpc_user_service"]]}
// pait: {"summary": "Create users through the system", "url": "/user/create"}
rpc create_user(CreateUserRequest) returns (google.protobuf.Empty);
}

这段代码有一个标准的pait:开头,这样在解析时,就可以知道这一行是需要解析的文本,然后就会去解析后面跟着的一串Json字符串,就可以知道get_uid_by_token设置了enablefalse,而create_user设置了url为/user/create。不过,默认生成的代码中,客户端对象是没有携带Proto文件的注释信息的,只有他们对应的服务端对象才有携带,比如UserStub没有携带注释信息,而UserStub对应的User对象则携带,所以需要从User中提取数据。

此外,对于请求对象和响应对象也可以采用这样的思路,但是Python的gRPC库通过Proto文件生成Python代码时,Proto文件中为Message编写的注释是不会一起跟过来的,需要自己编写一个插件,或者通过mypy-proto插件生成的文件来获取。由于这部分的代码比较长且繁杂,这里只做简单说明,具体源码见pait.util.groc_inspect.stub.py

4.自动映射路由

至此,url,请求方法,请求体和响应体都获取到了,现在可以自动映射路由了,还是以starlette框架为例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import grpc
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from google.protobuf.message import Message
from google.protobuf.json_format import MessageToDict
from typing import Any, Callable

# pait项目的演示代码,由Proto文件生产的
from example.example_grpc.python_example_proto_code.example_proto.user import user_pb2
from example.example_grpc.python_example_proto_code.example_proto.user import user_pb2_grpc


def gen_route(
app: Starlette,
stub: Any,
url: str,
invoke_name: str,
grpc_request: Message,
grpc_response: Message,
) -> None:
"""
:param app: Starlette实例
:param stub: gRPC客户端对应的类
:param url: url
:param invoke_name: gRPC调用的方法名
:param grpc_request: gRPC请求体
:param grpc_response: gRPC响应体,可以发现它没有被使用
"""
def _route(request: Request) -> JSONResponse:
resp_dict: dict = await request.json()
# 通过app实例携带的channel来调用gRPC服务
func: Callable = getattr(stub(app.state.channel), invoke_name)
# 把gRPC响应数据专为json响应
return JSONResponse(MessageToDict(await func(grpc_request(**resp_dict))))

# 注册路由
app.add_route(url, _route, methods=["POST"])


def create_app() -> Starlette:
# 绑定路由
app: Starlette = Starlette()

# 通过调用上面段parse函数得到了gRPC对应的数据再通过gen_route来自动注册路由
for parse_result_list in parse(user_pb2_grpc.UserStub):
gen_route(
app,
user_pb2_grpc.UserStub,
*parse_result_list,
)

def _before_server_start() -> None:
# 启动时绑定gRPC客户端对应的channel
app.state.channel = grpc.aio.insecure_channel("0.0.0.0:9000")

app.add_event_handler("startup", _before_server_start)
return app


if __name__ == "__main__":
import uvicorn # type: ignore

uvicorn.run(create_app(), log_level="debug")

5.总结

至此,gRPC网关的最核心的实现已经编写完成了,不过可以发现gRPC的响应体并没有被使用,同时这个实现非常简单,不容易拓展,可以通过自定义Gateway Route路由函数了解更多的拓展。

查看评论