Python-gRPC实践(3)--使用Python实现gRPC服务

本文总阅读量

前言

通过前面的文章了解到了gRPC是什么,以及清楚使用它的优缺点,现在终于可以开始实现一个gRPC服务了。

这里演示的是一个用户与书互动的项目,用户可以通过该项目进行注册,登录,注销等操作,同时也可以上传,查看和评论对应的书籍,通常情况下我们会由一个简单的Web应用来提供这些服务,现在,我们假设这个服务非常庞大,需要把他们按照功能拆分成不同的微服务了,这些服务与Web应用通过gRPC进行通信。

注:由于篇幅原因,不会夹杂大量的源代码,需要跳转到Github中查看,同时对于业务逻辑也不会详细的介绍,所以可能需要一些接口开发经验才容易阅读懂。

1.初始化准备

在创建项目之前,我们需要确定我们的需求是什么,就像开发API接口一样,先了解需求,然后多方根据需求定义好接口,最后才为每个接口编写对应的代码,在这个项目中,我假定了拆分了两个服务,一个是与用户有关, 一个是与书籍有关,书籍部分又细分为书籍管理,书籍社交两部分。为此,先编写了Protobuf文件,之前在Python-gRPC实践(2)–Protocol Buffer中说过,我们创建gRPC对应的Protobuf文件应该放在一个公有的仓库中,这样就方便后续的Protobuf文件升级以及不同语言都能共享同一份Protobuf文件。

所以创建一个gRPC服务的第一步就是先创建一个包含Protobuf文件的仓库,我把它命名为grpc-example-common,具体源码可以通过grpc-example-common获取。

这个仓库中pyproject.toml文件的tool.poetry.dependencies部分如下:

1
2
3
4
[tool.poetry.dependencies]
python = "^3.8"
grpcio = "^1.43.0"
grpcio-tools = "^1.43.0"

通过这部分文件可以知道这个项目是基于Python3.8版本的,然后用到了2个依赖分别是grpcio以及grpcio-tools,其中grpcioPythongRPC实现,它是通过c语言翻译的,所以很多底层都是c实现的,如果在使用gRPC框架的过程中找不到对应的使用方法说明,那可以直接到gRPC的c项目中找到对应的函数并查看它的函数说明进而了解该函数的作用;而另一个库grpcio-tools的作用是把proto文件转译为Python代码,不过单靠grpcio-tools转译的代码很难使用,比如是这段代码:

1
2
3
4
from grpc_example_common.protos.user.user_pb2 import LoginUserResult 


login_user_result: LoginUserResult = LoginUserResult()

这段代码引入了由grpcio-tools通过用户Protobuf文件生成的LoginUserResult对象,开发者在后续想要使用这个对象的时候,IDE是没办法提示你这个对象有什么属性的,只能凭自己的记忆进行填写,或者回到对应的Protobuf文件查看该对象的定义:

buffer
1
2
3
4
5
message LoginUserResult {
string uid = 1;
string user_name = 2;
string token = 3;
}

发现它有uiduser_nametoken三个属性,然后才会在代码填写LoginUserResult对象的属性进行调用:

1
2
3
4
5
6
7
8
9
10
from grpc_example_common.protos.user.user_pb2 import LoginUserResult 


login_user_result: LoginUserResult = LoginUserResult(
uid="123",
user_name="so1n",
token="aaa"
)
print(login_user_result.uid)
# 123

这时即使填错了,比如uid写为uid1IDE也不会提示有错误,我们需要等到运行时报错才知道是填错了。

这样一个场景是会让开发者非常难受的,明明都定义了一个Protobuf文件,文件中已经写了这个消息有什么属性了,结果生成对应的类却无法让IDE了解它有什么属性(跳进去源码也无法知道),这时就需要通过mypy-protobuf来解决这一个问题。mypy-protobuf会生成的一份独立的.pyi文件,这样一来IDE就可以帮忙提示这个对象有什么属性了,如图:
IDE提醒
此外,通过.pyi文件可以使mypy等工具校验我们的代码类型是否正确,这样在运行前就能知道代码是否有问题。

mypy-protobuf的使用方法十分的简单,它以grpcio-tools的一个插件来运行,具体的使用方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 定义生产文件的存放目录,通常都会在指定的目录下生成一个proto的文件夹
target_p = "xxx"
# 定义proto文件的目录
sourct_p = "xxx"
python -m grpc_tools.protoc \
# 指定xxx_pb2文件和xxx_pb2_grpc文件生成位置,通常我们都让他们在同一个文件夹生产
--python_out=./$target_p \
--grpc_python_out=./$target_p \
# 指定proto文件的位置
-I. \
$source_p/user/*.proto
# 上面是标准的grpcio-tools执行的标准语句
# 指定`mypy-protobuf`生成xxx_pb2和xxx_pb2_grpc对应的pyi文件的位置,必须与xxx_pb2和xxx_pb2_grpc位置保持一致
--mypy_grpc_out=./$target_p \
--mypy_out=./$target_p \

只要运行了这段命令,grpc_tools就能在对应的路径下生成Protobuf对应的代码和对应的pyi文件,不过当前的grpcio-tools默认生成的代码所在的目录名是protos,它认为这个目录是在项目对应的根目录下生成的,如果我们指定在某个子目录下生产对应的代码,那么在运行程序时会直接报错,因为生成的代码文件中有一个大概长成这样的语句:

1
2
# xxx为proto文件的名
from protos.xxx import

这意味着它永远都是从项目的根目录开始引入的protos包,但我们根目录却没有这个包,所以就会报错,这时就需要手动把生成的语句替换为:

1
2
# xxx为proto文件的名
from .xxx import

这样就可以完美运行了,但是每个文件手动改一下会非常的麻烦,因为每次生成代码后都要手动更改代码,同时由于项目存在多个Protobuf文件,每个文件都需要执行一次命令才能生成对应的代码。对于一个开发者来说,最讨厌的就是一直执行重复的工作,这种工作是非常烦心的, 所以需要编写了一个脚本来自动的把所有Protobuf文件转为Python代码(也就是项目中的gen_rpc.sh文件),该脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 设置脚本运行的Python环境 
export VENV_PREFIX=""
if [ -d 'venv' ] ; then
export VENV_PREFIX="venv/bin/"
fi
if [ -d '.venv' ] ; then
export VENV_PREFIX=".venv/bin/"
fi

echo 'use venv path:' ${VENV_PREFIX}

# 设置生成的存放Python代码的proto文件夹的目录
target_p='grpc_example_common'
# 设置Proyobuf文件所在位置
source_p='protos'
# 设置生成protobuf代码文件的文件名
service_list=("book" "user")

# 清理之前生成的代码
rm -r "${target_p:?}/${source_p:?}"*
# 创建对应的文件夹
mkdir -p "${target_p:?}/${source_p:?}"

# 批处理
for service in "${service_list[@]}"
do
# 生成proto文件对应的Python代码逻辑,每个proto文件执行一次
mkdir -p "${target_p:?}/${source_p:?}/${service:?}"
echo "from proto file:" $source_p/"$service"/*.proto "gen proto py file to" $target_p/$source_p
${VENV_PREFIX}python -m grpc_tools.protoc \
--mypy_grpc_out=./$target_p \
--mypy_out=./$target_p \
--python_out=./$target_p \
--grpc_python_out=./$target_p \
-I. \
$source_p/"$service"/*.proto

# 创建一个__init__文件,这样一来这个文件夹就是一个包了,下面转换为from . import语句才能生效
touch $target_p/$source_p/"$service"/__init__.py
# fix grpc tools bug
sed -i "s/from protos.$service import/from . import/" $target_p/$source_p/$service/*.py
done

这样一来,我们通过Protobuf文件生成Python代码的操作就非常省心了,不管Protobuf文件有何改动,只要通过调用命令后就能在grpc_example_common.protos目录下看到已经生成的最新的Python代码,目前grpc_example_common的项目结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
├── grpc_example_common    # Python与gRPC相关的调用
│   ├── helper
│   ├── __init__.py
│   ├── interceptor
│   └── protos # 生成的对应Python代码
├── protos # Protobuf文件
│   ├── book
│   └── user
├──.flake8 # 格式化工具的配置
├──.pre-commit-config.yaml # 格式化工具的配置
├── gen_rpc.sh # 通过proto文件生成Python gRPC调用代码的脚本
├── mypy.ini # 格式化工具的配置
├── pyproject.toml # Python项目配置文件
├── README.md
├── requirements-dev.txt # 测试环境的依赖文件
├── requirements.txt # 正式环境的依赖文件
└── setup.py

通过项目结构可以看出还有其它的东西,这是我为了方便,我还在这个项目中添加一些PythongRPC相关的调用封装,把它当做一个Python的自定义包。

需要注意的是,每修改一次Protobuf文件应该视为一次版本发布,当生成完Protobuf文件的对应代码后,我们需要提交代码并打上对应的tag,这样其它项目才能引用到对应的版本代码。

对于格式化工具以及poetry包管理工具不了解的可以通过文章保障Python项目质量的工具了解。

grpc_example_common目录下还有其它常用的封装,将会在后续章节介绍。

2.编写gRPC服务项目

目前这个演示的项目有两个子gRPC项目,分别为grpc-example-book-grpc-servicegrpc-example-user-grpc-service,他们的结构很像,所以这一节以grpc-example-user-grpc-service来阐述如何创建一个gRPC服务。

该项目的代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
├── tests                 # 存放测试用例
│   ├── __init__.py
│   └── test_user.py
├── user_grpc_service # 项目代码真正所在的位置
│   ├── dal # service代码,一般用于查询Mysql,Redis的逻辑
│   ├── handler # 业务逻辑代码,继承对应Protobuf文件生成的类
│   ├── helper # 其它代码封装。
│   └── __init__.py
├── app.py # 项目代码入口
├── mypy.ini # mypy配置文件
├── pyproject.toml # 项目配置文件
└── user.sql # 项目初始化SQL

首先,该项目会通过如下配置引入一些依赖:

1
2
3
4
5
6
[tool.poetry.dependencies]
python = "^3.8"
DBUtils = "^3.0.0"
PyMySQL = "^1.0.2"
cryptography = "^36.0.1"
grpc_example_common = { git = "git@github.com:so1n/grpc-example-common.git", tag="v0.1.2"}

其中grpc_example_common项目就是包括我们上面通过Protobuf生成的文件生产的代码,以及一些自定义的封装,通过引入依赖后,可以很方便的引用Protobuf文件生成的代码。

安装依赖后,就可以在项目中编写对应的gRPC服务了, 在这个项目里有一个比较简单的分层,所有的gRPC服务接口处理的函数都在放在user_grpc_service.handler目录中,而与数据库交互的则放在user_grpc_service.dal中。

编写服务的第一步,就是在user_grpc_service.handler编写对应的代码,先创建一个名为user.py的文件,该文件的代码值负责对User服务的调用,由于对于User服务只有一个子服务,里面只需要创建一个名为UserServicer的类,这个类似继承于Protobuf生成的user_pb2_grpc.UserServicer类,如下:

1
2
3
4
5
# 通过common包引入对应Protobuf文件生成的代码
from grpc_example_common.protos.user import user_pb2_grpc as user_service

class UserServicer(user_service.UserServicer):
pass

这时候IDE会在UserServicer上显示波浪线,如果鼠标移到波浪线位置上,IDE会提示类 User 必须实现所有 abstract 方法,于是点击实现abstract方法后,就会自动生成类似于下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from google.protobuf.empty_pb2 import Empty  # type: ignore
from grpc_example_common.protos.user import user_pb2 as user_message
from grpc_example_common.protos.user import user_pb2_grpc as user_service


class UserServicer(user_service.UserServicer):

def logout_user(self, request: user_message.LogoutUserRequest,
context: grpc.ServicerContext) -> Empty:
pass

def login_user(self, request: user_message.LoginUserRequest,
context: grpc.ServicerContext) -> user_message.LoginUserResult:
pass

def create_user(self, request: user_message.CreateUserRequest,
context: grpc.ServicerContext) -> Empty:
pass

def delete_user(self, request: user_message.DeleteUserRequest,
context: grpc.ServicerContext) -> Empty:
pass

def check_user_login(self, request: user_message.LogoutUserRequest,
context: grpc.ServicerContext) -> user_message.CheckLoginResult:
pass

这段代码就是protos/user/user.proto对应的Python代码表达,当客户端调用UserServicer.logout_user方法时,服务端就会自动转到该方法执行对应的逻辑,并返回结果给客户端,所以对于开发者来说只要专心完成好这几个接口的实现即可。开发者编写此处的业务逻辑代码与平时编写的API代码基本没什么差别,这里不多做阐述,具体的业务逻辑可见user_grpc_service/handler/user.py

不过需要注意的是从request中得到的数据对象并不是Python中常见的对象,而是gRPC封装的且类似于Python常见的对象,如果直接用于pymysql的类似代码:

1
2
with conn.cursor() as cursor:
cursor.execute(sql, param)

那么execute可能会转码失败,导致拼接不了正确的SQL,这时候可以把request中得到的对象转为Python中场见的对象,比如gRPC的时间类型TimestampPython时间类型datetime转换如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import datetime
from dataclasses import MISSING
from typing import Any, Optional

from google.protobuf.timestamp_pb2 import Timestamp # type: ignore


def timestamp_to_datetime(t: Timestamp, default: Any = MISSING) -> datetime.datetime:
"""replace proto.timestamp to python datetime.datetime"""
if t.seconds == 0 and t.nanos == 0 and default != MISSING:
return default
return t.ToDatetime()

def datetime_to_timestamp(d: Optional[datetime.datetime]) -> Timestamp:
"""replace python datetime.datetime to proto.timestamp"""
t: Timestamp = Timestamp()
if d:
t.FromDatetime(d)
return t

通过封装好的timestamp_to_datetimedatetime_to_timestamp可以方便的在业务逻辑中对gRPCPython对象进行转换,更多类型转换见grpc_example_common/helper/field.py,不过这种转换的实现是非常简单的,性能也不是很好,如果为了追求性能,可以尝试使用pure-protobuf,它会带来一点复杂性,但是使用感受会非常好,性能也非常棒。

业务代码编写完后,需要绑定到对应的Server上面才能正常的提供服务,于是我们需要像创建Flask Server一样,先创建一个服务,然后把路由注册进去,对于gRPC的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import logging
import os
from concurrent import futures
from typing import List, Optional

import grpc
from grpc_example_common.interceptor.server_interceptor.base import BaseInterceptor
from grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptor

from user_grpc_service.handler.user import UserService, user_service

logging.basicConfig(
format="[%(asctime)s %(levelname)s] %(message)s",
datefmt="%y-%m-%d %H:%M:%S",
level=logging.DEBUG,
)
logger: logging.Logger = logging.getLogger()


def main(host: str = "127.0.0.1", port: str = "9000", ssl_port: Optional[str] = None) -> None:
# 拦截器列表
interceptor_list: List[BaseInterceptor] = [CustomerTopInterceptor()]
# 创建一个gRPC服务
server: grpc.server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=interceptor_list,
)
# 绑定我们的业务实现到服务上
user_service.add_UserServicer_to_server(UserService(), server)

if ssl_port:
# 如果是启用了ssl,则读取文件,然后建立一个安全的连接
port = ssl_port
# read in key and certificate
with open(os.path.join(os.path.split(__file__)[0], "server.key")) as f:
private_key = f.read().encode()
with open(os.path.join(os.path.split(__file__)[0], "server.crt")) as f:
certificate_chain = f.read().encode()
# create server credentials
server_creds = grpc.ssl_server_credentials(
(
(
private_key,
certificate_chain,
),
)
)
server.add_secure_port(f"{host}:{port}", server_creds)
else:
# 否则建立一个普通的连接
server.add_insecure_port(f"{host}:{port}")
# 启动服务
server.start()
try:
# 打印我们挂载了多少个子服务(也就是上面注册的服务)
for generic_handler in server._state.generic_handlers:
logger.info(
f"add service name:{generic_handler.service_name()} cnt:{len(generic_handler._method_handlers)}"
)
logger.info(f"server run in {host}:{port}")
# 一直运行,直到被关闭
server.wait_for_termination()
except KeyboardInterrupt:
# 收到退出的信号,关闭服务
server.stop(0)


if __name__ == "__main__":
main()

可以看到这段代码非常简单, 但是他肩负了很多请求和连接的健康维护,会在后续的章节中详细介绍。

3.测试编写的gRPC服务

代码编写完成后就应该发起请求,看看这个服务是否能正常运行,但是gRPC服务不像HTTP服务一样可以在浏览器等地方输入一个URL就能发起一个请求,所以为了能验证我们的服务能否正常的运行,我们应该编写一个测试用例。

在官方文档gRPC Testing中介绍了gRPC的测试用例编写方法,但是这个只覆盖到了业务代码,无法覆盖到拦截器,参数调优等逻辑,而我目前使用到了一个名为CustomerTopInterceptor的拦截器,它在发现业务代码有异常的时候会把异常通过meta_data传给客户端,然后客户端进行解析并抛出对应的异常(这种实现可能不是最优雅的,但是符合需求),如果采用了官方的gRPC Testing,那在测试用例中山无法捕获到对应的异常的,所以只能采取其它的测试方法来编写一个覆盖范围更广的测试用例–pytest-grpc

首先是安装好pytest-grpc,然后按照标准的测试用例编写习惯,在项目根目录创建一个名为tests的目录,(当然,我也在pyproject.toml指定了pytest的执行目录为tests),然后在里面编写每个子服务的测试代码,一般来说一个子服务对应一个Python文件,接着在这个文件的最前面编写服务pytest-grpc要求的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from typing import Callable, List

import grpc
from grpc_example_common.interceptor.client_interceptor.customer_top import (
CustomerTopInterceptor as ClientCustomerTopInterceptor,
)
from grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptor
from grpc_example_common.protos.user import user_pb2, user_pb2_grpc

from user_grpc_service.handler.user import UserService
from user_grpc_service.helper.conn_proxy import SteadyDBConnection, g_db_pool

# 应用的是整个文件的测试用例, 所以都需要写上@pytest.fixture(scope="module")


@pytest.fixture(scope="module")
def grpc_add_to_server() -> Callable:
# 指定该子服务对应的添加服务接口
return user_pb2_grpc.add_UserServicer_to_server


@pytest.fixture(scope="module")
def grpc_servicer() -> UserService:
# 指定我们编写该子服务的类
return UserService()


@pytest.fixture(scope="module")
def grpc_interceptors() -> List[grpc.ServerInterceptor]:
# 指定服务端对应的拦截器
return [CustomerTopInterceptor()]


@pytest.fixture(scope="module")
def grpc_stub(grpc_channel: grpc.Channel) -> user_pb2_grpc.UserStub:
# 指定子服务对应的客户端
# 这里会先生成一个channel,该channel用于跟服务端通信,
# 同时它也有一个与服务端拦截器CustomerTopInterceptor对应的拦截器ClientCustomerTopInterceptor
# 最后把channel应用到对应的子服务Stub上面
channel: grpc.Channel = grpc.intercept_channel(grpc_channel, ClientCustomerTopInterceptor())
return user_pb2_grpc.UserStub(channel)

创建完成后我们就可以继续在该文件编写对应的测试用例代码了,这样在运行的时候都会自动加载上面代码,然后我们可以在每个测试用例都使用上面代码创建的客户端grpc_stub来发起请求进行测试。以创建用户和删除用户两个接口为例子,创建用户接口调用后会在数据库生成一条对应的数据,而删除用户接口会从数据库删除一条对应的数据,如果数据不存在于数据库,则会抛出RuntimeError异常(具体代码逻辑可见user_grpc_service/handler/user.py),他们的测试用例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from contextlib import contextmanager
from typing import Callable, Generator, List

@contextmanager
def mock_user(
uid: str = "666666",
user_name: str = "so1n",
password: str = "123456"
) -> Generator[None, None, None]:
"""通过contextmanager可以在对应的代码逻辑创建一个用户,并在结束时自动清除该用户信息"""
conn: SteadyDBConnection = g_db_pool.connection()
try:
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO user (uid, user_name, password) VALUES (%s, %s, %s)",
(uid, user_name, password),
)
conn.commit()
yield
finally:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM user WHERE uid=%s", (uid,))
conn.commit()

class TestUser:
def test_create_user(self, grpc_stub: user_pb2_grpc.UserStub) -> None:
"""创建用户调用的测试用例"""
try:
request: user_pb2.CreateUserRequest = user_pb2.CreateUserRequest(
uid="666666", user_name="so1n", password="123456"
)
# 通过客户端带有的create_user方法发起请求,他会请求到我们的服务端代码
# 之后服务端的业务代码会在数据库创建一条对应的用户数据
grpc_stub.create_user(request, metadata=[])
finally:
conn: SteadyDBConnection = g_db_pool.connection()
conn.begin()
with conn.cursor() as cursor:
# 删除刚才创建的用户数据,返回删除的条目数量
ret: int = cursor.execute("DELETE FROM user WHERE uid=%s", ("666666",))
conn.commit()
# 判断是否成功删除一条用户数据,如果是,则代表刚才创建成功。
assert ret == 1

def test_delete_user(self, grpc_stub: user_pb2_grpc.UserStub) -> None:
"""删除用户调用的测试用例"""
uid: str = "666666"
# 创建delete_user对应的请求对象
request: user_pb2.DeleteUserRequest = user_pb2.DeleteUserRequest(uid=uid)
# user not found
with pytest.raises(RuntimeError):
# 当前数据库没有对应的用户数据,会抛出RuntimeError异常,如果pytest能够捕获到这个异常,则证明拦截器生效了。
grpc_stub.delete_user(request, metadata=[])
with mock_user(uid=uid):
# 在数据库存在对应的用户数据下,能正常删除数据,并不抛错
grpc_stub.delete_user(request, metadata=[])

运行测试用例可以发现测试通过了,接下来就可以编写我们的API服务,在API服务中调用我们的gRPC服务。

4.编写API服务

gRPC服务搭建完毕后,终于可以来编写API服务了,有了API服务后,才能把功能提供给了用户,API服务的项目结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
├── app_service                   # API接口的服务,包括路由和调用封装
│   ├── social_book_route.py
│   ├── manager_book_route.py
│   ├── utils.py
│   ├── route.py
│   ├── user_route.py
│   └── __init__.py
├── grpc_service # 调用gRPC的服务
│   ├── __init__.py
│   ├── user_service.py
│   └── book_service.py
├── tests # 测试用例
│   ├── test_route
│   ├── __init__.py
│   └── conftest.py
├── app.py # app代码
├── gunicorn.conf.py # gunicorn的配置文件
├── pyproject.toml
├── README.md
└── mypy.ini

API服务与gRPC服务一样,通过pyproject.toml的配置:

1
2
3
4
[tool.poetry.dependencies]
python = "^3.8"
grpc_example_common = { git = "git@github.com:so1n/grpc-example-common.git", tag="v0.1.4" }
Flask = "^2.0.3"

引用了grpc_example_common的库, 然后在grpc_service中用到了该库,还是以用户服务为例子,用户服务的代码位于项目的grpc_service.user_service.py中,这个代码首先是创建一个Mixin的类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class UserGrpcServiceMixin(object):
def __init__(self, channel: grpc.Channel):
self.user_stub: user_service.UserStub = user_service.UserStub(channel)
# 通过grpc_wrapper.grpc_client_func_wrapper为所有的请求带上matedate参数
grpc_wrapper.auto_load_wrapper_by_stub(self.user_stub, grpc_wrapper.grpc_client_func_wrapper)

def create_user(self, *, uid: str, user_name: str, password: str) -> None:
self.user_stub.create_user(user_message.CreateUserRequest(uid=uid, user_name=user_name, password=password))

def delete_user(self, *, uid: str) -> None:
self.user_stub.delete_user(user_message.DeleteUserRequest(uid=uid))

def login_user(self, *, uid: str, password: str) -> user_message.LoginUserResult:
return self.user_stub.login_user(user_message.LoginUserRequest(uid=uid, password=password))

def logout_user(self, *, uid: str, token: str) -> None:
self.user_stub.logout_user(user_message.LogoutUserRequest(uid=uid, token=token))

def get_uid_by_token(self, *, token: str) -> str:
result: user_message.GetUidByTokenResult = self.user_stub.get_uid_by_token(
user_message.GetUidByTokenRequest(token=token)
)
return result.uid

这个类就是对user_service.UserStub的简单封装,可以看到方法名和参数与Protobuf保持一致,它只是接受一个负责通信的channel,然后传入到user_stub中,方便后续的方法对user_stub调用,同时这个类还负责一些数据的转换,如上面提到的ProtobufTimestamps对象转为Pythondatetime对象。

接着创建一个负责子服务管理的UserGrpcSerevice类,这个类负责建立通信和维护通信段稳定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class UserGrpcService(UserGrpcServiceMixin):
# 如果有多个,则在这里继承多个mixin
def __init__(self, host: str, port: int) -> None:
# 初始化与服务端的通信
self.channel: grpc.Channel = grpc.intercept_channel(
grpc.insecure_channel(f"{host}:{port}"), CustomerTopInterceptor()
)
# 传入到对应的服务里
UserGrpcServiceMixin.__init__(self, self.channel)

def channel_ready_future(self, timeout: int = 10) -> None:
# 用于检查是否与服务端建立起连接
target: str = (
f"{self.__class__.__name__}"
f" {self.channel._channel._connectivity_state.channel.target().decode()}" # type: ignore
) # type: ignore
try:
grpc.channel_ready_future(self.channel).result(timeout=timeout)
except grpc.FutureTimeoutError:
logger.exception(f"channel:{target} connect timeout")
raise RuntimeError(f"channel:{target} connect timeout")
else:
logger.info(f"channel:{target} connect success")

创建完毕后对于gRPC客户端调用服务端的逻辑已经封装完成了,接下来就是在路由函数中进行使用了,一般情况下都是使用单例的模式创建一个UserGrpcService的实例,但是我不太喜欢这样做,于是创建了一个中间件,然后通过flask.g传递创建app时初始化的gRPC服务示例,对应的中间件处理代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from typing import Any, Union

from flask import Blueprint, Flask, Response
from flask import g as flask_g
from flask import jsonify, request

from grpc_service.book_service import BookGrpcService
from grpc_service.user_service import UserGrpcService

APP_TYPE = Union[Blueprint, Flask]


class CustomerGType(object):
"""基于flasg.g的封装,这样就可以无忧的使用IDE的提示和重构功能了"""
book_grpc_service: BookGrpcService
user_grpc_service: UserGrpcService

def __getattr__(self, key: str) -> Any:
return getattr(flask_g, key)

def __setattr__(self, key: str, value: Any) -> None:
setattr(flask_g, key, value)


g: CustomerGType = CustomerGType()


class ContextMiddleware(object):
"""基于flask的before_request和after_request钩子创建的一个中间件类"""
def __init__(
self, *, app: APP_TYPE, book_grpc_service: BookGrpcService, user_grpc_service: UserGrpcService
) -> None:
self._app = app
self._app.before_request(self._before_requests)
self._app.after_request(self._after_requests)

self._book_grpc_service: BookGrpcService = book_grpc_service
self._user_grpc_service: UserGrpcService = user_grpc_service

def _before_requests(self) -> None:
# 收到请求点时候把gRPC服务传到指定的变量中
g.book_grpc_service = self._book_grpc_service
g.user_grpc_service = self._user_grpc_service
return

def _after_requests(self, response: Response) -> Response:
return response

中间件创建完成后,就可以在create_app工厂函数中创建对应的gRPC服务,然后赋值到对应的中间件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from flask.app import Flask

from app_service.route import manager_book_bp, social_book_bp, user_bp
from app_service.utils import ContextMiddleware, api_exception
from grpc_service.book_service import BookGrpcService
from grpc_service.user_service import UserGrpcService


def create_app() -> Flask:
app: Flask = Flask(__name__)
# 注册对应的路由
app.register_blueprint(manager_book_bp)
app.register_blueprint(social_book_bp)
app.register_blueprint(user_bp)

# 初始化对应的gRPC服务,并等待建立连接
book_grpc_service: BookGrpcService = BookGrpcService("0.0.0.0", 9000)
book_grpc_service.channel_ready_future(timeout=3)
user_grpc_service: UserGrpcService = UserGrpcService("0.0.0.0", 9001)
user_grpc_service.channel_ready_future(timeout=3)
# 把gRPC服务注入到中间件中
ContextMiddleware(app=app, book_grpc_service=book_grpc_service, user_grpc_service=user_grpc_service)

app.errorhandler(Exception)(api_exception)
return app


if __name__ == "__main__":
create_app().run("localhost", port=8000)

最后,就可以在路由函数中使用对应的gRPC服务了,还是以创建用户和删除用户为例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from flask import Response, request
from grpc_example_common.protos.user import user_pb2 as user_message

from app_service.utils import g, get_uid_by_token, make_response


def create_user() -> Response:
request_dict: dict = request.json()
g.user_grpc_service.create_user(
uid=request_dict["uid"], user_name=request_dict["user_name"], password=request_dict["password"]
)
return make_response()


def delete_user() -> Response:
request_dict: dict = request.json()
g.user_grpc_service.delete_user(uid=request_dict["uid"])
return make_response()

这两个接口都是在收到请求后,再调用gRPC服务对应的方法来传递请求,其它服务调用的代码与创建用户和删除用户的例子相同,具体可以访问app_service了解

需要注意的是,通常我们不会在生产环境直接运行Flask,而是采用gunicorn+gevnet来运行我们的API服务,从而增强服务的稳定性和性能,但是gevent是修改Python代码来达到全局代码都不阻塞的,而gRPC本身的调用是包含C代码,gevent无法修改到gRPC调用到的C代码,所以gRPC提供一个名为grpc.experimental.gevent.init_gevent的方法来解决这个问题,如下代码:

1
2
3
4
5
6
7
8
9
10
11
import socket

try:
import gevent
except ImportError:
pass
else:
if socket.socket is gevent.socket.socket:
import grpc.experimental.gevent

grpc.experimental.gevent.init_gevent()

这段代码在初始化时可以通过判断是否启用gevnet来启用grpc.experimental.gevent.init_gevent,通常建议放在gunicorn的配置文件里,具体见gunicorn.conf.py

至此,一个API服务就搭建完毕,可以直接运行后在浏览器进行测试。

5.测试编写的API服务

截止到目前,对于包含gRPC的API服务接口测试没有一个比较好的方法,因为单例测试是不考虑别的服务的,意味着需要对gRPC调用段响应进行Mock,然而用于调用段Stub类只有属性而没有方法,这样会导致mock不成功,所以需要先创建一个gRPC Stub的函数签名,以UserStub为例子,将会创建一个类似于gRPC UserStub的类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from typing import Any


class UserStub(object):
def __init__(self, channel: Any):
pass

def get_uid_by_token(self, *args: Any, **kwargs: Any) -> None:
pass

def logout_user(self, *args: Any, **kwargs: Any) -> None:
pass

def login_user(self, *args: Any, **kwargs: Any) -> None:
pass

def create_user(self, *args: Any, **kwargs: Any) -> None:
pass

def delete_user(self, *args: Any, **kwargs: Any) -> None:
pass

其它的见:tests/grpc_abc_stub.py

然后在tests/conftest.py编写一个全局的初始化,该初始化会把gRPC的检查连接方法屏蔽以及把对应的Stub类进行替换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pytest
from grpc import _utilities
from grpc_example_common.protos.book import manager_pb2_grpc, social_pb2_grpc
from grpc_example_common.protos.user import user_pb2_grpc

from tests.grpc_abc_stub import BookManagerStub, BookSocialStub, UserStub


def result(self: Any, timeout: Any = None) -> Any:
pass


# Blocking the start check of grpc service
_utilities._ChannelReadyFuture.result = result

user_pb2_grpc.UserStub = UserStub
user_pb2_grpc.UserStub = UserStub
social_pb2_grpc.BookSocialStub = BookSocialStub
manager_pb2_grpc.BookManagerStub = BookManagerStub

这样就为测试用例初始化完成了, 但是我为每个Stub包装了一个功能,使他们永远会传递metadata变量:

1
2
3
4
class UserGrpcServiceMixin(object):
def __init__(self, channel: grpc.Channel):
self.user_stub: user_service.UserStub = user_service.UserStub(channel)
grpc_wrapper.auto_load_wrapper_by_stub(self.user_stub, grpc_wrapper.grpc_client_func_wrapper)

该方法会初始化一定要放在mock之后,否则mock无效,这意味着初始化Flask.TestClient的逻辑必须在测试代码里,于是先创建一个类似于pytest.fixture的初始化Flask.TestClient函数:

1
2
3
4
5
6
7
8
9
10
11
@contextmanager
def customer_app() -> Generator[FlaskClient, None, None]:
flask_app: Flask = create_app()
# Flask provides a way to test your application by exposing the Werkzeug test Client
# and handling the context locals for you.
client: FlaskClient = flask_app.test_client()
# Establish an application context before running the tests.
ctx: AppContext = flask_app.app_context()
ctx.push()
yield client # this is where the testing happens!
ctx.pop()

为了偷懒,我把他放到了conftest文件。

如果没有包装过stub的方法,则可以不用采用该步骤

现在,所有初始化都编写完毕了,可以编写测试用例,以用户调用路由为例子(说明见注释):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from google.protobuf.empty_pb2 import Empty  # type: ignore
from grpc_example_common.protos.user import user_pb2 as user_message
from pytest_mock import MockFixture
from werkzeug.test import TestResponse

from tests.conftest import customer_app


class TestUser:
def test_create_user(self, mocker: MockFixture) -> None:
# 该接口会调用到UserStub.create_user,我们把他mock掉,返回的是Empty
mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.create_user").return_value = Empty()
with customer_app() as client:
resp: TestResponse = client.post(
"/api/user/create", json={"uid": "123", "user_name": "so1n", "password": "aha"}
)
assert resp.json["code"] == 0

def test_delete_user(self, mocker: MockFixture) -> None:
# 该接口会调用到UserStub.delete_user,我们把他mock掉,返回的是Empty
mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.delete_user").return_value = Empty()
with customer_app() as client:
resp: TestResponse = client.post("/api/user/delete", json={"uid": "123"})
assert resp.json["code"] == 0
# User.Stub.delete_user还有一个会抛错的方法,我们通过mock满足这个条件
mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.delete_user").side_effect = RuntimeError(
"test error"
)
with customer_app() as client:
resp = client.post("/api/user/delete", json={"uid": "123"})
assert resp.json["data"] == "test error"

def test_login_user(self, mocker: MockFixture) -> None:
# 通过mock指定具体的返回数据,返回的数据类型一定要跟Protobuf生成的代码一致
mocker.patch(
"grpc_example_common.protos.user.user_pb2_grpc.UserStub.login_user"
).return_value = user_message.LoginUserResult(uid="123", token="66666")
with customer_app() as client:
resp: TestResponse = client.post("/api/user/login", json={"uid": "123", "password": "pw"})
assert resp.json["data"] == {"uid": "123", "token": "66666"}

def test_logout(self, mocker: MockFixture) -> None:
mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.logout_user").return_value = Empty()
mocker.patch(
"grpc_example_common.protos.user.user_pb2_grpc.UserStub.get_uid_by_token"
).return_value = user_message.GetUidByTokenResult(uid="123")
with customer_app() as client:
resp: TestResponse = client.post("/api/user/logout", json={"uid": "123"}, headers={"token": "666666"})
assert resp.json["code"] == 0

mocker.patch(
"grpc_example_common.protos.user.user_pb2_grpc.UserStub.get_uid_by_token"
).return_value = user_message.GetUidByTokenResult(uid="1234")
with customer_app() as client:
resp = client.post("/api/user/logout", json={"uid": "123"}, headers={"token": "666666"})
assert resp.json["data"] == "Uid ERROR"

至此,已经实现了一个可以简单使用的gRPC服务,可以在电脑上起不同的进程并观察他们的调用情况,但是这只是一个开始,随着服务的扩大,服务间的维护和调优会变得十分麻烦,要想服务能够健壮的运行,我们需要继续深入。

查看评论