Python-gRPC实践(9)--gRPC在gevent与asyncio的简单使用对比.md

本文总阅读量

前言

之前在Python-gRPC实践系列文章中都是在多线程模式中介绍如何使用gRPC,但是在Python生态中更偏好通过协程的方式来运行服务,而Python的协程运行方式却有多种,虽然他们的原理类似,但是使用上却有区别,本文主要是对在geventasyncio中对gRPC的使用进行对比。

1.简单的例子

本章以官方中的helloworld Protobuf文件为例子,介绍在不同协程中如何运行服务,该Protobuf文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
int32 sleep_millisecond = 2;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

其中HelloRequestHelloReplyMessageGretterservice,它们生成的对应代码位于grpc_asyncio_example/protos目录下面,而对应的客户端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import logging
import grpc
from grpc_asyncio_example.protos import helloworld_pb2, helloworld_pb2_grpc


def client(target: str = "localhost:9999"):
with grpc.insecure_channel(target=target) as channel:
stub: helloworld_pb2_grpc.GreeterStub = helloworld_pb2_grpc.GreeterStub(channel)
response: helloworld_pb2.HelloReply = stub.SayHello(helloworld_pb2.HelloRequest(name='you'), timeout=10)
logging.info("Greeter client received: " + response.message)


if __name__ == '__main__':
logging.basicConfig()
client()

服务端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from concurrent import futures
import time
import logging

import grpc
from grpc_asyncio_example.protos import helloworld_pb2, helloworld_pb2_grpc


class Greeter(helloworld_pb2_grpc.GreeterServicer):

def SayHello(
self, request: helloworld_pb2.HelloRequest, context: grpc.ServicerContext
) -> helloworld_pb2.HelloReply:
time.sleep(request.sleep_millisecond / 1000) # 模拟IO
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)


def serve(target: str = "localhost:9999") -> grpc.server:
server: grpc.server = grpc.server(futures.ThreadPoolExecutor(max_workers=100))
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port(target)
server.start()
return server


if __name__ == '__main__':
logging.basicConfig()
serve().wait_for_termination()

客户端与服务端的代码都非常简单,客户端主要是发送一个请求,该请求参数为name=you,而服务端则是解析参数,并把Hello {name}返回给客户端,最终由客户端把服务端返回的消息打印出来。
这份代码只可以用于Python的多线程中,无法在协程环境中运行,接下来讲分别介绍Python gRPCgeventasyncio中的使用和原理分析。

只介绍UNARY_UNARY请求方式,其他方式不同可以通过对应的文档了解如何使用。

2.Gevent

gevent 是一个基于协程的Python网络库,它通过greenletlibevlibuv事件循环之上提供高级同步API,在Python程序使用gevent很简单,只要在代码引入如下猴子补丁语法:

1
2
import gevent.monkey
gevent.monkey.patch_all()

那么整个程序的Python代码都支持gevent了,程序也可以通过gevent来运行。

不过Python gRPC是个例外,因为Python gRPC的核心是由C语言编写的,而gevent应用的猴子补丁只能涉及到用Python语言编写的代码,所以单靠应用gevent的猴子补丁是没办法使Python gRPCgevent中运行的,可能还会导致调用gRPC客户端方法时出现卡死的情况。

好在Python gRPC提供了一个用法,使Python gRPC能够在gevent中兼容运行,代码如下:

1
2
3
4
5
6
7
8
9
10
11
# 先应用猴子补丁
import gevent.monkey
gevent.monkey.patch_all()

# 再为gRPC启用gevent兼容模式
import grpc.experimental.gevent
grpc.experimental.gevent.init_gevent()

# gRPC客户端或服务端的代码
...

这份代码通过调用grpc.experimental.gevent.init_gevent,使gRPC能在gevent中运行,而且在gevent模式下gRPC的所有模块和方法的使用方式都不需要改变。 但是gRPC在通过gevent的兼容模式运行的时候,性能会大打折扣,比如通过下面的命令:

1
2
3
4
5
6
ghz -c 100 -n 10000 \
--insecure \
--proto ../protos/grpc_asyncio_example/protos/helloworld.proto \
--call helloworld.Greeter.SayHello \
-d '{"name":"Joe", "sleep_millisecond": 10}' \
localhost:9999

gevent模式下的Python gRPC服务进行压测,其中c是指并发数,n为最大请求数,而-d是请求的参数,里面的sleep_millisecond代表服务端收到请求后会休眠10毫秒来模拟IO处理,他们的压测结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 多线程模式
Summary:
Count: 10000
Total: 2.67 s
Slowest: 45.01 ms
Fastest: 12.04 ms
Average: 26.34 ms
Requests/sec: 3746.87
# gevent兼容模式
Summary:
Count: 10000
Total: 5.23 s
Slowest: 69.31 ms
Fastest: 13.43 ms
Average: 51.83 ms
Requests/sec: 1913.44

通过压测结果可以发现,gevent兼容模式的性能只有多线程模式下的一半(以Requests/sec对比),会有这样的结果是因为Python gRPC的异步请求是在多线程上执行,同时Python gRPC是一个基于C的库,它可以绕过PythonGIL,从而获得了CPU多核的并发能力,而在启用了gevent兼容模式后gevent可能会导致多线程无法绕过PythonGIL,从而失去了上面提到的并发性。

此外Python gRPC目前只做到了兼容gevent的运行,通过commands.pyTestGevent的注释可以看出Python gRPC尚未对gevent模式的性能做出优化,所以建议不要在Python gRPC的服务端启用gevent兼容模式,在Python gRPC的客户端可以根据实际情况决定是否启用(比如在gunicorn + gevent + Flask运行的服务中的Python gRPC客户端)。

通过下列文件可以看出gevent兼容模式是把线程池替换为gevent的线程池:

3.Asyncio

Python gRPC一开始是不支持在Asyncio中运行的,所以社区出现了一个名为grpclib的库来解决这一问题,而官方的Python gRPC在后面的迭代后诞生了grpc.aio模块以支持grpcAsyncio中运行。

3.1.grpclib

grpclib这个库提供了纯Python的实现,这意味着能按自己的需求对它进行拓展,可塑性强。
所以诞生了基于grpclibpure-protobuf封装的另一个库–python-betterproto 。该库可以使开发者编写grpc代码更加的方便,比如helloworld生成对应的Message代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@dataclass(eq=False, repr=False)
class HelloRequest(betterproto.Message):
"""The request message containing the user's name."""

name: str = betterproto.string_field(1)
sleep_millisecond: int = betterproto.int32_field(2)


@dataclass(eq=False, repr=False)
class HelloReply(betterproto.Message):
"""The response message containing the greetings"""

message: str = betterproto.string_field(1)

可以看到这段代码打可读性比起官方的Python gRPC要高很多,这样开发者在查阅message的用法时会非常的方便,而客户端和服务端的用法则与官方的Python gRPC类似,比如客户端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import logging

from grpclib.client import Channel

from grpc_asyncio_example.protos import helloworld


async def client() -> None:
channel: Channel = Channel(host="localhost", port=9999)
response: helloworld.HelloReply = await helloworld.GreeterStub(channel).say_hello(
hello_request=helloworld.HelloRequest(name="you")
)
logging.info(response)

# don't forget to close the channel when done!
channel.close()


if __name__ == "__main__":
asyncio.run(client())

可以看出与多线程的Python gRPC代码类似,而服务端的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

from grpclib.server import Server

from grpc_asyncio_example.protos import helloworld


class HelloWorldService(helloworld.GreeterBase):
async def say_hello(
self, request: "helloworld.HelloRequest"
) -> "helloworld.HelloReply":
await asyncio.sleep(request.sleep_millisecond / 1000)
return helloworld.HelloReply(message="Hello, %s!" % request.name)


async def serve() -> Server:
server: Server = Server([HelloWorldService()])
await server.start("localhost", 9999)
return server


if __name__ == "__main__":

async def main() -> None:
await (await serve()).wait_closed()

asyncio.run(main())

可以看出服务端的代码与多线程的Python gRPC也类似,但是缺少Context的参数,如果跳进去helloworld.GretterBase查看源码:

1
2
3
4
5
6
7
8
9
10
class GreeterBase(ServiceBase):
async def say_hello(self, hello_request: "HelloRequest") -> "HelloReply":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

async def __rpc_say_hello(
self, stream: "grpclib.server.Stream[HelloRequest, HelloReply]"
) -> None:
request = await stream.recv_message()
response = await self.say_hello(request)
await stream.send_message(response)

则可以发现在__rpc_say_hellp方法中的stream对象是类似于官方Python gRPC中的Context,这意味着如果要使用Context方法时需要自己新增一个方法去处理,好在该库是纯Python编写的,grpclib对应客户端的Channel和服务端对应的Server代码也封装得很好,不像官方Python gRPC一样跳进源代码后就不知道它的代码在哪里,所以处理起来比较方便。

不过它也因此拥有了纯Python编写库的缺点–性能较低, 在经过与gevent一样的命令进行压测后得出如下数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# grpclib
Summary:
Count: 10000
Total: 6.37 s
Slowest: 96.38 ms
Fastest: 31.54 ms
Average: 63.33 ms
Requests/sec: 1571.05

# grpclib && uvloop
Summary:
Count: 10000
Total: 5.44 s
Slowest: 90.97 ms
Fastest: 38.18 ms
Average: 53.13 ms
Requests/sec: 1839.73

通过数据可以发现,betterproto的性能基本是多线程模式性能的3成,而这部分性能的差距主要来自于对HTTP2的解析,如果能把解析HTTP2的实现进行替换为高性能的版本,那么它的性能就能够接近Python gRPC官方实现的grpc.aio了。

此外,grpclib的功能比官方Python gRPC比较少,需要开发能力比较强以及对gRPC了解得比较深的开发者对其进行二次开发,才能使它发挥更好的作用。

3.2.grpc.aio

官方的Python gRPC现在通过grpc.aio模块对Asyncio提供了支持,它的用法与线程版本一致,只是涉及到IO相关的模块是存在于grpc.aio包中,对于使用者来说,需要把用到的grpc.XXX模块需要变为grpc.aio.XXX模块,同时要确保调用代码一处异步,处处异步,比如客户端的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import logging

import grpc
from grpc_asyncio_example.protos import helloworld_pb2, helloworld_pb2_grpc


async def client(target: str = "localhost:9999") -> None:
async with grpc.aio.insecure_channel(target) as channel:
stub: helloworld_pb2_grpc.GreeterStub = helloworld_pb2_grpc.GreeterStub(channel)
response: helloworld_pb2.HelloRequest = await stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
logging.info("Greeter client received: " + response.message)


if __name__ == '__main__':
logging.basicConfig()
asyncio.run(client())

通过代码可以看出初始化Channel的代码不是grpc.insecure_channel而是grpc.aio.insecure_channel,而且在helloworld_pb2_grpc.GreeterStub没有发生变化的情况下,调用stub.SayHello时仍然需要加上await语法,这是因为Python gRPCStub的设计模式类似于sans-io,是否要添加await语法则是取决于核心IO处理的实现,在这里处理IO的是Channel,所以开发者在调用stub.SayHello时最终会由stubchannel来发送请求,而channel发送请求是一个异步IO实现,所以需要添加await语法。

而对于服务端的改动也是类似的,对应的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import logging

import grpc
from grpc_asyncio_example.protos import helloworld_pb2, helloworld_pb2_grpc


class Greeter(helloworld_pb2_grpc.GreeterServicer):

async def SayHello(
self, request: helloworld_pb2.HelloRequest, context: grpc.aio.ServicerContext
) -> helloworld_pb2.HelloReply:
await asyncio.sleep(request.sleep_millisecond / 1000)
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)


async def serve(target: str = "localhost:9999") -> grpc.aio.Server:
server: grpc.aio.Server = grpc.aio.server()
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port(target)
logging.info("Starting server on %s", target)
await server.start()
return server


if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)

async def main() -> None:
await (await serve()).wait_for_termination()

asyncio.run(main())

可以看到代码中IO相关语法都需要添加await语句,同时SayHello方法添加了async语法,这样就可以在SayHello函数里面编写await语句了。
需要注意的是,由于Python gRPC是通过ServicerContext来传输数据的,导致ServicerContext中有些方法是与IO相关的,所以这里用到的ServicerContextgrpc.aio.ServicerContext

对于IO相关的用法可能有一些区别,但是对于其他跟IO无关的功能,则保持跟多线程模式的一致,比如客户端在启用时填写的参数如下:

1
2
3
4
5
with grpc.insecure_channel(target='localhost:50051',
options=[('grpc.lb_policy_name', 'pick_first'),
('grpc.enable_retries', 0),
('grpc.keepalive_timeout_ms', 10000)
]) as channel:

而在grpc.aio中代码如下:

1
2
3
4
5
async with grpc.aio.insecure_channel(target='localhost:50051',
options=[('grpc.lb_policy_name', 'pick_first'),
('grpc.enable_retries', 0),
('grpc.keepalive_timeout_ms', 10000)
]) as channel:

可以看到填写的参数Key名和值都是一样的,没有发生变化。

这里只是以一些常见的功能举例,简述不同模式Python gRPC的使用,详细的grpc.aio功能见grpc_asyncio文档,如果熟悉多线程模式下Python gRPCPython Asyncio的用法,那么上手grpc.aio是非常容易的。

3.2.1.grpc.aio的事件循环

Python gRPCgrpc.aio中使用到了asyncio的事件循环,所以可以通过uvloop来提高grpc.aio的性能,使用uvloop非常简单,只要在代码最前面添加如下代码:

1
2
import uvloop
uvloop.install()

就可以使程序获得uvloop带来的性能加成,在对使用asyncio.loopuvloop.loop的服务端代码进行压测后结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# asyncio.loop
Summary:
Count: 10000
Total: 2.15 s
Slowest: 30.56 ms
Fastest: 11.09 ms
Average: 21.19 ms
Requests/sec: 4643.73

# uvloop.loop
Summary:
Count: 10000
Total: 1.45 s
Slowest: 25.14 ms
Fastest: 9.41 ms
Average: 14.08 ms
Requests/sec: 6885.81

可以发现grpc.aio的性能比多线程模式稍好一些,同时在使用了uvloop后性能提升了50%(以Requests/sec对比)。

不过在与用uvicorn运行的服务中使用gRPC客户端时,需要确保在uvicorn启动后才初始化grpc.aio.Channel,因为在初始化grpc.aio.Channel时会获取到一个事件循环,如果当前没有事件循环时则会自动创建一个事件循环,而uvicorn在启动是会单独创建一个新的事件循环,这会导致grpc.aio.Channeluvicorn不在同一个事件循环运行导致运行出错。不过这个问题也很好解决,解决代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from typing import Any
import grpc
from starlette.applications import Starlette
from grpc_asyncio_example.protos import helloworld_pb2, helloworld_pb2_grpc


def create_app() -> Starlette:
app: Starlette = Starlette()

async def _before_server_start(*_: Any) -> None:
app.state.channel = grpc.aio.insecure_channel("localhost:9999")

async def _after_server_stop(*_: Any) -> None:
await app.state.channel.close()

app.add_event_handler("startup", _before_server_start)
app.add_event_handler("shutdown", _after_server_stop)
return app


if __name__ == "__main__":

import uvicorn # type: ignore
starlette_app: Starlette = create_app()
uvicorn.run(starlette_app)

在所示的代码中,grpc.aio.channelstartup事件时才进行初始化,在shutdown事件关闭,由于触发startup事件时,uvicorn已经初始化了一个事件循环,所以初始化grpc.aio.Channel时获取到的事件循环与uvicorn一致。

4.总结

如果为了能跟上社区的脚步,以及为了跨语言能够很好的合作,建议还是根据服务所在场景来使用多线程默认的gRPC或者grpc.aio,这是最好的选择;
对于类似gunicorn + gevnet + 类似于Flask框架的服务通过gRPC客户端发送请求时可以使用gevent兼容模式;
对于开发能力比较强,本身也是追求Pythonic的开发者且是基于Asyncio生态编写服务以及有比较强的定制需求的,则可以考虑使用betterproto

查看评论