Python Asyncio 库之从ChatGPT Bug了解Cancel机制
前记
最近几天,在使用ChatGPT时会发现无法使用历史记录功能。而在3月24号时,OpenAI公布了这次问题是由于某个Bug导致,导致部分用户能获得到其他用户的信息,而这一切是因为redis-py
开发者没有考虑到Asyncio Cancel
的问题。而Cancel
有两个问题,一个是使用Cancel
的问题,另外一个是没有使用Cancel
的问题。
注:
- 1:下文说的取消这个名词代表着
Asyncio Cancel
- 2:了解
Cancel
机制之前,需要了解Task
对象的执行原理,可以通过可等待对象的原理了解执行原理。 - 在
Asyncio
中,每个协程的实体都是Asyncio.Task
对象,或者说是被Asyncio.Task
托管,所以本文以Asyncio.Task
对象代称协程。
1.ChatGPT遇到的问题
本次ChatGPT问题导致了部分用户能获取到另一个用户的数据,这是一个非常严重的问题。
根据OpenAI披露问题细节可以知道,ChatGPT使用Redis
缓存用户的信息,而后台服务是基于Asyncio
运行的,所以使用的是redis-py
中的AsyncRedis
与他们的Redis
集群进行交互的。
AsyncRedis代表着是
redis-py
中的redis.asyncio.client
中的Redis
类,下同
在大部分情况下(包括测试),这种使用方式是没问题的,但是在他们对服务进行修改后导致Redis
请求的取消数量激增,而取消是通过asyncio
的Cancel
实现的,正常情况下,在遇到Cancel
后可以捕获取消异常再进行重试或者直接抛出异常。ChatGPT在业务层也是这样实现的,所以ChatGPT的代码并没有问题,真正有问题的是ChatGPT使用到的redis-py
库中的AsyncRedis
。
AsyncRedis
在ChatGPT服务器和Redis
集群之间维护一个共享连接池,这些连接池存放着很多个TCP连接,TCP连接拥有请求队列和响应队列,在通过TCP发送数据时,实际上是把数据投递到请求队列中,再由操作系统把数据发送到目标服务器,而在接收到服务器数据后,会把数据存放到响应队列中,等待程序获取。
而ChatGPT服务每次与Redis
交互时,都会从Redis
连接池获取一个连接,并将请求推入到连接中的请求队列中,接着再从连接的响应队列获取数据并返回。
但是,如果交互A成功把请求推入到连接后,当前的Asyncio.Task
对象由于意外被取消了,那么这时候ChatGPT服务只收到一个取消异常,而AsyncRedis
会回收当前连接,并给交互B使用。而交互B在发送请求前,连接的响应队列恰好收到了Redis
服务返回的数据,那么交互B在发送请求后就会从响应队列得到了交互A的响应数据了,更糟糕的是由于大部分的请求都是一个发送对应一个接收到,这就意味着后续交互获得的响应都是上一个交互请求的数据了。
ChatGPT问题细节并不本文重点,有兴趣的可以 访问OpenAI披露问题细节。同时应该知道,保护用户数据是后端开发者的主要职责之一,我们不止要校验请求的参数,还需要对返回的隐私数据进行校验,而不是单纯的依赖在使用的工具和服务。
2.重现并找出问题
通过OpenAI披露问题细节可以知道,这就是类似ORM的那种由于封装导致程序行为与真实结果不一致问题,开发者以为Asyncio.Task
对象被取消了所以收不到数据,但真正的结果是Asyncio.Task
对象被取消了,但是底层的数据结构却还能正常收到数据。
然而ChatGPT只是公布了大概的原因,并没有公布自己出现的代码是怎么实现的,在搜索了一圈后,发现redis-py
l的issue-2624中有可以重现该问题的代码,不过为了确保能100%重现这个问题,首先需要在本机安装Redis
服务(如果不是使用本机的Redis
服务,那么不一定能100%重现),然后运行issue-2624的代码(后续提到的示例代码都代表是本代码):
1 |
|
在运行该代码后可以得到如下输出:
1 |
|
通过输出结果可以发现,第一个交互是从Redis
获取键值为foo
的值,但是在被显式的执行取消操作后,后续的交互获取的值都是上一次交互想要的值,这是非常糟糕的,为了找到问题的原因,需要深入到AsyncRedis
的源码中查找。
在用户发起命令set
和get
中涉及到的核心调用都是只有execute_command
方法,它对应的代码如下:
1 |
|
代码中的execute_command
方法对应的就是从连接池获取连接并通过_send_command_parse_response
方法把请求数据发送到连接中,然后等待连接返回的数据并返回,最后则是释放连接的占用,使连接回到连接池等待被下次调用。
可以发现真正与Redis
发生交互的是send_commanad_parse_response
方法中的send_command
和parse_response
两个IO操作方法。此外,由于通过复现问题的代码输出可以发现在显式调用取消后,下一个交互得到的输出永远为上一个交互得到的结果,这也就意味着send_command
这个方法是执行成功的,所以取消操作真正命中的是self.parse_response
方法。
如果重新问题代码中的
asyncio.sleep(0)
被移除,则不会有问题发生,这是因为取消操作命中的是send_command
方法
如果了解了取消机制的原理(后文会有详细的描述),那么可以知道在对示例代码中的t
调用cancel
方法执行取消操作的时候,会取消正在等待的parse_response
方法,然后再沿着堆栈向上抛出异常。然而execute_commanad
中的名为call_with_retry
的重试方法只针对连接异常和超时异常进行重试,所以call_with_retry
在收到取消异常时会继续往上抛,这时execute_command
方法就会回收连接,并把异常向上抛到调用AsyncRedis
的业务层中供用户使用。
由于Cancel
可以向上堆栈传递取消异常,于是业务层,AsyncRedis
层能收到取消异常,并最终由t
这个Asyncio.Task
对象捕获到了取消异常,至此当前协程就已经被取消了,可以认为程序是没有什么问题,取消操作也完成了自己的任务。
不过AsyncRedis
中的连接并未被关闭且响应队列的数据写入操作是系统决定的,而不是当前t
对象管理的(可以认为是另外一个Asyncio.Task
对象在管理的),这意味着针对t
对象的取消操作并不会影响到响应队列的写入操作,响应队列仍然还能接收另一端发送过来的数据,这下问题就出现了,由于Redis
协议是按照一进一出来处理数据的(仅限于当前示例代码的Redis
命令),这意味着必须每次发送一个请求后都要读取一次请求,但是取消操作却把某一次读取数据的操作给取消了,这就导致了后续取的数据永远是上一个。
但是这个例子涉及到系统调用,比较难理解具体的逻辑,于是把实现进行一下转换,把示例代码全转换为纯Python
实现,不引入其他的库,代码如下:
1 |
|
这段代码中使用write_queue
和read_queue
分别模拟连接的写入队列和读取队列,而porter
则是一个搬运工,它负责把写入队列的数据转移到只读队列中,模拟redis-py
客户端在通过连接发送数据后获取数据的功能,在运行代码后可以得到如下输出:
1 |
|
通过输出可以发现该程序的输出结果与issue-2624的输出结果类似–下一个发起请求的函数的响应结果实际上是上一个请求产生的。
而且通过这份代码可以明显的看出,负责转移数据的poeter
函数是由asyncio.create_task
启动,这代表他是由另一个Asyncio.Task
对象管理的(在这个场景中可以认为poeter
是redis-py
连接池中的某一个连接)。
这样一来,由于poeter
对象与后面的t
对象并不是同一个Asyncio.Task
对象,而t
对象的worker
函数在往写入队列写入数据后,就被主动取消了,且取消这个操作只影响到了t
这个对象,这意味着只取消了read_queue.get
这个动作,而其他的协程/动作都还在正常的执行着,包括poeter
对象。
所以poeter
对象还是能正常的从write_queue
队列中获取数据再推到read_queue
队列中,这也就导致了worker(2)
和worker(3)
虽然是正常执行的推入的是2
和3
,但是他们分别得到的值却是1
和2
。
通过上述的分析可以看出,Asyncio
的取消操作是属于某一对象的方法,这意味着取消操作只能取消并中断该方法对应的Asyncio.Task
对象执行,不能取消其他Asyncio.Task
对象的操作,此外,取消只能管理某个动作,不会去影响到动作相关的资源。
同时要知道的是由于Asyncio
的机制是任何协程都能互相影响,也就是A协程可以取消B协程,这意味着开发者在业务代码对某个协程进行取消时,取消是可以被传递的。
现在再回到redis-py
这个库本身,由于redis-py
本身没有用到取消操作相关的语法,于是没有考虑到取消操作这件事,但是业务代码中针对某一Asyncio.Task
对象执行取消操作时,取消这个动作影响会到了redis-py
库的方法,最终导致这个问题出现。
Python.Asyncio
的任何协程都能互相影响的机制解决了协程无法被管理导致的内存溢出等问题,但这也间接的导致了库的开发者需要注意到取消机制的存在。
而与之相反的是Go
协程,由于它本身的特性,虽然B协程是A协程创造出来的,但是A协程没有办法影响到B协程,为了实现A协程能影响到B协程,需要显式的把context从A协程传到B协程中,然后由A协程通过context传递动作,B协程根据context的结果做出响应,如下代码:
1 |
|
在这个代码中main
函数可以认为是协程A,而demo
函数是协程B,在通过go demo
启动协程B后,协程B就无法再被管理,如果要管理协程B,就必须像示例代码一样让demo
函数支持外部传入context变量,同时内部会对context变量的状态进行监听并做出响应的操作。
这就意味着新启动的协程能支持协程的哪些功能,全由这个协程的实现者决定,这样即使redis-py
没有考虑到取消的功能,也并不会被用户的业务代码影响到,但反过来用户却没办法使用到取消功能来影响redis-py
的功能,除非开发者有意去添加取消功能。
通过对比发现,为了功能完备,无论使用Go
还是Python
都需要开发者去针对取消机制去进行处理,如果开发者没有去处理的话,在Go
中是无法使用取消功能,而Python
则是会引发一些问题,这种情况下Go
的机制会好一些,不过对于结构化并发和超时等功能,Go
都需要库开发者去显式的支持,而Python
却不用,这一点我会觉得Python
的实现更好一些,当然,哪种实现更好还是取决于使用者本人。
3.使用取消需要考虑的问题
通过ChatGPT的问题可以知道在使用Asyncio
时没有考虑到取消这个场景时会导致一些问题的发生,然而在使用取消机制时也有很多问题需要考虑的。
3.1.取消无法取消已经完成的Asyncio.Task
对象
还记得一开始的例子代码中的一段asyncio.sleep(0)
的代码吗?这里是特意修改为0从而确保这段代码能触发Bug,如果休眠时间变长,则不一定能触发,为了能更好的理解,这里使用转换后的例子代码来演示,并把asyncio.sleep(0)
改为asyncio.sleep(0.1)
,代码如下:
1 |
|
运行代码后就可以看到终端会有如下输出:
1 |
|
通过输出结果可以发现,t
对象并没有被取消,后面的worker
对象执行也是正常的了。这是因为取消这个动作执行的时候t
对象已经执行完毕了,而取消只能取消处于等待过程的Asyncio.Task
对象,所以在这个例子中的取消操作是没有执行成功的。
为了能更好的了解这个机制,可以通过如下代码进行验证:
1 |
|
通过运行该代码后,可以发现第一次打印t.done
是False
,而第二次打印为True
。其实在可等待对象的原理 中介绍了Task
是协程的载体,而Task
继承于Future
,所以通过Future
的源码也可以知道Cancel
方法仅当协程为等待状态(PENDING
)时生效,而且他的工作只是把协程状态改为CANCELLED
状态(Asyncio.Task
对象会复杂一点):
1 |
|
3.2.取消是一个异步操作
取消是一个异步操作,这意味着取消这个操作可能需要一段时间才能执行完毕。
如果是第一次接触示例代码的话,那么一定会好奇为何一定要有asyncio.sleep(0)
这个语句,才能百分百复现Bug?为了验证这个问题,先编写一个简单的示例,代码如下:
1 |
|
在运行该示例时可以发现有如下输出:
1 |
|
可以发现demo
函数的第一句就已经没有执行了, 这是因为没有asyncio.sleep(0)
来让出控制权,事件循环就无法去调度t
对象去启动,这时即使针对t
对象执行了取消操作,但由于t
对象没有运行,那么此时的操作只能是给t
对象打个标记,告诉t
对象已经是取消状态了,然后等到调用await t
主动去激活t
对象后,t
对象就会开始运行,但是在启动后发现自己已经被取消了,就不会去执行demo
函数,而是直接抛错了。
这个例子解释了加上asyncio.sleep(0)
这个语句的必要性,但是示例代码中已经加上了asyncio.sleep(0)
语句了,后面为啥还要使用await t
呢?这是因为执行取消操作时会取消当前t
对象正在执行的操作,然后逐步把异常向上抛,但是内部可以由开发者捕获取消异常后执行一些等待操作(如IO操作),此时t
对象就无法马上收到取消异常,这意味着取消动作还未结束。
当IO操作执行完后,会继续向上抛异常,然后t
对象在接收到取消异常后会转为完成状态,此时取消操作也就完成了。
如下是模拟取消操作被堵住的代码:
1 |
|
在执行完成的时候,控制台有如下输出:
1 |
|
通过输出可以看到,第一次打印取消状态和休眠0.5秒后第二次打印的取消状态都是一样的,表示协程尚未被取消,但在执行await t
后,就打印了协程已经被取消了,且可以发现从取消协程开始到结束所花费的时间正好是2秒左右,与demo
函数捕获取消异常后休眠2秒一致。
3.3.取消异常的处理
从上面可以的分析可以知道,取消操作是取消某一段执行,然后向上抛异常,当Asyncio.Task
捕获到异常后,会标记AsyncioTask
对象的状态为取消。那如果向Asyncio.Task
对象执行取消操作,但是取消的异常被屏蔽了,会如何处理呢?下面是一个基于3.1中示例代码修改的例子:
1 |
|
运行代码后可以发现输出如下:
1 |
|
通过取消标记可以发现取消操作已经正确执行了,但是由于取消异常没有被向上抛出,导致t
对象无法捕获到取消异常,从而认为t
对象的取消状态为False
。所以如果不是有意对取消异常做特别处理(比如超时的实现是把取消异常转为超时异常抛出),那么在捕获取消异常进行处理后仍然需要抛出取消异常。
4.取消机制的实现
前面写了取消机制的几种情况,都是通过源码分析出来的,因为官方文档并没有明显的说明,所以要用好取消机制,最好还是过一遍相关的源码,好在跟取消机制相关的地方不多,只跟Asyncio.Task
对象和Asyncio
事件循环的调度有关。
- 1.
Asyncio.Future
也跟取消机制有关,不过非常简单,这里就不说了。- 2.本文只介绍跟取消操作相关的,如果要了解
Asyncio.Task
对象的具体原理,详见Python的可等待对象在Asyncio的作用,如果要了解Asyncio
的调度原理,详见Python Asyncio调度原理
首先是看Task
对象,Task
对象跟取消机制相关代码如下(uncancel
相关代码也移除了):
1 |
|
在阅读源码之前,先简单的过一下Asyncio.Task
的原理,在对Asyncio.Task
进行初始化时,会调用__init__
方法进行初始化,然后需要用户通过await
调用初始化好的Asynio.Task
对象时,才会触发_step
方法,_step
方法则是通过生成器的send
方法来驱动协程的执行,如果协程内部有非Asyncio.Task
可等待对象,则会把他们赋值到self._fut_waiter
对象中,并通过loop.call_xxx
系列函数安排事件循环在下次有空且self._fut_Waiter
对象准备好时再来调用_step
方法,然后就这样一直循环反复,直到执行结束或者遇到异常为止,如下面的代码(执行逻辑见注释):
1 |
|
这段代码中,第一部分是__init__
方法,这个方法很简单,它会初始化self._must_cancel
为False
,以及self._fut_waiter
为空。
第二部分则是取消操作对应的cancel
方法,它会先判断当前Asyncio.Task
是否已经执行完毕,如果执行完毕就会直接退出,不执行cancel
的剩下操作,这里解释了3.1标题中说的《取消无法取消已经完成的Asyncio.Task
对象》。
如果Asyncio.Task
没有执行完毕则会判断当前的self._fut_waiter
对象是否为空,如果不为空,则直接取消self._fut_waiter
对象,并返回。如果为空则设置self._must_cancel
为True
。
第三部分是_step
方法,该方法如果发现self._must_cancel
为True
,且没有异常或者异常不属于取消异常时会生成一份默认的取消异常,然后就直接到了coro.throw(exc)
这行代码,直接告诉被Asyncio.Task
托管的Coro出现了取消异常,然后Coro会抛出对应的取消异常并被Asyncio.Task
捕获并退出。这时如果Coro
并没有通过coro.send
预激(也就是第一次调用_step
方法时直接命中了coro.throw
方法),则Coro
不会被运行,此时对应的是3.2.取消是一个异步操作中的第一个示例代码的场景。
然而如果Coro
还有await
语法没走完就会进入到没异常的else
语块中,该语块会向result
注册一个回调,让result
完成时来调用自己的__wakeup
方法,如果result
抛出异常就会把异常传入_step
方法中。现在循环就又回到了_step
方法了, 而这次不属于第一次运行_step
方法了,此时执行了coro.throw(exc)
这行代码,那么就会使Coro
中的对应的await
语句抛出取消异常,然后交由Asyncio.Task
捕获取消异常并返回。
_step
方法入口时就把self._must_cancel
设置为Fasle
了,而else
语块还要判断self._must_cancel
为True
的情况,这里应该是考虑了线程不安全的问题,但是我没有去验证。
可以发现第二部分能让开发者主动诱发self._fut_waiter
对象抛出取消异常,而__wakeup
能监听self._fut_waiter
的取消异常,从而传入_step
方法中,最终让Asyncio.Task
的状态变为取消状态,这是一个闭环。
如果这一环中有有一处代码捕获到取消异常并不再抛出取消异常,那么Asyncio.Task
就无法变为取消状态,而是以正常的步骤继续调用_step
方法。
5.总结
可以发现Asyncio
的取消机制还是有一点点复杂,作为库的开发者,需要无时无刻考虑到如果自己编写的代码遇到取消异常时,需要怎么去妥善处理(其实Python
自己实现的Asyncio
同步原语或者高级API也被取消异常坑过)。
而作为使用者,也需要知道取消异常的使用局限性和使用方法。
- 本文作者:So1n
- 本文链接:http://so1n.me/2023/03/25/python_asyncio_lib_cancel/index.html
- 版权声明:本博客所有文章均采用 BY-NC-SA 许可协议,转载请注明出处!