Python Asyncio 库之从ChatGPT Bug了解Cancel机制

本文总阅读量

前记

最近几天,在使用ChatGPT时会发现无法使用历史记录功能。而在3月24号时,OpenAI公布了这次问题是由于某个Bug导致,导致部分用户能获得到其他用户的信息,而这一切是因为redis-py开发者没有考虑到Asyncio Cancel的问题。而Cancel有两个问题,一个是使用Cancel的问题,另外一个是没有使用Cancel的问题。

注:

  • 1:下文说的取消这个名词代表着Asyncio Cancel
  • 2:了解Cancel机制之前,需要了解Task对象的执行原理,可以通过可等待对象的原理了解执行原理。
  • Asyncio中,每个协程的实体都是Asyncio.Task对象,或者说是被Asyncio.Task托管,所以本文以Asyncio.Task对象代称协程。

1.ChatGPT遇到的问题

本次ChatGPT问题导致了部分用户能获取到另一个用户的数据,这是一个非常严重的问题。
根据OpenAI披露问题细节可以知道,ChatGPT使用Redis缓存用户的信息,而后台服务是基于Asyncio运行的,所以使用的是redis-py中的AsyncRedis与他们的Redis集群进行交互的。

AsyncRedis代表着是redis-py中的redis.asyncio.client中的Redis类,下同

在大部分情况下(包括测试),这种使用方式是没问题的,但是在他们对服务进行修改后导致Redis请求的取消数量激增,而取消是通过asyncioCancel实现的,正常情况下,在遇到Cancel后可以捕获取消异常再进行重试或者直接抛出异常。ChatGPT在业务层也是这样实现的,所以ChatGPT的代码并没有问题,真正有问题的是ChatGPT使用到的redis-py库中的AsyncRedis

AsyncRedis在ChatGPT服务器和Redis集群之间维护一个共享连接池,这些连接池存放着很多个TCP连接,TCP连接拥有请求队列和响应队列,在通过TCP发送数据时,实际上是把数据投递到请求队列中,再由操作系统把数据发送到目标服务器,而在接收到服务器数据后,会把数据存放到响应队列中,等待程序获取。
而ChatGPT服务每次与Redis交互时,都会从Redis连接池获取一个连接,并将请求推入到连接中的请求队列中,接着再从连接的响应队列获取数据并返回。
但是,如果交互A成功把请求推入到连接后,当前的Asyncio.Task对象由于意外被取消了,那么这时候ChatGPT服务只收到一个取消异常,而AsyncRedis会回收当前连接,并给交互B使用。而交互B在发送请求前,连接的响应队列恰好收到了Redis服务返回的数据,那么交互B在发送请求后就会从响应队列得到了交互A的响应数据了,更糟糕的是由于大部分的请求都是一个发送对应一个接收到,这就意味着后续交互获得的响应都是上一个交互请求的数据了。

ChatGPT问题细节并不本文重点,有兴趣的可以 访问OpenAI披露问题细节。同时应该知道,保护用户数据是后端开发者的主要职责之一,我们不止要校验请求的参数,还需要对返回的隐私数据进行校验,而不是单纯的依赖在使用的工具和服务。

2.重现并找出问题

通过OpenAI披露问题细节可以知道,这就是类似ORM的那种由于封装导致程序行为与真实结果不一致问题,开发者以为Asyncio.Task对象被取消了所以收不到数据,但真正的结果是Asyncio.Task对象被取消了,但是底层的数据结构却还能正常收到数据。

然而ChatGPT只是公布了大概的原因,并没有公布自己出现的代码是怎么实现的,在搜索了一圈后,发现redis-pyl的issue-2624中有可以重现该问题的代码,不过为了确保能100%重现这个问题,首先需要在本机安装Redis服务(如果不是使用本机的Redis服务,那么不一定能100%重现),然后运行issue-2624的代码(后续提到的示例代码都代表是本代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 对应的redis-py版本为4.5.2(新推出的修复版本4.5.3仍然有该问题,4.5.3只修复了PIPE)
import asyncio
from redis.asyncio import Redis


async def main():
async with Redis(single_connection_client=True) as r:

await r.set('foo', 'foo')
await r.set('bar', 'bar')

t = asyncio.create_task(r.get('foo'))
await asyncio.sleep(0) # 模拟IO
t.cancel()
try:
await t
print('try again, we did not cancel the task in time')
except asyncio.CancelledError as e:
print('managed to cancel the task, connection is left open with unread response')

print('bar:', await r.get('bar'))
print('ping:', await r.ping())
print('foo:', await r.get('foo'))


if __name__ == '__main__':
asyncio.run(main())

在运行该代码后可以得到如下输出:

1
2
3
4
managed to cancel the task, connection is left open with unread response
bar: b'foo'
ping: False
foo: b'PONG'

通过输出结果可以发现,第一个交互是从Redis获取键值为foo的值,但是在被显式的执行取消操作后,后续的交互获取的值都是上一次交互想要的值,这是非常糟糕的,为了找到问题的原因,需要深入到AsyncRedis的源码中查找。

在用户发起命令setget中涉及到的核心调用都是只有execute_command方法,它对应的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# redis.asyncio.client
class Redis:
async def execute_command(self, *args, **options):
"""Execute a command and return a parsed response"""
await self.initialize()
pool = self.connection_pool
command_name = args[0]
conn = self.connection or await pool.get_connection(command_name, **options)

if self.single_connection_client:
await self._single_conn_lock.acquire()
try:
return await conn.retry.call_with_retry(
lambda: self._send_command_parse_response(
conn, command_name, *args, **options
),
lambda error: self._disconnect_raise(conn, error),
)
finally:
if self.single_connection_client:
self._single_conn_lock.release()
if not self.connection:
await pool.release(conn)

async def _send_command_parse_response(self, conn, command_name, *args, **options):
await conn.send_command(*args)
return await self.parse_response(conn, command_name, **options)

代码中的execute_command方法对应的就是从连接池获取连接并通过_send_command_parse_response方法把请求数据发送到连接中,然后等待连接返回的数据并返回,最后则是释放连接的占用,使连接回到连接池等待被下次调用。
可以发现真正与Redis发生交互的是send_commanad_parse_response方法中的send_commandparse_response两个IO操作方法。此外,由于通过复现问题的代码输出可以发现在显式调用取消后,下一个交互得到的输出永远为上一个交互得到的结果,这也就意味着send_command这个方法是执行成功的,所以取消操作真正命中的是self.parse_response方法。

如果重新问题代码中的asyncio.sleep(0)被移除,则不会有问题发生,这是因为取消操作命中的是send_command方法

如果了解了取消机制的原理(后文会有详细的描述),那么可以知道在对示例代码中的t调用cancel方法执行取消操作的时候,会取消正在等待的parse_response方法,然后再沿着堆栈向上抛出异常。然而execute_commanad中的名为call_with_retry的重试方法只针对连接异常和超时异常进行重试,所以call_with_retry在收到取消异常时会继续往上抛,这时execute_command方法就会回收连接,并把异常向上抛到调用AsyncRedis的业务层中供用户使用。

由于Cancel可以向上堆栈传递取消异常,于是业务层,AsyncRedis层能收到取消异常,并最终由t这个Asyncio.Task对象捕获到了取消异常,至此当前协程就已经被取消了,可以认为程序是没有什么问题,取消操作也完成了自己的任务。
不过AsyncRedis中的连接并未被关闭且响应队列的数据写入操作是系统决定的,而不是当前t对象管理的(可以认为是另外一个Asyncio.Task对象在管理的),这意味着针对t对象的取消操作并不会影响到响应队列的写入操作,响应队列仍然还能接收另一端发送过来的数据,这下问题就出现了,由于Redis协议是按照一进一出来处理数据的(仅限于当前示例代码的Redis命令),这意味着必须每次发送一个请求后都要读取一次请求,但是取消操作却把某一次读取数据的操作给取消了,这就导致了后续取的数据永远是上一个。

但是这个例子涉及到系统调用,比较难理解具体的逻辑,于是把实现进行一下转换,把示例代码全转换为纯Python实现,不引入其他的库,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import asyncio


async def main():
# 针对于worker来说是只写的
write_queue = asyncio.Queue()
# 针对于worker来说是只读的
read_queue = asyncio.Queue()

async def porter():
"""搬运工负责把write_queue中的数据写入到read_queue中"""
while True:
item = await write_queue.get()
await read_queue.put(item)
write_queue.task_done()

asyncio.create_task(porter())

async def worker(cnt):
"""模拟Redis的从写入队列写入数据,并从只读队列中读取数据"""
await write_queue.put(cnt)
print(f"worker:{cnt} get num :{await read_queue.get()}")

# 模拟取消操作
t = asyncio.create_task(worker(1))
await asyncio.sleep(0)
t.cancel()
try:
await t
print("no cancel")
except asyncio.CancelledError:
print("cancel")

await worker(2)
await worker(3)


if __name__ == '__main__':
asyncio.run(main())

这段代码中使用write_queueread_queue分别模拟连接的写入队列和读取队列,而porter则是一个搬运工,它负责把写入队列的数据转移到只读队列中,模拟redis-py客户端在通过连接发送数据后获取数据的功能,在运行代码后可以得到如下输出:

1
2
3
cancel
worker:2 get num :1
worker:3 get num :2

通过输出可以发现该程序的输出结果与issue-2624的输出结果类似–下一个发起请求的函数的响应结果实际上是上一个请求产生的。
而且通过这份代码可以明显的看出,负责转移数据的poeter函数是由asyncio.create_task启动,这代表他是由另一个Asyncio.Task对象管理的(在这个场景中可以认为poeterredis-py连接池中的某一个连接)。

这样一来,由于poeter对象与后面的t对象并不是同一个Asyncio.Task对象,而t对象的worker函数在往写入队列写入数据后,就被主动取消了,且取消这个操作只影响到了t这个对象,这意味着只取消了read_queue.get这个动作,而其他的协程/动作都还在正常的执行着,包括poeter对象。
所以poeter对象还是能正常的从write_queue队列中获取数据再推到read_queue队列中,这也就导致了worker(2)worker(3)虽然是正常执行的推入的是23,但是他们分别得到的值却是12

通过上述的分析可以看出,Asyncio的取消操作是属于某一对象的方法,这意味着取消操作只能取消并中断该方法对应的Asyncio.Task对象执行,不能取消其他Asyncio.Task对象的操作,此外,取消只能管理某个动作,不会去影响到动作相关的资源。
同时要知道的是由于Asyncio的机制是任何协程都能互相影响,也就是A协程可以取消B协程,这意味着开发者在业务代码对某个协程进行取消时,取消是可以被传递的。
现在再回到redis-py这个库本身,由于redis-py本身没有用到取消操作相关的语法,于是没有考虑到取消操作这件事,但是业务代码中针对某一Asyncio.Task对象执行取消操作时,取消这个动作影响会到了redis-py库的方法,最终导致这个问题出现。

Python.Asyncio的任何协程都能互相影响的机制解决了协程无法被管理导致的内存溢出等问题,但这也间接的导致了库的开发者需要注意到取消机制的存在。
而与之相反的是Go协程,由于它本身的特性,虽然B协程是A协程创造出来的,但是A协程没有办法影响到B协程,为了实现A协程能影响到B协程,需要显式的把context从A协程传到B协程中,然后由A协程通过context传递动作,B协程根据context的结果做出响应,如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"context"
"fmt"
"time"
)

func demo(ctx context.Context, quit chan int) {
for {
select {
case <-ctx.Done():
fmt.Printf("ctx.Done")
quit <- 1
case <-quit:
fmt.Printf("quit")
return
}
}
}

func main() {
rootContext := context.Background()
ch := make(chan int)
ctx, cancelFunc := context.WithCancel(rootContext)
go demo(ctx, ch)
cancelFunc()
time.Sleep(time.Second * 1)
}

在这个代码中main函数可以认为是协程A,而demo函数是协程B,在通过go demo启动协程B后,协程B就无法再被管理,如果要管理协程B,就必须像示例代码一样让demo函数支持外部传入context变量,同时内部会对context变量的状态进行监听并做出响应的操作。
这就意味着新启动的协程能支持协程的哪些功能,全由这个协程的实现者决定,这样即使redis-py没有考虑到取消的功能,也并不会被用户的业务代码影响到,但反过来用户却没办法使用到取消功能来影响redis-py的功能,除非开发者有意去添加取消功能。

通过对比发现,为了功能完备,无论使用Go还是Python都需要开发者去针对取消机制去进行处理,如果开发者没有去处理的话,在Go中是无法使用取消功能,而Python则是会引发一些问题,这种情况下Go的机制会好一些,不过对于结构化并发和超时等功能,Go都需要库开发者去显式的支持,而Python却不用,这一点我会觉得Python的实现更好一些,当然,哪种实现更好还是取决于使用者本人。

3.使用取消需要考虑的问题

通过ChatGPT的问题可以知道在使用Asyncio时没有考虑到取消这个场景时会导致一些问题的发生,然而在使用取消机制时也有很多问题需要考虑的。

3.1.取消无法取消已经完成的Asyncio.Task对象

还记得一开始的例子代码中的一段asyncio.sleep(0)的代码吗?这里是特意修改为0从而确保这段代码能触发Bug,如果休眠时间变长,则不一定能触发,为了能更好的理解,这里使用转换后的例子代码来演示,并把asyncio.sleep(0)改为asyncio.sleep(0.1),代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import asyncio


async def main():
# 针对于worker来说是只写的
write_queue = asyncio.Queue()
# 针对于worker来说是只读的
read_queue = asyncio.Queue()

async def porter():
"""搬运工负责把write_queue中的数据写入到read_queue中"""
while True:
item = await write_queue.get()
await read_queue.put(item)
write_queue.task_done()

asyncio.create_task(porter())

async def worker(cnt):
"""模拟Redis的从写入队列写入数据,并从只读队列中读取数据"""
await write_queue.put(cnt)
print(f"worker:{cnt} get num :{await read_queue.get()}")

# 模拟取消操作
t = asyncio.create_task(worker(1))
await asyncio.sleep(0.1)
t.cancel()
try:
await t
print("no cancel")
except asyncio.CancelledError:
print("cancel")

await worker(2)
await worker(3)


if __name__ == '__main__':
asyncio.run(main())

运行代码后就可以看到终端会有如下输出:

1
2
3
4
worker:1 get num :1
no cancel
worker:2 get num :2
worker:3 get num :3

通过输出结果可以发现,t对象并没有被取消,后面的worker对象执行也是正常的了。这是因为取消这个动作执行的时候t对象已经执行完毕了,而取消只能取消处于等待过程的Asyncio.Task对象,所以在这个例子中的取消操作是没有执行成功的。

为了能更好的了解这个机制,可以通过如下代码进行验证:

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio


async def main():
t = asyncio.create_task(asyncio.sleep(0))
print(t.done()) # False
await asyncio.sleep(0.1)
print(t.done()) # True


if __name__ == '__main__':
asyncio.run(main())

通过运行该代码后,可以发现第一次打印t.doneFalse,而第二次打印为True。其实在可等待对象的原理 中介绍了Task是协程的载体,而Task继承于Future,所以通过Future的源码也可以知道Cancel方法仅当协程为等待状态(PENDING)时生效,而且他的工作只是把协程状态改为CANCELLED状态(Asyncio.Task对象会复杂一点):

1
2
3
4
5
6
7
8
9
10
class Future:
... # 省略其他方法
def cancel(self, msg=None):
self.__log_traceback = False
if self._state != _PENDING:
return False
self._state = _CANCELLED
self._cancel_message = msg
self.__schedule_callbacks()
return True

3.2.取消是一个异步操作

取消是一个异步操作,这意味着取消这个操作可能需要一段时间才能执行完毕。

如果是第一次接触示例代码的话,那么一定会好奇为何一定要有asyncio.sleep(0)这个语句,才能百分百复现Bug?为了验证这个问题,先编写一个简单的示例,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio


async def main():
async def demo():
print("demo函数运行了!")
await asyncio.sleep(10)
t = asyncio.create_task(demo())
t.cancel()
print(t.cancelled())
try:
await t
except asyncio.CancelledError:
print("demo已经被取消了")
print(t.cancelled())

if __name__ == '__main__':
asyncio.run(main())

在运行该示例时可以发现有如下输出:

1
2
3
False
demo已经被取消了
True

可以发现demo函数的第一句就已经没有执行了, 这是因为没有asyncio.sleep(0)来让出控制权,事件循环就无法去调度t对象去启动,这时即使针对t对象执行了取消操作,但由于t对象没有运行,那么此时的操作只能是给t对象打个标记,告诉t对象已经是取消状态了,然后等到调用await t主动去激活t对象后,t对象就会开始运行,但是在启动后发现自己已经被取消了,就不会去执行demo函数,而是直接抛错了。

这个例子解释了加上asyncio.sleep(0)这个语句的必要性,但是示例代码中已经加上了asyncio.sleep(0)语句了,后面为啥还要使用await t呢?这是因为执行取消操作时会取消当前t对象正在执行的操作,然后逐步把异常向上抛,但是内部可以由开发者捕获取消异常后执行一些等待操作(如IO操作),此时t对象就无法马上收到取消异常,这意味着取消动作还未结束。
当IO操作执行完后,会继续向上抛异常,然后t对象在接收到取消异常后会转为完成状态,此时取消操作也就完成了。
如下是模拟取消操作被堵住的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def main():
async def demo():
try:
await asyncio.sleep(0)
except asyncio.CancelledError as e:
await asyncio.sleep(2)
raise e

t = asyncio.create_task(demo())
await asyncio.sleep(0)
t.cancel()
start_cancel_time = time.time()
print(t.cancelled())
await asyncio.sleep(0.5)
print(t.cancelled())
try:
await t
except asyncio.CancelledError as e:
print(t.cancelled())
print(f"取消操作的执行时间为:{time.time() - start_cancel_time}")

if __name__ == '__main__':
asyncio.run(main())

在执行完成的时候,控制台有如下输出:

1
2
3
4
False
False
True
取消操作的执行时间为:2.002300500869751

通过输出可以看到,第一次打印取消状态和休眠0.5秒后第二次打印的取消状态都是一样的,表示协程尚未被取消,但在执行await t后,就打印了协程已经被取消了,且可以发现从取消协程开始到结束所花费的时间正好是2秒左右,与demo函数捕获取消异常后休眠2秒一致。

3.3.取消异常的处理

从上面可以的分析可以知道,取消操作是取消某一段执行,然后向上抛异常,当Asyncio.Task捕获到异常后,会标记AsyncioTask对象的状态为取消。那如果向Asyncio.Task对象执行取消操作,但是取消的异常被屏蔽了,会如何处理呢?下面是一个基于3.1中示例代码修改的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def main():
async def demo():
try:
await asyncio.sleep(0)
except asyncio.CancelledError as e:
print("任务已经被取消了")
await asyncio.sleep(2)

t = asyncio.create_task(demo())
await asyncio.sleep(0)
t.cancel()
print(t.cancelled())
print("取消的标记为: ", t._must_cancel)
await asyncio.sleep(0)
try:
await t
print("没有收到取消操作, 协程的取消结果为:", t.cancelled())
except asyncio.CancelledError as e:
print("已经收到取消操作, 协程的取消结果为:", t.cancelled())

if __name__ == '__main__':
asyncio.run(main())

运行代码后可以发现输出如下:

1
2
3
4
False
取消的标记为: True
任务已经被取消了
没有收到取消操作, 协程的取消结果为: False

通过取消标记可以发现取消操作已经正确执行了,但是由于取消异常没有被向上抛出,导致t对象无法捕获到取消异常,从而认为t对象的取消状态为False。所以如果不是有意对取消异常做特别处理(比如超时的实现是把取消异常转为超时异常抛出),那么在捕获取消异常进行处理后仍然需要抛出取消异常。

4.取消机制的实现

前面写了取消机制的几种情况,都是通过源码分析出来的,因为官方文档并没有明显的说明,所以要用好取消机制,最好还是过一遍相关的源码,好在跟取消机制相关的地方不多,只跟Asyncio.Task对象和Asyncio事件循环的调度有关。

首先是看Task对象,Task对象跟取消机制相关代码如下(uncancel相关代码也移除了):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class Task(futures._PyFuture):
def __init__(self, coro, *, loop=None, name=None, context=None):
...
self._must_cancel = False
self._fut_waiter = None
...

def cancel(self, msg=None):
...
if self.done():
return False
if self._fut_waiter is not None:
if self._fut_waiter.cancel(msg=msg):
return True
self._must_cancel = True
self._cancel_message = msg
return True

def __step(self, exc=None):
if self.done():
raise exceptions.InvalidStateError(
f'_step(): already done: {self!r}, {exc!r}')
if self._must_cancel:
if not isinstance(exc, exceptions.CancelledError):
exc = self._make_cancelled_error()
self._must_cancel = False
coro = self._coro
self._fut_waiter = None

_enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None).
try:
if exc is None:
# We use the `send` method directly, because coroutines
# don't have `__iter__` and `__next__` methods.
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
super().cancel(msg=self._cancel_message)
else:
super().set_result(exc.value)
except exceptions.CancelledError as exc:
# Save the original exception so we can chain it later.
self._cancelled_exc = exc
super().cancel() # I.e., Future.cancel(self).
except (KeyboardInterrupt, SystemExit) as exc:
super().set_exception(exc)
raise
except BaseException as exc:
super().set_exception(exc)
else:
# 这里没有取消操作的相关逻辑,这部分不属于本文的主体,直接省略,知道会通过
# `self._loop.call_sonn`调用`self.__step`使`Asyncio.Task`能继续往下走
# 如果如果发现有其他的`coro`或者`Future`对象在等待时就把他们赋值到`self._fut_waiter`就可以了
# 具体内容请参考《Python的可等待对象在Asyncio的作用》文章
...
result._asyncio_future_blocking = False
result.add_done_callback(self.__wakeup, context=self._context)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel(msg=self._cancel_message):
self._must_cancel = False
...
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.

def __wakeup(self, future):
try:
future.result()
except BaseException as exc:
# This may also be a cancellation.
self.__step(exc)
else:
self.__step()
self = None # Needed to break cycles when an exception occurs.

在阅读源码之前,先简单的过一下Asyncio.Task的原理,在对Asyncio.Task进行初始化时,会调用__init__方法进行初始化,然后需要用户通过await调用初始化好的Asynio.Task对象时,才会触发_step方法,_step方法则是通过生成器的send方法来驱动协程的执行,如果协程内部有非Asyncio.Task可等待对象,则会把他们赋值到self._fut_waiter对象中,并通过loop.call_xxx系列函数安排事件循环在下次有空且self._fut_Waiter对象准备好时再来调用_step方法,然后就这样一直循环反复,直到执行结束或者遇到异常为止,如下面的代码(执行逻辑见注释):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio


async def demo():
await asyncio.Future() # <--3:发现coro.send返回的对象是asyncio.Future,此时把asyncio.Future赋值到self._fut_waiter中,并等待`asyncio.Future`执行完毕后来调用自己`self.__wakeup方法`从而再回到`self.__step`方法。
await asyncio.Future() # <--4:此时被事件循环唤醒,但是又发现是asyncio.Future,与第三步处理类似
# <-- 后续没有任务语句,直接抛出StopIteration异常,交由`Asyncio.Task`对象处理

async def main():
t = asyncio.create_task(demo()) # <-- 1:此时创建一个`asyncio.Task`对象
await t # <-- 2:第一次调用了`_step`方法


asyncio.run(main())

这段代码中,第一部分是__init__方法,这个方法很简单,它会初始化self._must_cancelFalse,以及self._fut_waiter为空。

第二部分则是取消操作对应的cancel方法,它会先判断当前Asyncio.Task是否已经执行完毕,如果执行完毕就会直接退出,不执行cancel的剩下操作,这里解释了3.1标题中说的《取消无法取消已经完成的Asyncio.Task对象》。
如果Asyncio.Task没有执行完毕则会判断当前的self._fut_waiter对象是否为空,如果不为空,则直接取消self._fut_waiter对象,并返回。如果为空则设置self._must_cancelTrue

第三部分是_step方法,该方法如果发现self._must_cancelTrue,且没有异常或者异常不属于取消异常时会生成一份默认的取消异常,然后就直接到了coro.throw(exc)这行代码,直接告诉被Asyncio.Task托管的Coro出现了取消异常,然后Coro会抛出对应的取消异常并被Asyncio.Task捕获并退出。这时如果Coro并没有通过coro.send预激(也就是第一次调用_step方法时直接命中了coro.throw方法),则Coro不会被运行,此时对应的是3.2.取消是一个异步操作中的第一个示例代码的场景。
然而如果Coro还有await语法没走完就会进入到没异常的else语块中,该语块会向result注册一个回调,让result完成时来调用自己的__wakeup方法,如果result抛出异常就会把异常传入_step方法中。现在循环就又回到了_step方法了, 而这次不属于第一次运行_step方法了,此时执行了coro.throw(exc)这行代码,那么就会使Coro中的对应的await语句抛出取消异常,然后交由Asyncio.Task捕获取消异常并返回。

_step方法入口时就把self._must_cancel设置为Fasle了,而else语块还要判断self._must_cancelTrue的情况,这里应该是考虑了线程不安全的问题,但是我没有去验证。

可以发现第二部分能让开发者主动诱发self._fut_waiter对象抛出取消异常,而__wakeup能监听self._fut_waiter的取消异常,从而传入_step方法中,最终让Asyncio.Task的状态变为取消状态,这是一个闭环。
如果这一环中有有一处代码捕获到取消异常并不再抛出取消异常,那么Asyncio.Task就无法变为取消状态,而是以正常的步骤继续调用_step方法。

5.总结

可以发现Asyncio的取消机制还是有一点点复杂,作为库的开发者,需要无时无刻考虑到如果自己编写的代码遇到取消异常时,需要怎么去妥善处理(其实Python自己实现的Asyncio同步原语或者高级API也被取消异常坑过)。
而作为使用者,也需要知道取消异常的使用局限性和使用方法。

查看评论