前言
Python-gRPC实践(3)–使用Python实现gRPC服务
讲述了如何编写和使用gRPC服务,而本文主要讲述一些业务之外的gRPC用法。
1.优雅的重启服务
线上运行的服务永远都不会一尘不变的,特别是对于微服务,服务的更新频率越来越快,需要重启的次数也就变多了,而每次重启都会导致服务正在处理的请求被强行关闭,所以我们需要优雅的重启服务,确保在滚动更新服务的时候不会影响到客户端,好在gRPC自带了一个类似的功能,代码例子如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| from signal import signal, SIGTERM
def serve(): options = [("grpc.so_reuseport", 1)] server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options) ... server.add_insecure_port("0.0.0.0:9000") server.start()
def sig_handle(*args): event = server.stop(30) event.wait() signal(SIGTERM, sig_handle) server.wait_for_termination()
|
这个代码假设gRPC服务以端口复用模式运行(默认启用), 代码通过加上SIGTERM
信号的对应回调后,我们的更新服务的逻辑就可以进行一些调整。首先是运行一个全新的gRPC服务在9000端口,这时候请求会分流到新和旧的gRPC服务,然后我们主动向旧的服务进程发送SIGTERM
信号,这时旧服务就能开始拒绝新的请求,请求只会流向新的gRPC服务,之后我们只需要等服务自动关闭即可。
不过光这种方法还是不够,还需要注册中心等,具体实现逻辑可以通过RPC框架编写实践–服务的优雅的重启了解。
代码中的options是用于配置gRPC服务的一些功能,可以从gRPC c源码中获悉grpc_types.h,也可以通过代码生成的页面获取比较容易看到的说明文档。此外还有一些配置是需要传入一个Dict对象,可以参考service_config.proto
更多优雅的重启说明见gRPC Python Server Wait API
2.重试机制
gRPC服务是通过网络来传输请求的,所以可能会因为网络波动等原因造成请求失败,这时候就需要重试机制来使请求能成功通信了,但是引入重试机制后可能会带来一些负面的问题:
- 比如服务端已经扛不住了,而客户端还一直重试,造成持续的增加服务端压力
- 接口不是幂等,某个请求实际上已经处理完了,但是由于网络问题导致客户端认为服务端返回了错误,从而客户端进行了重试,最后导致服务端又再次处理请求,导致对应的数据发生了改变。
于是,gRPC抽象出一套重试机制,使用户可以灵活为自己想要的方法应用对应的重试机制,这个重试机制包括的最大重试次数,指数退避,可重试状态码等等,比如下面的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| class BookGrpcService(BookSocialGrpcServiceMixin, BookManagerGrpcServiceMixin): def __init__(self, host: str, port: int) -> None: service_config_json = json.dumps({ "methodConfig": [{ "name": [ { "service": "book_manager.BookManager", "method": "create_book" }, { "service": "book_manager.BookManager", "method": "delete_book" }, ], "retryPolicy": { "maxAttempts": 5, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"], }, }] }) options = [] options.append(("grpc.enable_retries", 1)) options.append(("grpc.service_config", service_config_json))
self.channel: grpc.Channel = grpc.intercept_channel( grpc.insecure_channel(f"{host}:{port}"), CustomerTopInterceptor(), options=options ) BookSocialGrpcServiceMixin.__init__(self, self.channel) BookManagerGrpcServiceMixin.__init__(self, self.channel)
|
该代码复制于grpc-example-api-backend-service,并进行一点拓展,可以看到在调用grpc.intercept_channel
进行初始化时,传入了一个options参数,这样在创建channel时会自动应用到对应的配置。
这个配置中,grpc.enable_retries
为1代表启用了重试,而grpc.service_config
接收到的是一个服务配置方法,具体可以访问service_config.proto了解,不过需要注意的是,在传入grpc.service_config
时,传的是一个Dict的对应json序列化字符串。
接下来就在grpc.service_config
的methodConfig
中,我们传入了一个数组,这个数组里面只有一个Dict,这个Dict通过name
指定了只有book_manager.BookManager
的create_book
方法和delete_book
方法会应用到重试的规则,如果这个配置为:
1 2 3 4 5 6 7 8 9 10 11 12
| { "methodConfig": [{ "name": [{}], "retryPolicy": { "maxAttempts": 5, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"], }, }] }
|
则会应用到所有的方法中。
在这个配置中,name
负责指定应用范围,而retryPolicy
负责指定应用规则,方便用户调整重试逻辑,他们的含义如下:
- maxAttempts: 最大的重试次数,必须大于,如果大于5则他的值为5
- Backoff系列: 指数退避参数,第一次重试是在random(0, initalBackoff)秒后进行的,而后续的重试间隔计算为min(initial_backoff*backoff_multiplier**(n-1), max_backoff)).
- retryableStatusCodes: 指定什么样的响应状态码才需要重试,具体参照gRPC状态码文档。
开发者只要合理的进行配置,那么就能简单的应用重试机制,但是光看这点配置还是没办法解决重试导致机器过载的问题,所以gRPC还包含另外一组参数重试限流策略–retryThrottling来解决这个问题,这个配置的含义是当客户端的失败和成功超过某个阈值时,gRPC会通过金庸重试策略来防止由于重试导致服务器过载,使用方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| class BookGrpcService(BookSocialGrpcServiceMixin, BookManagerGrpcServiceMixin): def __init__(self, host: str, port: int) -> None: service_config_json = json.dumps({ "methodConfig": [{ "name": [ { "service": "book_manager.BookManager", "method": "create_book" }, { "service": "book_manager.BookManager", "method": "delete_book" }, ], "retryPolicy": { "maxAttempts": 5, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"], }, }], "retryThrottling": { "maxTokens": 10, "tokenRatio": 0.1 } }) options = [] options.append(("grpc.enable_retries", 1)) options.append(("grpc.service_config", service_config_json))
self.channel: grpc.Channel = grpc.intercept_channel( grpc.insecure_channel(f"{host}:{port}"), CustomerTopInterceptor(), options=options ) BookSocialGrpcServiceMixin.__init__(self, self.channel) BookManagerGrpcServiceMixin.__init__(self, self.channel)
|
在这段代码中,通过引入与methodConfig
同级的retryThrottling
配置,之后gRPC客户端会维护一个tokenCount的变量,它的初始值为maxToken,值得范围在0-maxToken波动,之后每一次请求成功时,tokenCount都会递增tokenRatio
的值,每次请求失败时,tokenCount都会递减1,之后在重试之前如果发现tokaneCount小于或等于maxToken/2时,gRPC客户端不会进行重试且不会进行对冲。需要注意的是tokenRatio
的设定范围只能在0-1之间,且只支持小数点后3位,如0.5466将为视为0.546,而MmaxTOkens的取值范围为0-1000的整数值。
目前Python gRPC不支持对冲,更多gRPC重试内容和设计见A6-client-retries
3.调用时参数
3.1.超时
超时机制, 是一个简单又方便的控制网络请求异常的一种方法, 它可以保证服务稳定(本质是快速失败), 良好的超时控制策略可以尽快的释放高延迟的请求,避免请求堆积。对于一般的HTTP请求,只需要在客户端添加一个超时参数,然后客户端检查在超时时间范围内还没收到对应的网络请求时,就会单方面关闭请求,这种处理方法在大多数情况下是没问题的,但在遇到阻塞性调用等情况时,客户端单方面关闭了请求,但服务端还在继续处理请求,并做出响应,不过客户端已经关闭了连接了,它无法收到该响应,这样就造成了服务端都性能浪费了。要解决这个问题,就需要引入一个超时传递的机制,使服务端能获取到本次请求的指定超时时间,并做出响应的操作。
好在gRPC已经自带了超时和超时传递的功能了,只需要在调用时添加timeout参数即可,如下:
1 2 3 4 5 6
| class UserGrpcServiceMixin(object): def __init__(self, channel: grpc.Channel): self.user_stub: user_service.UserStub = user_service.UserStub(channel)
def create_user(self, *, uid: str, user_name: str, password: str) -> None: self.user_stub.create_user(user_message.CreateUserRequest(uid=uid, user_name=user_name, password=password), timeout=10)
|
这样一来,客户端在调用的时候,就会进行倒计时,如果10秒内任然没有收到响应,那么它就会抛出超时异常,而服务端可以通过context.time_remaining()
获取到还有多少剩余时间,并在主要逻辑代码使用,如下:
1 2 3 4 5 6
| class UserService(user_service.UserServicer): @conn_proxy() def create_user(self, request: user_message.CreateUserRequest, context: grpc.ServicerContext) -> Empty: with TimeoutContext(context.time_remaining()): user_dal.create_user(uid=request.uid, user_name=request.user_name, password=request.password) return Empty()
|
该代码引入一个TimeoutContext对象,这个对象会托管该作用域,如果该作用域的运行时长超多指定的时间,则会直接抛出错误,具体见RPC框架编写实践–超时与超时传递
3.2.等待服务就绪
一般的gRPC客户端在请求服务端都时候,如果发现服务端无法及时传输数据,比如channel处于TRANSIENT_FAILURE
或SHUTDOWN
状态时,客户端应该马上响应失败,这种方法被称为快速失败,但是服务端还有其它的状态如CONNECTING
、READY
或IDLE
,这时候channel处于准备状态,还无法提供服务,但是在稍后将可以提供服务,比如客户端服务端同时启动,但因为短暂的网络故障导致服务端不可用等场景。这时候就可以使用wait-for-ready
功能,使用wait-for-ready
的调用将自动等待,直到服务器准备好接受请求时再发送,如下例子:
1 2 3 4 5 6
| class UserGrpcServiceMixin(object): def __init__(self, channel: grpc.Channel): self.user_stub: user_service.UserStub = user_service.UserStub(channel)
def create_user(self, *, uid: str, user_name: str, password: str) -> None: self.user_stub.create_user(user_message.CreateUserRequest(uid=uid, user_name=user_name, password=password), wait_for_ready=True)
|
这样一来在调用create_user
时,如果服务端还处于准备阶段,那么他会一直等待,直到服务端处于运行状态,需要注意的是,等待时间不会超过timeout指定的时间。
4.数据类型的转换
在演示的代码grpc_example_common.helper.field中,我编写了几个参数,用于做Protobuf文件的数据类型转换,这些实现是非常简陋的,有些类型并没有考虑到,不过官方提供好了数据类型的转换,这些功能都位于google.protobuf.json_format之中,大多数我们只要用json_format.ParseDict
或者是json_format.MessageToDict
即可,不过有些时候我们需要进行一些定制化,但是json_format并没有开放出对应的接口,我们只能使用一些奇怪的方法来定制化,比如json_format.ParseDict
,我们只要重置json_format._ConvertScalarFieldValue
方法即可,用法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| from decimal import Decimal from google.protobuf import json_format from grpc_example_common.protos.book.social_pb2 import GetBookLikesResult
_ConvertScalarFieldValue = json_format._ConvertScalarFieldValue
def ConvertScalarFieldValue(value, field, path, require_str=False): if isinstance(value, str): value = "faker" + value elif isinstance(value, Decimal): value = int(value) + 10 return _ConvertScalarFieldValue(value, field, path, require_str)
setattr(json_format, '_ConvertScalarFieldValue', ConvertScalarFieldValue)
parse_message = json_format.ParseDict({"isbn": "xxx", "book_like": Decimal("2.0")}, GetBookLikesResult)
print(parse_message.book_like)
print(parse_message.isbn)
|
可以看到输出与符合我们定义的规则,而对于json_format.MessageToDict
,可以直接修改它使用的_Printer
中的_FieldToJsonObject
方法,并创建一个全新的MessageToDict
来调用到我们编写的Printer
的类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| from google.protobuf import json_format from grpc_example_common.protos.book.social_pb2 import GetBookLikesResult
class Printer(json_format._Printer): def _FieldToJsonObject(self, field, value): if field.cpp_type in json_format._INT_TYPES: return value+10 else: return super()._FieldToJsonObject(field, value)
def MessageToDict( message, including_default_value_fields=False, preserving_proto_field_name=False, use_integers_for_enums=False, descriptor_pool=None, float_precision=None): printer = Printer( including_default_value_fields, preserving_proto_field_name, use_integers_for_enums, descriptor_pool, float_precision=float_precision) return printer._MessageToJsonObject(message)
print(MessageToDict(GetBookLikesResult(book_like=10)))
|
通过输出后可以发现,输出的结果是20,符合我们的定义的规则。