Python 3.11 Asyncio新增的两个高级类

本文总阅读量

前记

Python Asyncio提供了很多基础的API以及对应的对象,如果只用于编写简单的HTTP API处理函数,那么这些Python Asyncio是足够的,但在面对一些复杂点多的需求或者编写网络相关框架时,就需要基于Python Asyncio的基础API封装成高级对象。目前比较常用的高级对象有两个,一个是用于管理代码域超时的timeout以及一个用于结构化并发的TaskGroup,它们最先出现在Trio这个协程库中,后来Anyio库也支持这两个对象,现在,准备发布Python 3.11中Asyncio库也包括这两个功能。

注: 正常情况下,调用经过封装的高级对象的耗时肯定会大于直接调用基础API的耗时,但是高级对象能使代码结构更加优美。比如starlette框架在集成anyio后,性能降低了4.5%,具体见:https://github.com/encode/starlette/pull/1157

1.人性化的超时

通常情况下,我们的代码调用结果只有成功或者是失败,但是对于客户端的网络调用来说还存在另外一种情况,就是网络调用可能会永远挂起,不会响应成功或者失败,然后就一直占用着文件描述符等系统资源。所以大多数的客户端都会实现超时机制来解决这个问题,但是客户端支持的超时API都是只针对自己的对应调用,比如httpx这个库,它的对应调用如下:

1
2
3
4
5
# 使用get方法请求, 超时时间为9秒
import asyncio
import httpx

asyncio.run(httpx.AsyncClient().get(url="http://so1n.me", timeout=9))

这个调用会请求到http://so1n.me,然后等待响应,如果该网站超过9秒仍未返回响应或者由于网络原因导致该调用没有返回响应,那么就会抛出一个超时错误。
这种设计的非常OK的,使用起来非常简单,但如果现在要求的更改为在9秒内请求两次http://so1n.me后还按照上面的写法,就会变得很糟糕,代码如下:

1
2
3
4
5
6
7
import asyncio
import httpx


async def demo() -> None:
await httpx.AsyncClient().get(url="http://so1n.me", timeout=9)
await httpx.AsyncClient().get(url="http://so1n.me", timeout=9)

这种情况下假设该方法的每个请求时长为8秒, 那么他的总请求时长为16秒, 已经超出总的超时时长为9秒的要求的, 但每个请求都没有触发超时机制,所以并不会抛出异常。
不过这时我们可以换个思路, 因为超时的原本意思是在n秒后中断此次请求, 也就是在某个时刻时终止请求, 那么我们只要在调用时计算出距离超时时刻还有多少时间差,并设置到timeout参数中,就可以使demo调用符合我们的要求了,代码改写后如下:

1
2
3
4
5
6
7
8
import asyncio
import httpx


async def demo(timeout: int = 9) -> None:
deadline: float = time.time() + 9
await httpx.AsyncClient().get(url="http://so1n.me", timeout=time.time() - deadline)
await httpx.AsyncClient().get(url="http://so1n.me", timeout=time.time() - deadline)

这段代码可以完美的工作, 假设第一个请求的时长为5秒, 那么第二次请求的超时参数的值会是4秒, 这是非常ok, 代码也依然保持简单。 不过目前还是有个缺点, 就是每次都计算一次超时时间, 然后再显示传进去, 这个超时是不可传递的, 如果有一个抽象能方便的使用, 那是非常好的,比如像使用wait_for后的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import httpx
import time


async def sub_demo() -> None:
await httpx.AsyncClient().get(url="http://so1n.me")
await httpx.AsyncClient().get(url="http://so1n.me")


async def demo() -> None:
await asyncio.wait_for(sub_demo(), timeout=9)


asyncio.run(demo())

这段代码通过wait_for使一个函数内的调用共享一个截止时间,当抵达截止时间时, 无论执行当前已经执行到哪个函数, 都会触发超时异常。不过这样的实现会差点意思, 因为每有一个共享截止时间的代码范围, 就需要把对应的逻辑独立出来成一个新的函数, 这样的代码不是特别的优雅, 而且当需要传的参数比较多时, 这简直就是灾难了(当然也可以写成闭包的形式)。

好在Python通过with语句提供了一个代码范围的管理,所以我们可以尝试通过with语句来管理这片代码范围的执行超时,那该如何实现呢?熟悉with语句的开发者都知道,with语句实际上是一个带有__enter__方法和__exit__方法的类,这两个方法分别提供了进入代码范围和退出代码范围的调用,对于超时这个需求在结合with语句后,只需要在进入代码范围初始化一个计时器,退出时关闭计时器,如果计时器数完(也就是超时了)且尚未被退出逻辑关闭,则会引发超时,并取消代码范围的协程,大概的伪逻辑如下:

1
2
3
4
5
6
async def demo() -> None:
# 该代码只为了演示逻辑,实际上无法正常运行
timer = Timer(9)
await httpx.AsyncClient().get(url="http://so1n.me")
await httpx.AsyncClient().get(url="http://so1n.me")
timer.close()

通过伪代码逻辑可以看出两个运行的协程并跟timer并没有任何联系,timer无法管理到这两个协程的,所以timer超时时,两个协程还能正常运行,那该如何与他们建立联系呢?文章《Python的可等待对象在Asyncio的作用》中讲到在一个协程函数中通过await执行的子协程,是交给执行该协程函数对应的task对象管理的,也就是我们对执行协程函数的task对象进行的任何操作都是会传播到对应的子协程的,所以我们只要在进入代码范围时捕获到当前的task,然后通过loop.call_at方法在指定时间调用task.cancel取消task对象,并由task对象传播到被调用的子协程,如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import httpx


async def demo() -> None:
current_task: asyncio.Task = asyncio.Task.current_task()
loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
# 设置9秒后超时
timer: asyncio.events.TimerHandle = loop.call_at(loop.time() + 9, lambda: current_task.cancel())持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第2天,点击查看活动详情


await httpx.AsyncClient().get(url="http://so1n.me")
await httpx.AsyncClient().get(url="http://so1n.me")
if not timer.cancelled():
timer.cancel()

这段是可以正常运行的,接下来我们就需要把这段逻辑封装到一个类里面,这样调用者只需要简单的调用就可以实现整块代码域的超时管理,对应的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 这是一个简化版本的伪代码, 存在一些逻辑漏洞, 但是都包含了主要流程了,

import asyncio
from typing import Optional, Type
from types import TracebackType


class Deadline(object):
def __init__(
self,
delay: Optional[float],
loop: Optional[asyncio.AbstractEventLoop] = None,
timeout_exc: Optional[Exception] = None,
):
# 代表多少秒后超时
self._delay: Optional[float] = delay
# asyncio需要的事件循环
self._loop = loop or asyncio.get_event_loop()
# 当超时时,如何抛异常
self._timeout_exc: Exception = timeout_exc or asyncio.TimeoutError()

# 控制结束的future
self._deadline_future: asyncio.Future = asyncio.Future()
# 注册with语句捕获的future
self._with_scope_future: Optional[asyncio.Future] = None
if self._delay is not None:
# 计算截止时间和注册截止时间回调,通知event loop在截止时间执行超时机制
self._loop.call_at(self._loop.time() + self._delay, self._set_deadline_future_result)

def _set_deadline_future_result(self) -> None:
# 当到截止时间时, 设置执行结束, 并对还在执行的with future进行cancel操作
self._deadline_future.set_result(True)
if self._with_scope_future and not self._with_scope_future.cancelled():
self._with_scope_future.cancel()

def __enter__(self) -> "Deadline":
# 进入with语句范围
if self._with_scope_future:
# 一个实例同时只能调用一次, 多次调用会出错
raise RuntimeError("`with` can only be called once")
if self._delay is not None:
# 启动了超时机制

# 获取当前运行的task
main_task: Optional[asyncio.Task] = asyncio.Task.current_task(self._loop)
if not main_task:
raise RuntimeError("Can not found current task")
# 注册with语句所在的future
self._with_scope_future = main_task
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> Optional[bool]:
# 由于执行完成或者是异常退出with语句范围
if self._with_scope_future:
self._with_scope_future = None
else:
return None

if self._deadline_future.done():
# 如果控制结束的future已经结束, 代表此次with语句范围的代码执行超时了
raise self._timeout_exc
else:
return None

现在超时类编写完成,它的使用方法如下:

1
2
3
4
5
6
7
8
9
10
import asyncio


async def demo() -> None:
with Deadline(delay=9):
await httpx.AsyncClient().get(url="http://so1n.me")
await httpx.AsyncClient().get(url="http://so1n.me")


asyncio.run(demo())

可以看到,这样的使用方法非常方便,不过这个功能在Python3.11已经提供了,可以通过https://github.com/python/cpython/blob/v3.11.0b3/Lib/asyncio/timeouts.py了解Python提供的timeout实现。

2.结构化并发

结构化并发借鉴了结构化编程这一名词,它的作用就是确保调用者进行了一个调用后还能控制这个调用过程,或者是得到调用结果,具体的结构化并发描述见Notes on structured concurrency, or: Go statement considered harmful或者译文【译】「结构化并发」简析,或:有害的go语句

在使用Python Asyncio编写代码时,会为了提高并发能力而通过asyncio.create创建很多Task运行,这种情况下可能导致调用者无法得到协程的运行结果,比如一个服务端为了提高并发能力,在接收到请求时通常都会分发给其它协程去处理,这时就可能导致代码不属于结构化并发, 下面通过一个生产消费者来模拟这一个行为,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import random


async def request_handle(data):
# 处理请求
print(data)
await asyncio.sleep(1)


async def recv_request(queue: asyncio.Queue):
while True:
# 接收请求
data = await queue.get()
# 分发给其它协程处理
asyncio.create_task(request_handle(data))


async def send_request(queue: asyncio.Queue):
# 发送请求
while True:
await queue.put(random.randint(0, 100))
await asyncio.sleep(0.01)


async def main():
queue: asyncio.Queue = asyncio.Queue()
asyncio.create_task(recv_request(queue))
await (send_request(queue))

asyncio.run(main())

这段代码首先是通过asyncio.create_task创建一个发送者在后台运行着,然后通过await等待send_request调用运行结束,不过send_request是不会结束的,它会一直运行下去,并且每隔0.01秒就会发送一个数据到queue里面。同时在后台运行的recv_request就会从queue获取到数据,并且为了不阻塞自己的处理逻辑,会通过create_task创建一个请求处理者来处理这个请求。

这段程序可以一直运行着,但是调用者不知道后台运行的任务是否一直在正常的运行着,而且可能需要他们在运行出错时捕获到对应的错误,并把错误抛出来,于是需要对main函数进行一些改造:

1
2
3
4
5
async def main():
queue: asyncio.Queue = asyncio.Queue()
recv_coro = recv_request(queue)
send_coro = send_request(queue)
await asyncio.gather(recv_coro, send_coro)

这样就能捕获到发送消息的协程和接收消息的协程的异常,并把错误抛出来了,不过对于接收消息并分发给其它协程这段逻辑却无法通过asyncio.gather来管理,因为该逻辑是收到消息就会创建一个协程来处理的,这是实时创建的,而asyncio.gather只能管理已经创建的Corotinue。

如果有一个类,可以像timeout管理这个作用域的所有派生出来的协程,捕获派送协程的异常,那就很棒了。而在Python3.11或者是Anyio中可以通过TaskGroup解决这个问题,在使用TaskGroup后,recv_request代码改写为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import random


async def request_handle(data):
# 处理请求
print(data)
await asyncio.sleep(1)


async def recv_request(queue: asyncio.Queue):
async with asyncio.task_group.TaskGroup() as tg:
while True:
# 接收请求
data = await queue.get()
# 分发给其它协程处理
tg.create_task(request_handle(data))

可以看到这个代码改动不大,首先是通过asyncio.task_group.TaskGroup创建一个对象并开启一个代码域,然后通过tg这个对象的create_task方法派生一个协程来处理数据,这个用法跟asyncio.create_task很像,但是通过tg.create_task创建的协程是会被tg管理的。
这时,如果request_handle对应的协程抛出来异常,tg对象也会退出并抛出对应的异常,同时这个代码域执行完毕后,也不会退出这片代码域,而是需要等所有通过tg.create_task创建的协程执行完毕后才会退出。

通过上面的timeout可以猜到TaskGroup也是在__aenter__时获取当前task对象并在后续使用着,现在通过taskgroups.py了解TaskGroup是如何执行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from asycnio import events
from asycnio import exceptions
from asycnio import tasks


class TaskGroup:

def __init__(self):
self._entered = False
self._exiting = False
self._aborting = False
self._loop = None
self._parent_task = None
self._parent_cancel_requested = False
self._tasks = set()
self._errors = []
self._base_error = None
self._on_completed_fut = None

async def __aenter__(self):
# 限制只能调用一次
if self._entered:
raise RuntimeError(
f"TaskGroup {self!r} has been already entered")
self._entered = True

if self._loop is None:
self._loop = events.get_running_loop()

# 获取当前的task
self._parent_task = tasks.current_task(self._loop)
if self._parent_task is None:
raise RuntimeError(
f'TaskGroup {self!r} cannot determine the parent task')

return self

async def __aexit__(self, et, exc, tb):
self._exiting = True
propagate_cancellation_error = None

if (exc is not None and
self._is_base_error(exc) and
self._base_error is None):
self._base_error = exc

if et is not None:
if et is exceptions.CancelledError:
if self._parent_cancel_requested and not self._parent_task.uncancel():
# Do nothing, i.e. swallow the error.
pass
else:
# 如果有一个协程已经取消了,就设置取消的exc
propagate_cancellation_error = exc

if not self._aborting:
# 取消所有的task
self._abort()

# 如果还有派生的协程来运行,就陷在这个逻辑中
while self._tasks:
if self._on_completed_fut is None:
self._on_completed_fut = self._loop.create_future()

try:
# 创建一个中间future来捕获所有派生协程的异常,并等待协程运行完毕
await self._on_completed_fut
except exceptions.CancelledError as ex:
# TaskGroup不会使_on_completed_fut抛出取消异常,但是如果main_task被取消时,会传播到_on_completed_fut
if not self._aborting:
# 与上面一样设置错误,并取消所有协程
propagate_cancellation_error = ex
self._abort()

self._on_completed_fut = None

assert not self._tasks

# 如果有异常,则抛出
if self._base_error is not None:
raise self._base_error

if propagate_cancellation_error is not None:
raise propagate_cancellation_error

if et is not None and et is not exceptions.CancelledError:
self._errors.append(exc)

# 抛出所有运行期间的异常
if self._errors:
errors = self._errors
self._errors = None

me = BaseExceptionGroup('unhandled errors in a TaskGroup', errors)
raise me from None

def create_task(self, coro, *, name=None, context=None):
# 判断目前是否生效,如果不生效就无法派生协程
if not self._entered:
raise RuntimeError(f"TaskGroup {self!r} has not been entered")
if self._exiting and not self._tasks:
raise RuntimeError(f"TaskGroup {self!r} is finished")
# 通过事件循环创建协程
if context is None:
task = self._loop.create_task(coro)
else:
task = self._loop.create_task(coro, context=context)
tasks._set_task_name(task, name)
# 添加task执行结果回调
task.add_done_callback(self._on_task_done)
# 把task添加到对应的self._task,这样其它方法就会判断协程是否运行完毕了
self._tasks.add(task)
return task

def _is_base_error(self, exc: BaseException) -> bool:
assert isinstance(exc, BaseException)
return isinstance(exc, (SystemExit, KeyboardInterrupt))

def _abort(self):
self._aborting = True

# 取消所有派生的协程
for t in self._tasks:
if not t.done():
t.cancel()

def _on_task_done(self, task):
# 安全的删除对应的task
self._tasks.discard(task)

if self._on_completed_fut is not None and not self._tasks:
if not self._on_completed_fut.done():
# 如果最后一个派生的协程运行结束,则设置中间future,这样TaskGroup.__aexit__的while循环就能继续执行了
self._on_completed_fut.set_result(True)

# 如果task已经取消或者没有异常,则不走下面的逻辑
if task.cancelled():
return
exc = task.exception()
if exc is None:
return

# 把异常添加到类中
self._errors.append(exc)
if self._is_base_error(exc) and self._base_error is None:
self._base_error = exc

# 最后处理下当前task
if self._parent_task.done():
# Not sure if this case is possible, but we want to handle
# it anyways.
self._loop.call_exception_handler({
'message': f'Task {task!r} has errored out but its parent '
f'task {self._parent_task} is already completed',
'exception': exc,
'task': task,
})
return
if not self._aborting and not self._parent_cancel_requested:
self._abort()
self._parent_cancel_requested = True
self._parent_task.cancel()

3.总结

可以看到,这两个功能都是通过task把我们的调用向子协程进行传播,这样一来就可以通过task方便的控制对应的协程,但是这也有一个缺点,就是一处函数为async,则处处函数都是async(对于需要IO调用的函数来说),而Go语言的协程就没有这种担忧,但是Go语言创建的协程是无法被管理的,除非创建协程的时候把Context对象传进去,并在对应的协程中通过channel来捕获Context对象的方法,这就要求开发Go库的开发者需要有良好的开发能力,能考虑到使用者在调用时是否需要考虑到超时,结构化并发等需求。

查看评论