Python-gRPC实践(5)--简述gRPC的高级用法

本文总阅读量

前言

Python-gRPC实践(3)–使用Python实现gRPC服务
讲述了如何编写和使用gRPC服务,而本文主要讲述一些业务之外的gRPC用法。

1.优雅的重启服务

线上运行的服务永远都不会一尘不变的,特别是对于微服务,服务的更新频率越来越快,需要重启的次数也就变多了,而每次重启都会导致服务正在处理的请求被强行关闭,所以我们需要优雅的重启服务,确保在滚动更新服务的时候不会影响到客户端,好在gRPC自带了一个类似的功能,代码例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from signal import signal, SIGTERM


def serve():
# 启动端口配置,默认为启用
options = [("grpc.so_reuseport", 1)]
# 正常的运行服务
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
...
server.add_insecure_port("0.0.0.0:9000")
server.start()

def sig_handle(*args):
# 调用server.stop函数,调用stop(30)后,该服务会拒绝客户端新的请求,并返回一个threading.Event
event = server.stop(30)
# 调用event.wait()后,服务会一直等着,直到30秒后服务才会关闭,关闭之前服务还是能正常处理请求
event.wait()

# 接收一个SIGTERM信号的注册
signal(SIGTERM, sig_handle)
# 等待服务停止
server.wait_for_termination()

这个代码假设gRPC服务以端口复用模式运行(默认启用), 代码通过加上SIGTERM信号的对应回调后,我们的更新服务的逻辑就可以进行一些调整。首先是运行一个全新的gRPC服务在9000端口,这时候请求会分流到新和旧的gRPC服务,然后我们主动向旧的服务进程发送SIGTERM信号,这时旧服务就能开始拒绝新的请求,请求只会流向新的gRPC服务,之后我们只需要等服务自动关闭即可。

不过光这种方法还是不够,还需要注册中心等,具体实现逻辑可以通过RPC框架编写实践–服务的优雅的重启了解。

代码中的options是用于配置gRPC服务的一些功能,可以从gRPC c源码中获悉grpc_types.h,也可以通过代码生成的页面获取比较容易看到的说明文档。此外还有一些配置是需要传入一个Dict对象,可以参考service_config.proto

更多优雅的重启说明见gRPC Python Server Wait API

2.重试机制

gRPC服务是通过网络来传输请求的,所以可能会因为网络波动等原因造成请求失败,这时候就需要重试机制来使请求能成功通信了,但是引入重试机制后可能会带来一些负面的问题:

  • 比如服务端已经扛不住了,而客户端还一直重试,造成持续的增加服务端压力
  • 接口不是幂等,某个请求实际上已经处理完了,但是由于网络问题导致客户端认为服务端返回了错误,从而客户端进行了重试,最后导致服务端又再次处理请求,导致对应的数据发生了改变。

于是,gRPC抽象出一套重试机制,使用户可以灵活为自己想要的方法应用对应的重试机制,这个重试机制包括的最大重试次数,指数退避,可重试状态码等等,比如下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class BookGrpcService(BookSocialGrpcServiceMixin, BookManagerGrpcServiceMixin):
def __init__(self, host: str, port: int) -> None:
service_config_json = json.dumps({
"methodConfig": [{
"name": [
{
"service": "book_manager.BookManager",
"method": "create_book"
},
{
"service": "book_manager.BookManager",
"method": "delete_book"
},
],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"],
},
}]
})
options = []
# NOTE: 启动重试模式, 默认在v1.40.0后自动启用
options.append(("grpc.enable_retries", 1))
# 重试的一些参数配置
options.append(("grpc.service_config", service_config_json))


self.channel: grpc.Channel = grpc.intercept_channel(
grpc.insecure_channel(f"{host}:{port}"), CustomerTopInterceptor(), options=options
)
BookSocialGrpcServiceMixin.__init__(self, self.channel)
BookManagerGrpcServiceMixin.__init__(self, self.channel)

该代码复制于grpc-example-api-backend-service,并进行一点拓展,可以看到在调用grpc.intercept_channel进行初始化时,传入了一个options参数,这样在创建channel时会自动应用到对应的配置。

这个配置中,grpc.enable_retries为1代表启用了重试,而grpc.service_config接收到的是一个服务配置方法,具体可以访问service_config.proto了解,不过需要注意的是,在传入grpc.service_config时,传的是一个Dict的对应json序列化字符串。

接下来就在grpc.service_configmethodConfig中,我们传入了一个数组,这个数组里面只有一个Dict,这个Dict通过name指定了只有book_manager.BookManagercreate_book方法和delete_book方法会应用到重试的规则,如果这个配置为:

1
2
3
4
5
6
7
8
9
10
11
12
{
"methodConfig": [{
"name": [{}],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"],
},
}]
}

则会应用到所有的方法中。
在这个配置中,name负责指定应用范围,而retryPolicy负责指定应用规则,方便用户调整重试逻辑,他们的含义如下:

  • maxAttempts: 最大的重试次数,必须大于,如果大于5则他的值为5
  • Backoff系列: 指数退避参数,第一次重试是在random(0, initalBackoff)秒后进行的,而后续的重试间隔计算为min(initial_backoff*backoff_multiplier**(n-1), max_backoff)).
  • retryableStatusCodes: 指定什么样的响应状态码才需要重试,具体参照gRPC状态码文档。

开发者只要合理的进行配置,那么就能简单的应用重试机制,但是光看这点配置还是没办法解决重试导致机器过载的问题,所以gRPC还包含另外一组参数重试限流策略–retryThrottling来解决这个问题,这个配置的含义是当客户端的失败和成功超过某个阈值时,gRPC会通过金庸重试策略来防止由于重试导致服务器过载,使用方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class BookGrpcService(BookSocialGrpcServiceMixin, BookManagerGrpcServiceMixin):
def __init__(self, host: str, port: int) -> None:
service_config_json = json.dumps({
"methodConfig": [{
"name": [
{
"service": "book_manager.BookManager",
"method": "create_book"
},
{
"service": "book_manager.BookManager",
"method": "delete_book"
},
],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"],
},
}],
"retryThrottling": {
"maxTokens": 10,
"tokenRatio": 0.1
}
})
options = []
# NOTE: 启动重试模式, 默认在v1.40.0后自动启用
options.append(("grpc.enable_retries", 1))
# 重试的一些参数配置
options.append(("grpc.service_config", service_config_json))


self.channel: grpc.Channel = grpc.intercept_channel(
grpc.insecure_channel(f"{host}:{port}"), CustomerTopInterceptor(), options=options
)
BookSocialGrpcServiceMixin.__init__(self, self.channel)
BookManagerGrpcServiceMixin.__init__(self, self.channel)

在这段代码中,通过引入与methodConfig同级的retryThrottling配置,之后gRPC客户端会维护一个tokenCount的变量,它的初始值为maxToken,值得范围在0-maxToken波动,之后每一次请求成功时,tokenCount都会递增tokenRatio的值,每次请求失败时,tokenCount都会递减1,之后在重试之前如果发现tokaneCount小于或等于maxToken/2时,gRPC客户端不会进行重试且不会进行对冲。需要注意的是tokenRatio的设定范围只能在0-1之间,且只支持小数点后3位,如0.5466将为视为0.546,而MmaxTOkens的取值范围为0-1000的整数值。

目前Python gRPC不支持对冲,更多gRPC重试内容和设计见A6-client-retries

3.调用时参数

3.1.超时

超时机制, 是一个简单又方便的控制网络请求异常的一种方法, 它可以保证服务稳定(本质是快速失败), 良好的超时控制策略可以尽快的释放高延迟的请求,避免请求堆积。对于一般的HTTP请求,只需要在客户端添加一个超时参数,然后客户端检查在超时时间范围内还没收到对应的网络请求时,就会单方面关闭请求,这种处理方法在大多数情况下是没问题的,但在遇到阻塞性调用等情况时,客户端单方面关闭了请求,但服务端还在继续处理请求,并做出响应,不过客户端已经关闭了连接了,它无法收到该响应,这样就造成了服务端都性能浪费了。要解决这个问题,就需要引入一个超时传递的机制,使服务端能获取到本次请求的指定超时时间,并做出响应的操作。

好在gRPC已经自带了超时和超时传递的功能了,只需要在调用时添加timeout参数即可,如下:

1
2
3
4
5
6
class UserGrpcServiceMixin(object):
def __init__(self, channel: grpc.Channel):
self.user_stub: user_service.UserStub = user_service.UserStub(channel)

def create_user(self, *, uid: str, user_name: str, password: str) -> None:
self.user_stub.create_user(user_message.CreateUserRequest(uid=uid, user_name=user_name, password=password), timeout=10)

这样一来,客户端在调用的时候,就会进行倒计时,如果10秒内任然没有收到响应,那么它就会抛出超时异常,而服务端可以通过context.time_remaining()获取到还有多少剩余时间,并在主要逻辑代码使用,如下:

1
2
3
4
5
6
class UserService(user_service.UserServicer):
@conn_proxy()
def create_user(self, request: user_message.CreateUserRequest, context: grpc.ServicerContext) -> Empty:
with TimeoutContext(context.time_remaining()):
user_dal.create_user(uid=request.uid, user_name=request.user_name, password=request.password)
return Empty()

该代码引入一个TimeoutContext对象,这个对象会托管该作用域,如果该作用域的运行时长超多指定的时间,则会直接抛出错误,具体见RPC框架编写实践–超时与超时传递

3.2.等待服务就绪

一般的gRPC客户端在请求服务端都时候,如果发现服务端无法及时传输数据,比如channel处于TRANSIENT_FAILURESHUTDOWN状态时,客户端应该马上响应失败,这种方法被称为快速失败,但是服务端还有其它的状态如CONNECTINGREADYIDLE,这时候channel处于准备状态,还无法提供服务,但是在稍后将可以提供服务,比如客户端服务端同时启动,但因为短暂的网络故障导致服务端不可用等场景。这时候就可以使用wait-for-ready功能,使用wait-for-ready的调用将自动等待,直到服务器准备好接受请求时再发送,如下例子:

1
2
3
4
5
6
class UserGrpcServiceMixin(object):
def __init__(self, channel: grpc.Channel):
self.user_stub: user_service.UserStub = user_service.UserStub(channel)

def create_user(self, *, uid: str, user_name: str, password: str) -> None:
self.user_stub.create_user(user_message.CreateUserRequest(uid=uid, user_name=user_name, password=password), wait_for_ready=True)

这样一来在调用create_user时,如果服务端还处于准备阶段,那么他会一直等待,直到服务端处于运行状态,需要注意的是,等待时间不会超过timeout指定的时间。

4.数据类型的转换

在演示的代码grpc_example_common.helper.field中,我编写了几个参数,用于做Protobuf文件的数据类型转换,这些实现是非常简陋的,有些类型并没有考虑到,不过官方提供好了数据类型的转换,这些功能都位于google.protobuf.json_format之中,大多数我们只要用json_format.ParseDict或者是json_format.MessageToDict即可,不过有些时候我们需要进行一些定制化,但是json_format并没有开放出对应的接口,我们只能使用一些奇怪的方法来定制化,比如json_format.ParseDict,我们只要重置json_format._ConvertScalarFieldValue方法即可,用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from decimal import Decimal
from google.protobuf import json_format
from grpc_example_common.protos.book.social_pb2 import GetBookLikesResult


# 获取到原来的用法
_ConvertScalarFieldValue = json_format._ConvertScalarFieldValue

def ConvertScalarFieldValue(value, field, path, require_str=False):
# 制定我们的规则
if isinstance(value, str):
# 字符串添加一个faker
value = "faker" + value
elif isinstance(value, Decimal):
# Decimal就+10
value = int(value) + 10
# 最后再交给原来的方法处理
return _ConvertScalarFieldValue(value, field, path, require_str)

setattr(json_format, '_ConvertScalarFieldValue', ConvertScalarFieldValue)


parse_message = json_format.ParseDict({"isbn": "xxx", "book_like": Decimal("2.0")}, GetBookLikesResult)
# 输出符合我们的预期
print(parse_message.book_like)
# --> 12
print(parse_message.isbn)
# --> fakerxxx

可以看到输出与符合我们定义的规则,而对于json_format.MessageToDict,可以直接修改它使用的_Printer中的_FieldToJsonObject方法,并创建一个全新的MessageToDict来调用到我们编写的Printer的类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from google.protobuf import json_format
from grpc_example_common.protos.book.social_pb2 import GetBookLikesResult


class Printer(json_format._Printer): # noqa
def _FieldToJsonObject(self, field, value):
# 如果是int类型,就加10
if field.cpp_type in json_format._INT_TYPES:
return value+10
else:
return super()._FieldToJsonObject(field, value)


def MessageToDict(
message,
including_default_value_fields=False,
preserving_proto_field_name=False,
use_integers_for_enums=False,
descriptor_pool=None,
float_precision=None):
printer = Printer(
including_default_value_fields,
preserving_proto_field_name,
use_integers_for_enums,
descriptor_pool,
float_precision=float_precision)
# pylint: disable=protected-access
return printer._MessageToJsonObject(message)


print(MessageToDict(GetBookLikesResult(book_like=10)))
# --> 20

通过输出后可以发现,输出的结果是20,符合我们的定义的规则。

查看评论