前言 通过前面的文章了解到了gRPC
是什么,以及清楚使用它的优缺点,现在终于可以开始实现一个gRPC
服务了。
这里演示的是一个用户与书互动的项目,用户可以通过该项目进行注册,登录,注销等操作,同时也可以上传,查看和评论对应的书籍,通常情况下我们会由一个简单的Web应用来提供这些服务,现在,我们假设这个服务非常庞大,需要把他们按照功能拆分成不同的微服务了,这些服务与Web应用通过gRPC
进行通信。
注:由于篇幅原因,不会夹杂大量的源代码,需要跳转到Github
中查看,同时对于业务逻辑也不会详细的介绍,所以可能需要一些接口开发经验才容易阅读懂。
1.初始化准备 在创建项目之前,我们需要确定我们的需求是什么,就像开发API接口一样,先了解需求,然后多方根据需求定义好接口,最后才为每个接口编写对应的代码,在这个项目中,我假定了拆分了两个服务,一个是与用户有关, 一个是与书籍有关,书籍部分又细分为书籍管理,书籍社交两部分。为此,先编写了Protobuf文件,之前在Python-gRPC实践(2)–Protocol Buffer 中说过,我们创建gRPC
对应的Protobuf文件应该放在一个公有的仓库中,这样就方便后续的Protobuf文件升级以及不同语言都能共享同一份Protobuf文件。
所以创建一个gRPC
服务的第一步就是先创建一个包含Protobuf文件的仓库,我把它命名为grpc-example-common
,具体源码可以通过grpc-example-common 获取。
这个仓库中pyproject.toml
文件的tool.poetry.dependencies
部分如下:
1 2 3 4 [tool.poetry.dependencies] python = "^3.8" grpcio = "^1.43.0" grpcio-tools = "^1.43.0"
通过这部分文件可以知道这个项目是基于Python3.8
版本的,然后用到了2个依赖分别是grpcio
以及grpcio-tools
,其中grpcio
是Python
的gRPC
实现,它是通过c语言翻译的,所以很多底层都是c实现的,如果在使用gRPC框架的过程中找不到对应的使用方法说明,那可以直接到gRPC的c项目中找到对应的函数并查看它的函数说明进而了解该函数的作用;而另一个库grpcio-tools
的作用是把proto
文件转译为Python
代码,不过单靠grpcio-tools
转译的代码很难使用,比如是这段代码:
1 2 3 4 from grpc_example_common.protos.user.user_pb2 import LoginUserResult login_user_result: LoginUserResult = LoginUserResult()
这段代码引入了由grpcio-tools
通过用户Protobuf文件生成的LoginUserResult
对象,开发者在后续想要使用这个对象的时候,IDE是没办法提示你这个对象有什么属性的,只能凭自己的记忆进行填写,或者回到对应的Protobuf文件查看该对象的定义:
buffer 1 2 3 4 5 message LoginUserResult { string uid = 1; string user_name = 2; string token = 3; }
发现它有uid
,user_name
,token
三个属性,然后才会在代码填写LoginUserResult
对象的属性进行调用:
1 2 3 4 5 6 7 8 9 10 from grpc_example_common.protos.user.user_pb2 import LoginUserResult login_user_result: LoginUserResult = LoginUserResult( uid="123" , user_name="so1n" , token="aaa" ) print(login_user_result.uid)
这时即使填错了,比如uid
写为uid1
IDE也不会提示有错误,我们需要等到运行时报错才知道是填错了。
这样一个场景是会让开发者非常难受的,明明都定义了一个Protobuf文件,文件中已经写了这个消息有什么属性了,结果生成对应的类却无法让IDE了解它有什么属性(跳进去源码也无法知道),这时就需要通过mypy-protobuf 来解决这一个问题。mypy-protobuf
会生成的一份独立的.pyi
文件,这样一来IDE就可以帮忙提示这个对象有什么属性了,如图:IDE提醒 此外,通过.pyi
文件可以使mypy
等工具校验我们的代码类型是否正确,这样在运行前就能知道代码是否有问题。
mypy-protobuf
的使用方法十分的简单,它以grpcio-tools
的一个插件来运行,具体的使用方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 target_p = "xxx" sourct_p = "xxx" python -m grpc_tools.protoc \ --python_out=./$target_p \ --grpc_python_out=./$target_p \ -I. \ $source_p /user/*.proto --mypy_grpc_out=./$target_p \ --mypy_out=./$target_p \
只要运行了这段命令,grpc_tools
就能在对应的路径下生成Protobuf对应的代码和对应的pyi
文件,不过当前的grpcio-tools
默认生成的代码所在的目录名是protos
,它认为这个目录是在项目对应的根目录下生成的,如果我们指定在某个子目录下生产对应的代码,那么在运行程序时会直接报错,因为生成的代码文件中有一个大概长成这样的语句:
这意味着它永远都是从项目的根目录开始引入的protos
包,但我们根目录却没有这个包,所以就会报错,这时就需要手动把生成的语句替换为:
这样就可以完美运行了,但是每个文件手动改一下会非常的麻烦,因为每次生成代码后都要手动更改代码,同时由于项目存在多个Protobuf文件,每个文件都需要执行一次命令才能生成对应的代码。对于一个开发者来说,最讨厌的就是一直执行重复的工作,这种工作是非常烦心的, 所以需要编写了一个脚本来自动的把所有Protobuf文件转为Python
代码(也就是项目中的gen_rpc.sh
文件),该脚本如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 export VENV_PREFIX="" if [ -d 'venv' ] ; then export VENV_PREFIX="venv/bin/" fi if [ -d '.venv' ] ; then export VENV_PREFIX=".venv/bin/" fi echo 'use venv path:' ${VENV_PREFIX} target_p='grpc_example_common' source_p='protos' service_list=("book" "user" ) rm -r "${target_p:?} /${source_p:?} " * mkdir -p "${target_p:?} /${source_p:?} " for service in "${service_list[@]} " do mkdir -p "${target_p:?} /${source_p:?} /${service:?} " echo "from proto file:" $source_p /"$service " /*.proto "gen proto py file to" $target_p /$source_p ${VENV_PREFIX} python -m grpc_tools.protoc \ --mypy_grpc_out=./$target_p \ --mypy_out=./$target_p \ --python_out=./$target_p \ --grpc_python_out=./$target_p \ -I. \ $source_p /"$service " /*.proto touch $target_p /$source_p /"$service " /__init__.py sed -i "s/from protos.$service import/from . import/" $target_p /$source_p /$service /*.pydone
这样一来,我们通过Protobuf文件生成Python
代码的操作就非常省心了,不管Protobuf文件有何改动,只要通过调用命令后就能在grpc_example_common.protos
目录下看到已经生成的最新的Python
代码,目前grpc_example_common
的项目结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ├── grpc_example_common │ ├── helper │ ├── __init__.py │ ├── interceptor │ └── protos ├── protos │ ├── book │ └── user ├──.flake8 ├──.pre-commit-config.yaml ├── gen_rpc.sh ├── mypy.ini ├── pyproject.toml ├── README.md ├── requirements-dev.txt ├── requirements.txt └── setup.py
通过项目结构可以看出还有其它的东西,这是我为了方便,我还在这个项目中添加一些Python
与gRPC
相关的调用封装,把它当做一个Python
的自定义包。
需要注意的是,每修改一次Protobuf文件应该视为一次版本发布,当生成完Protobuf文件的对应代码后,我们需要提交代码并打上对应的tag,这样其它项目才能引用到对应的版本代码。
对于格式化工具以及poetry包管理工具不了解的可以通过文章保障Python项目质量的工具 了解。
grpc_example_common
目录下还有其它常用的封装,将会在后续章节介绍。
2.编写gRPC服务项目 目前这个演示的项目有两个子gRPC项目,分别为grpc-example-book-grpc-service 和grpc-example-user-grpc-service ,他们的结构很像,所以这一节以grpc-example-user-grpc-service 来阐述如何创建一个gRPC服务。
该项目的代码结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 ├── tests │ ├── __init__.py │ └── test_user.py ├── user_grpc_service │ ├── dal │ ├── handler │ ├── helper │ └── __init__.py ├── app.py ├── mypy.ini ├── pyproject.toml └── user.sql
首先,该项目会通过如下配置引入一些依赖:
1 2 3 4 5 6 [tool.poetry.dependencies] python = "^3.8" DBUtils = "^3.0.0" PyMySQL = "^1.0.2" cryptography = "^36.0.1" grpc_example_common = { git = "git@github.com:so1n/grpc-example-common.git" , tag="v0.1.2" }
其中grpc_example_common
项目就是包括我们上面通过Protobuf生成的文件生产的代码,以及一些自定义的封装,通过引入依赖后,可以很方便的引用Protobuf文件生成的代码。
安装依赖后,就可以在项目中编写对应的gRPC服务了, 在这个项目里有一个比较简单的分层,所有的gRPC服务接口处理的函数都在放在user_grpc_service.handler
目录中,而与数据库交互的则放在user_grpc_service.dal
中。
编写服务的第一步,就是在user_grpc_service.handler
编写对应的代码,先创建一个名为user.py
的文件,该文件的代码值负责对User服务的调用,由于对于User服务只有一个子服务,里面只需要创建一个名为UserServicer
的类,这个类似继承于Protobuf生成的user_pb2_grpc.UserServicer
类,如下:
1 2 3 4 5 from grpc_example_common.protos.user import user_pb2_grpc as user_serviceclass UserServicer (user_service.UserServicer ): pass
这时候IDE会在UserServicer
上显示波浪线,如果鼠标移到波浪线位置上,IDE会提示类 User 必须实现所有 abstract 方法
,于是点击实现abstract方法
后,就会自动生成类似于下面的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from google.protobuf.empty_pb2 import Empty from grpc_example_common.protos.user import user_pb2 as user_messagefrom grpc_example_common.protos.user import user_pb2_grpc as user_serviceclass UserServicer (user_service.UserServicer ): def logout_user (self, request: user_message.LogoutUserRequest, context: grpc.ServicerContext ) -> Empty: pass def login_user (self, request: user_message.LoginUserRequest, context: grpc.ServicerContext ) -> user_message.LoginUserResult: pass def create_user (self, request: user_message.CreateUserRequest, context: grpc.ServicerContext ) -> Empty: pass def delete_user (self, request: user_message.DeleteUserRequest, context: grpc.ServicerContext ) -> Empty: pass def check_user_login (self, request: user_message.LogoutUserRequest, context: grpc.ServicerContext ) -> user_message.CheckLoginResult: pass
这段代码就是protos/user/user.proto 对应的Python
代码表达,当客户端调用UserServicer.logout_user
方法时,服务端就会自动转到该方法执行对应的逻辑,并返回结果给客户端,所以对于开发者来说只要专心完成好这几个接口的实现即可。开发者编写此处的业务逻辑代码与平时编写的API代码基本没什么差别,这里不多做阐述,具体的业务逻辑可见user_grpc_service/handler/user.py
不过需要注意的是从request
中得到的数据对象并不是Python
中常见的对象,而是gRPC封装的且类似于Python
常见的对象,如果直接用于pymysql
的类似代码:
1 2 with conn.cursor() as cursor: cursor.execute(sql, param)
那么execute可能会转码失败,导致拼接不了正确的SQL,这时候可以把request
中得到的对象转为Python
中场见的对象,比如gRPC
的时间类型Timestamp
与Python
时间类型datetime
转换如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import datetimefrom dataclasses import MISSINGfrom typing import Any, Optionalfrom google.protobuf.timestamp_pb2 import Timestamp def timestamp_to_datetime (t: Timestamp, default: Any = MISSING ) -> datetime.datetime: """replace proto.timestamp to python datetime.datetime""" if t.seconds == 0 and t.nanos == 0 and default != MISSING: return default return t.ToDatetime()def datetime_to_timestamp (d: Optional[datetime.datetime] ) -> Timestamp: """replace python datetime.datetime to proto.timestamp""" t: Timestamp = Timestamp() if d: t.FromDatetime(d) return t
通过封装好的timestamp_to_datetime
和datetime_to_timestamp
可以方便的在业务逻辑中对gRPC
和Python
对象进行转换,更多类型转换见grpc_example_common/helper/field.py ,不过这种转换的实现是非常简单的,性能也不是很好,如果为了追求性能,可以尝试使用pure-protobuf ,它会带来一点复杂性,但是使用感受会非常好,性能也非常棒。
业务代码编写完后,需要绑定到对应的Server
上面才能正常的提供服务,于是我们需要像创建Flask Server
一样,先创建一个服务,然后把路由注册进去,对于gRPC
的实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 import loggingimport osfrom concurrent import futuresfrom typing import List, Optionalimport grpcfrom grpc_example_common.interceptor.server_interceptor.base import BaseInterceptorfrom grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptorfrom user_grpc_service.handler.user import UserService, user_service logging.basicConfig( format ="[%(asctime)s %(levelname)s] %(message)s" , datefmt="%y-%m-%d %H:%M:%S" , level=logging.DEBUG, ) logger: logging.Logger = logging.getLogger()def main (host: str = "127.0.0.1" , port: str = "9000" , ssl_port: Optional[str ] = None ) -> None : interceptor_list: List[BaseInterceptor] = [CustomerTopInterceptor()] server: grpc.server = grpc.server( futures.ThreadPoolExecutor(max_workers=10 ), interceptors=interceptor_list, ) user_service.add_UserServicer_to_server(UserService(), server) if ssl_port: port = ssl_port with open (os.path.join(os.path.split(__file__)[0 ], "server.key" )) as f: private_key = f.read().encode() with open (os.path.join(os.path.split(__file__)[0 ], "server.crt" )) as f: certificate_chain = f.read().encode() server_creds = grpc.ssl_server_credentials( ( ( private_key, certificate_chain, ), ) ) server.add_secure_port(f"{host} :{port} " , server_creds) else : server.add_insecure_port(f"{host} :{port} " ) server.start() try : for generic_handler in server._state.generic_handlers: logger.info( f"add service name:{generic_handler.service_name()} cnt:{len (generic_handler._method_handlers)} " ) logger.info(f"server run in {host} :{port} " ) server.wait_for_termination() except KeyboardInterrupt: server.stop(0 )if __name__ == "__main__" : main()
可以看到这段代码非常简单, 但是他肩负了很多请求和连接的健康维护,会在后续的章节中详细介绍。
3.测试编写的gRPC服务 代码编写完成后就应该发起请求,看看这个服务是否能正常运行,但是gRPC
服务不像HTTP
服务一样可以在浏览器等地方输入一个URL就能发起一个请求,所以为了能验证我们的服务能否正常的运行,我们应该编写一个测试用例。
在官方文档gRPC Testing 中介绍了gRPC
的测试用例编写方法,但是这个只覆盖到了业务代码,无法覆盖到拦截器,参数调优等逻辑,而我目前使用到了一个名为CustomerTopInterceptor
的拦截器,它在发现业务代码有异常的时候会把异常通过meta_data
传给客户端,然后客户端进行解析并抛出对应的异常(这种实现可能不是最优雅的,但是符合需求),如果采用了官方的gRPC Testing
,那在测试用例中山无法捕获到对应的异常的,所以只能采取其它的测试方法来编写一个覆盖范围更广的测试用例–pytest-grpc 。
首先是安装好pytest-grpc ,然后按照标准的测试用例编写习惯,在项目根目录创建一个名为tests
的目录,(当然,我也在pyproject.toml
指定了pytest
的执行目录为tests
),然后在里面编写每个子服务的测试代码,一般来说一个子服务对应一个Python
文件,接着在这个文件的最前面编写服务pytest-grpc 要求的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from typing import Callable, Listimport grpcfrom grpc_example_common.interceptor.client_interceptor.customer_top import ( CustomerTopInterceptor as ClientCustomerTopInterceptor, )from grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptorfrom grpc_example_common.protos.user import user_pb2, user_pb2_grpcfrom user_grpc_service.handler.user import UserServicefrom user_grpc_service.helper.conn_proxy import SteadyDBConnection, g_db_pool@pytest.fixture(scope="module" ) def grpc_add_to_server () -> Callable: return user_pb2_grpc.add_UserServicer_to_server@pytest.fixture(scope="module" ) def grpc_servicer () -> UserService: return UserService()@pytest.fixture(scope="module" ) def grpc_interceptors () -> List[grpc.ServerInterceptor]: return [CustomerTopInterceptor()]@pytest.fixture(scope="module" ) def grpc_stub (grpc_channel: grpc.Channel ) -> user_pb2_grpc.UserStub: channel: grpc.Channel = grpc.intercept_channel(grpc_channel, ClientCustomerTopInterceptor()) return user_pb2_grpc.UserStub(channel)
创建完成后我们就可以继续在该文件编写对应的测试用例代码了,这样在运行的时候都会自动加载上面代码,然后我们可以在每个测试用例都使用上面代码创建的客户端grpc_stub
来发起请求进行测试。以创建用户和删除用户两个接口为例子,创建用户接口调用后会在数据库生成一条对应的数据,而删除用户接口会从数据库删除一条对应的数据,如果数据不存在于数据库,则会抛出RuntimeError
异常(具体代码逻辑可见user_grpc_service/handler/user.py ),他们的测试用例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 from contextlib import contextmanagerfrom typing import Callable, Generator, List@contextmanager def mock_user ( uid: str = "666666" , user_name: str = "so1n" , password: str = "123456" ) -> Generator[None , None , None ]: """通过contextmanager可以在对应的代码逻辑创建一个用户,并在结束时自动清除该用户信息""" conn: SteadyDBConnection = g_db_pool.connection() try : with conn.cursor() as cursor: cursor.execute( "INSERT INTO user (uid, user_name, password) VALUES (%s, %s, %s)" , (uid, user_name, password), ) conn.commit() yield finally : with conn.cursor() as cursor: cursor.execute("DELETE FROM user WHERE uid=%s" , (uid,)) conn.commit()class TestUser : def test_create_user (self, grpc_stub: user_pb2_grpc.UserStub ) -> None : """创建用户调用的测试用例""" try : request: user_pb2.CreateUserRequest = user_pb2.CreateUserRequest( uid="666666" , user_name="so1n" , password="123456" ) grpc_stub.create_user(request, metadata=[]) finally : conn: SteadyDBConnection = g_db_pool.connection() conn.begin() with conn.cursor() as cursor: ret: int = cursor.execute("DELETE FROM user WHERE uid=%s" , ("666666" ,)) conn.commit() assert ret == 1 def test_delete_user (self, grpc_stub: user_pb2_grpc.UserStub ) -> None : """删除用户调用的测试用例""" uid: str = "666666" request: user_pb2.DeleteUserRequest = user_pb2.DeleteUserRequest(uid=uid) with pytest.raises(RuntimeError): grpc_stub.delete_user(request, metadata=[]) with mock_user(uid=uid): grpc_stub.delete_user(request, metadata=[])
运行测试用例可以发现测试通过了,接下来就可以编写我们的API服务,在API服务中调用我们的gRPC服务。
4.编写API服务 gRPC
服务搭建完毕后,终于可以来编写API服务了,有了API服务后,才能把功能提供给了用户,API服务的项目结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ├── app_service │ ├── social_book_route.py │ ├── manager_book_route.py │ ├── utils.py │ ├── route.py │ ├── user_route.py │ └── __init__.py ├── grpc_service │ ├── __init__.py │ ├── user_service.py │ └── book_service.py ├── tests │ ├── test_route │ ├── __init__.py │ └── conftest.py ├── app.py ├── gunicorn.conf.py ├── pyproject.toml ├── README.md └── mypy.ini
API服务与gRPC
服务一样,通过pyproject.toml
的配置:
1 2 3 4 [tool.poetry.dependencies] python = "^3.8" grpc_example_common = { git = "git@github.com:so1n/grpc-example-common.git" , tag="v0.1.4" }Flask = "^2.0.3"
引用了grpc_example_common
的库, 然后在grpc_service
中用到了该库,还是以用户服务为例子,用户服务的代码位于项目的grpc_service.user_service.py
中,这个代码首先是创建一个Mixin
的类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class UserGrpcServiceMixin (object ): def __init__ (self, channel: grpc.Channel ): self.user_stub: user_service.UserStub = user_service.UserStub(channel) grpc_wrapper.auto_load_wrapper_by_stub(self.user_stub, grpc_wrapper.grpc_client_func_wrapper) def create_user (self, *, uid: str , user_name: str , password: str ) -> None : self.user_stub.create_user(user_message.CreateUserRequest(uid=uid, user_name=user_name, password=password)) def delete_user (self, *, uid: str ) -> None : self.user_stub.delete_user(user_message.DeleteUserRequest(uid=uid)) def login_user (self, *, uid: str , password: str ) -> user_message.LoginUserResult: return self.user_stub.login_user(user_message.LoginUserRequest(uid=uid, password=password)) def logout_user (self, *, uid: str , token: str ) -> None : self.user_stub.logout_user(user_message.LogoutUserRequest(uid=uid, token=token)) def get_uid_by_token (self, *, token: str ) -> str: result: user_message.GetUidByTokenResult = self.user_stub.get_uid_by_token( user_message.GetUidByTokenRequest(token=token) ) return result.uid
这个类就是对user_service.UserStub
的简单封装,可以看到方法名和参数与Protobuf保持一致,它只是接受一个负责通信的channel,然后传入到user_stub
中,方便后续的方法对user_stub
调用,同时这个类还负责一些数据的转换,如上面提到的Protobuf
的Timestamps
对象转为Python
的datetime
对象。
接着创建一个负责子服务管理的UserGrpcSerevice
类,这个类负责建立通信和维护通信段稳定:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class UserGrpcService (UserGrpcServiceMixin ): def __init__ (self, host: str , port: int ) -> None : self.channel: grpc.Channel = grpc.intercept_channel( grpc.insecure_channel(f"{host} :{port} " ), CustomerTopInterceptor() ) UserGrpcServiceMixin.__init__(self, self.channel) def channel_ready_future (self, timeout: int = 10 ) -> None : target: str = ( f"{self.__class__.__name__} " f" {self.channel._channel._connectivity_state.channel.target().decode()} " ) try : grpc.channel_ready_future(self.channel).result(timeout=timeout) except grpc.FutureTimeoutError: logger.exception(f"channel:{target} connect timeout" ) raise RuntimeError(f"channel:{target} connect timeout" ) else : logger.info(f"channel:{target} connect success" )
创建完毕后对于gRPC客户端调用服务端的逻辑已经封装完成了,接下来就是在路由函数中进行使用了,一般情况下都是使用单例的模式创建一个UserGrpcService
的实例,但是我不太喜欢这样做,于是创建了一个中间件,然后通过flask.g
传递创建app时初始化的gRPC
服务示例,对应的中间件处理代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from typing import Any, Unionfrom flask import Blueprint, Flask, Responsefrom flask import g as flask_gfrom flask import jsonify, requestfrom grpc_service.book_service import BookGrpcServicefrom grpc_service.user_service import UserGrpcService APP_TYPE = Union[Blueprint, Flask]class CustomerGType (object ): """基于flasg.g的封装,这样就可以无忧的使用IDE的提示和重构功能了""" book_grpc_service: BookGrpcService user_grpc_service: UserGrpcService def __getattr__ (self, key: str ) -> Any: return getattr (flask_g, key) def __setattr__ (self, key: str , value: Any ) -> None : setattr (flask_g, key, value) g: CustomerGType = CustomerGType()class ContextMiddleware (object ): """基于flask的before_request和after_request钩子创建的一个中间件类""" def __init__ ( self, *, app: APP_TYPE, book_grpc_service: BookGrpcService, user_grpc_service: UserGrpcService ) -> None : self._app = app self._app.before_request(self._before_requests) self._app.after_request(self._after_requests) self._book_grpc_service: BookGrpcService = book_grpc_service self._user_grpc_service: UserGrpcService = user_grpc_service def _before_requests (self ) -> None : g.book_grpc_service = self._book_grpc_service g.user_grpc_service = self._user_grpc_service return def _after_requests (self, response: Response ) -> Response: return response
中间件创建完成后,就可以在create_app
工厂函数中创建对应的gRPC
服务,然后赋值到对应的中间件中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from flask.app import Flaskfrom app_service.route import manager_book_bp, social_book_bp, user_bpfrom app_service.utils import ContextMiddleware, api_exceptionfrom grpc_service.book_service import BookGrpcServicefrom grpc_service.user_service import UserGrpcServicedef create_app () -> Flask: app: Flask = Flask(__name__) app.register_blueprint(manager_book_bp) app.register_blueprint(social_book_bp) app.register_blueprint(user_bp) book_grpc_service: BookGrpcService = BookGrpcService("0.0.0.0" , 9000 ) book_grpc_service.channel_ready_future(timeout=3 ) user_grpc_service: UserGrpcService = UserGrpcService("0.0.0.0" , 9001 ) user_grpc_service.channel_ready_future(timeout=3 ) ContextMiddleware(app=app, book_grpc_service=book_grpc_service, user_grpc_service=user_grpc_service) app.errorhandler(Exception)(api_exception) return appif __name__ == "__main__" : create_app().run("localhost" , port=8000 )
最后,就可以在路由函数中使用对应的gRPC
服务了,还是以创建用户和删除用户为例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from flask import Response, requestfrom grpc_example_common.protos.user import user_pb2 as user_messagefrom app_service.utils import g, get_uid_by_token, make_responsedef create_user () -> Response: request_dict: dict = request.json() g.user_grpc_service.create_user( uid=request_dict["uid" ], user_name=request_dict["user_name" ], password=request_dict["password" ] ) return make_response()def delete_user () -> Response: request_dict: dict = request.json() g.user_grpc_service.delete_user(uid=request_dict["uid" ]) return make_response()
这两个接口都是在收到请求后,再调用gRPC
服务对应的方法来传递请求,其它服务调用的代码与创建用户和删除用户的例子相同,具体可以访问app_service 了解
需要注意的是,通常我们不会在生产环境直接运行Flask
,而是采用gunicorn
+gevnet
来运行我们的API服务,从而增强服务的稳定性和性能,但是gevent
是修改Python
代码来达到全局代码都不阻塞的,而gRPC
本身的调用是包含C代码,gevent
无法修改到gRPC
调用到的C代码,所以gRPC
提供一个名为grpc.experimental.gevent.init_gevent
的方法来解决这个问题,如下代码:
1 2 3 4 5 6 7 8 9 10 11 import sockettry : import geventexcept ImportError: pass else : if socket.socket is gevent.socket.socket: import grpc.experimental.gevent grpc.experimental.gevent.init_gevent()
这段代码在初始化时可以通过判断是否启用gevnet
来启用grpc.experimental.gevent.init_gevent
,通常建议放在gunicorn的配置文件里,具体见gunicorn.conf.py
至此,一个API服务就搭建完毕,可以直接运行后在浏览器进行测试。
5.测试编写的API服务 截止到目前,对于包含gRPC的API服务接口测试没有一个比较好的方法,因为单例测试是不考虑别的服务的,意味着需要对gRPC调用段响应进行Mock,然而用于调用段Stub
类只有属性而没有方法,这样会导致mock不成功,所以需要先创建一个gRPC Stub的函数签名,以UserStub
为例子,将会创建一个类似于gRPC UserStub的类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from typing import Anyclass UserStub (object ): def __init__ (self, channel: Any ): pass def get_uid_by_token (self, *args: Any, **kwargs: Any ) -> None : pass def logout_user (self, *args: Any, **kwargs: Any ) -> None : pass def login_user (self, *args: Any, **kwargs: Any ) -> None : pass def create_user (self, *args: Any, **kwargs: Any ) -> None : pass def delete_user (self, *args: Any, **kwargs: Any ) -> None : pass
其它的见:tests/grpc_abc_stub.py
然后在tests/conftest.py 编写一个全局的初始化,该初始化会把gRPC的检查连接方法屏蔽以及把对应的Stub类进行替换:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import pytestfrom grpc import _utilitiesfrom grpc_example_common.protos.book import manager_pb2_grpc, social_pb2_grpcfrom grpc_example_common.protos.user import user_pb2_grpcfrom tests.grpc_abc_stub import BookManagerStub, BookSocialStub, UserStubdef result (self: Any, timeout: Any = None ) -> Any: pass _utilities._ChannelReadyFuture.result = result user_pb2_grpc.UserStub = UserStub user_pb2_grpc.UserStub = UserStub social_pb2_grpc.BookSocialStub = BookSocialStub manager_pb2_grpc.BookManagerStub = BookManagerStub
这样就为测试用例初始化完成了, 但是我为每个Stub包装了一个功能,使他们永远会传递metadata
变量:
1 2 3 4 class UserGrpcServiceMixin (object ): def __init__ (self, channel: grpc.Channel ): self.user_stub: user_service.UserStub = user_service.UserStub(channel) grpc_wrapper.auto_load_wrapper_by_stub(self.user_stub, grpc_wrapper.grpc_client_func_wrapper)
该方法会初始化一定要放在mock之后,否则mock无效,这意味着初始化Flask.TestClient
的逻辑必须在测试代码里,于是先创建一个类似于pytest.fixture
的初始化Flask.TestClient
函数:
1 2 3 4 5 6 7 8 9 10 11 @contextmanager def customer_app () -> Generator[FlaskClient, None , None ]: flask_app: Flask = create_app() client: FlaskClient = flask_app.test_client() ctx: AppContext = flask_app.app_context() ctx.push() yield client ctx.pop()
为了偷懒,我把他放到了conftest
文件。
如果没有包装过stub的方法,则可以不用采用该步骤
现在,所有初始化都编写完毕了,可以编写测试用例,以用户调用路由为例子(说明见注释):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 from google.protobuf.empty_pb2 import Empty from grpc_example_common.protos.user import user_pb2 as user_messagefrom pytest_mock import MockFixturefrom werkzeug.test import TestResponsefrom tests.conftest import customer_appclass TestUser : def test_create_user (self, mocker: MockFixture ) -> None : mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.create_user" ).return_value = Empty() with customer_app() as client: resp: TestResponse = client.post( "/api/user/create" , json={"uid" : "123" , "user_name" : "so1n" , "password" : "aha" } ) assert resp.json["code" ] == 0 def test_delete_user (self, mocker: MockFixture ) -> None : mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.delete_user" ).return_value = Empty() with customer_app() as client: resp: TestResponse = client.post("/api/user/delete" , json={"uid" : "123" }) assert resp.json["code" ] == 0 mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.delete_user" ).side_effect = RuntimeError( "test error" ) with customer_app() as client: resp = client.post("/api/user/delete" , json={"uid" : "123" }) assert resp.json["data" ] == "test error" def test_login_user (self, mocker: MockFixture ) -> None : mocker.patch( "grpc_example_common.protos.user.user_pb2_grpc.UserStub.login_user" ).return_value = user_message.LoginUserResult(uid="123" , token="66666" ) with customer_app() as client: resp: TestResponse = client.post("/api/user/login" , json={"uid" : "123" , "password" : "pw" }) assert resp.json["data" ] == {"uid" : "123" , "token" : "66666" } def test_logout (self, mocker: MockFixture ) -> None : mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.logout_user" ).return_value = Empty() mocker.patch( "grpc_example_common.protos.user.user_pb2_grpc.UserStub.get_uid_by_token" ).return_value = user_message.GetUidByTokenResult(uid="123" ) with customer_app() as client: resp: TestResponse = client.post("/api/user/logout" , json={"uid" : "123" }, headers={"token" : "666666" }) assert resp.json["code" ] == 0 mocker.patch( "grpc_example_common.protos.user.user_pb2_grpc.UserStub.get_uid_by_token" ).return_value = user_message.GetUidByTokenResult(uid="1234" ) with customer_app() as client: resp = client.post("/api/user/logout" , json={"uid" : "123" }, headers={"token" : "666666" }) assert resp.json["data" ] == "Uid ERROR"
至此,已经实现了一个可以简单使用的gRPC服务,可以在电脑上起不同的进程并观察他们的调用情况,但是这只是一个开始,随着服务的扩大,服务间的维护和调优会变得十分麻烦,要想服务能够健壮的运行,我们需要继续深入。