前记 Asyncio
的同步原语可以简化我们编写资源竞争的代码和规避资源竞争导致的Bug的出现。 但是由于协程的特性,在大部分业务代码中并不需要去考虑资源竞争的出现,导致Asyncio
同步原语被使用的频率比较低,但是如果想机遇Asyncio
编写框架则需要学习同步原语的使用。
0.基础 同步原语都是适用于某些条件下对某个资源的争夺,在代码中大部分的资源都是属于一个代码块,而Python
对于代码块的管理的最佳实践是使用with
语法,with
语法实际上是调用了一个类中的__enter__
和__exit__
方法,比如下面的代码:
1 2 3 4 5 6 7 8 9 10 class Demo (object ): def __enter__ (self ): return def __exit__ (self, exc_type, exc_val, exc_tb ): return with Demo(): pass
代码中的Demo
类实现了__enter__
和__exit__
方法后,就可以被with
语法调用,其中__enter__
方法是进入代码块执行的逻辑,__enxi__
方法是用于退出代码块(包括异常退出)的逻辑。这两个方法符合同步原语中对资源的争夺和释放,但是__enter__
和__exit__
两个方法都是不支持await
调用的,为了解决这个问题,Python
引入了async with
语法。
async with
语法和with
语法类似 ,我们只要编写一个拥有__aenter__
和__aexit__
方法的类,那么这个类就支持asyncio with
语法了,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import asyncioclass Demo (object ): async def __aenter__ (self ): return async def __aexit__ (self, exc_type, exc_val, exc_tb ): return async def main (): async with Demo(): pass asyncio.run(main())
其中,类中的__aenter__
方法是进入代码块时执行的方法,__aexit__
是退出代码块时执行的方法。
有了async with
语法的加持,asyncio
的同步原语使用起来会比较方便,所以asyncio
中对资源争夺的同步原语都会继承于_ContextManagerMixin
类:
1 2 3 4 5 6 7 8 9 class _ContextManagerMixin : async def __aenter__ (self ): await self.acquire() return None async def __aexit__ (self, exc_type, exc, tb ): self.release()
并实现了acquire
和release
方法,供__aenter__
和__aexit__
方法调用,同时我们在使用同步原语的时候尽量用到async with
语法防止忘记释放资源的占用。
1.Lock 由于协程的特性,在编写协程代码时基本上可以不考虑到锁的情况,但在一些情况下我们还是需要用到锁,并通过锁来维护并发时的数据安全性,如下例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import asyncio share_data = {}async def sub (i ): share_data[i] = i await asyncio.sleep(0 ) print(i, share_data[i] == i)async def sub_add (i ): share_data[i] = i + 1 await asyncio.sleep(0 ) print(i, share_data[i] == i + 1 )async def main (): task_list = [] for i in range (10 ): task_list.append(sub(i)) task_list.append(sub_add(i)) await asyncio.gather(*task_list)if __name__ == "__main__" : asyncio.run(main())
在这个例子中程序会并发的执行sub
和sub_add
函数,他们是由不同的asyncio.Task
驱动的,这意味着会出现这样一个场景。 当负责执行sub(1)
函数的asyncio.Task
在执行完share_data[i]=i
后就执行await asyncio.sleep(0)
从而主动让出控制权并交还给事件循环,等待事件循环的下一次调度。 不过事件循环不会空下来,而是马上安排下一个asyncio.Task
执行,此时会先执行到sub_add(1)
函数的share_data[i] = i + 1
,并同样的在执行到await asyncio.sleep(0)
的时候把控制权交会给事件循环。 这时候控制权会由事件循环转移给原先执行sub(1)
函数的asyncio.Task
,获取到控制权l后sub(1)
函数的逻辑会继续走,但由于share_data[i]
的数据已经被share_data[i] = i + 1
修改了,导致最后执行print
时,share_data[i]
的数据已经变为脏数据,而不是原本想要的数据了。
为了解决这个问题,我们可以使用asyncio.Lock
来解决资源的冲突,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import asyncio share_data = {} lock_dict = {}async def sub (i ): async with lock_dict[i]: share_data[i] = i await asyncio.sleep(0 ) print(i, share_data[i] == i)async def sub_add (i ): async with lock_dict[i]: share_data[i] = i + 1 await asyncio.sleep(0 ) print(i, share_data[i] == i + 1 )async def main (): task_list = [] for i in range (10 ): lock_dict[i] = asyncio.Lock() task_list.append(sub(i)) task_list.append(sub_add(i)) await asyncio.gather(*task_list)if __name__ == "__main__" : asyncio.run(main())
从例子可以看到asyncio.Lock
的使用方法跟多线程的Lock
差不多,通过async with
语法来获取和释放锁,它的原理也很简单,主要做了如下几件事:
1.确保某一协程获取锁后的执行期间,别的协程在获取锁时需要一直等待,直到执行完成并释放锁。
2.当有协程持有锁的时候,其他协程必须等待,直到持有锁的协程释放了锁。
2.确保所有协程能够按照获取的顺序获取到锁。
这意味着需要有一个数据结构来维护当前持有锁的协程的和下一个获取锁协程的关系,同时也需要一个队列来维护多个获取锁的协程的唤醒顺序。
asyncio.Lock
跟其它asyncio
功能的用法一样,使用asyncio.Future
来同步协程之间锁的状态,使用deque
维护协程间的唤醒顺序,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 class Lockl (_ContextManagerMixin, mixins._LoopBoundMixin ): def __init__ (self ): self._waiters = None self._locked = False def locked (self ): return self._locked async def acquire (self ): if (not self._locked and (self._waiters is None or all (w.cancelled() for w in self._waiters))): self._locked = True return True if self._waiters is None : self._waiters = collections.deque() fut = self._get_loop().create_future() self._waiters.append(fut) try : try : await fut finally : self._waiters.remove(fut) except exceptions.CancelledError: if not self._locked: self._wake_up_first() raise self._locked = True return True def release (self ): if self._locked: self._locked = False self._wake_up_first() else : raise RuntimeError('Lock is not acquired.' ) def _wake_up_first (self ): if not self._waiters: return try : fut = next (iter (self._waiters)) except StopIteration: return if not fut.done(): fut.set_result(True )
通过源码可以知道,锁主要提供了获取和释放的功能,对于获取锁需要区分两种情况:
1:当有协程想要获取锁时会先判断锁是否被持有,如果当前锁没有被持有就直接返回,使协程能够正常运行。
2:如果协程获取锁时,锁发现自己已经被其他协程持有则创建一个属于当前协程的asyncio.Future
,用来同步状态,并添加到deque
中。
而对于释放锁就比较简单,只要获取deque
中的第一个asyncio.Future
,并通过fut.set_result(True)
进行标记,使asyncio.Future
从peding
状态变为done
状态,这样一来,持有该asyncio.Future
的协程就能继续运行,从而持有锁。
不过需要注意源码中acquire
方法中对CancelledError
异常进行捕获,再唤醒下一个锁,这是为了解决acquire
方法执行异常导致锁一直被卡住的场景,通常情况下这能解决大部分的问题,但是如果遇到错误的封装时,我们需要亲自处理异常,并执行锁的唤醒。比如在通过继承asyncio.Lock
编写一个超时锁时,最简单的实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 import asyncioclass TimeoutLock (asyncio.Lock ): def __init__ (self, timeout, *, loop=None ): self.timeout = timeout super ().__init__(loop=loop) async def acquire (self ) -> bool: return await asyncio.wait_for(super ().acquire(), self.timeout)
这份代码非常简单,他只需要在__init__
方法传入timeout
参数,并在acuiqre
方法中通过wait_for
来实现锁超时即可,现在假设wait_for
方法是一个无法传递协程cancel
的方法,且编写的acquire
没有进行捕获异常再释放锁的操作,当异常发生的时候会导致锁一直被卡住。 为了解决这个问题,只需要对TimeoutLock
的acquire
方法添加异常捕获,并在捕获到异常时释放锁即可,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 class TimeoutLock (asyncio.Lock ): def __init__ (self, timeout, *, loop=None ): self.timeout = timeout super ().__init__(loop=loop) async def acquire (self ) -> bool: try : return await asyncio.wait_for(super ().acquire(), self.timeout) except Exception: self._wake_up_first() raise
1.1.什么时候要使用锁 在async
中判断是否需要使用asyncio.Lock
很简单,就是判断这个资源会不会被多个协程使用,最简单的就是这个资源是否被多个协程使用,如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import asyncio async def demo (): cache = {} async def main (): asyncio.create_task(demo()) asyncio.create_task(demo()) asyncio.run(main())
这段代码的资源是demo
函数的cache
,由于他归属于demo
函数,只能被demo
函数中的其他代码读写,后面只要确保demo
函数的逻辑只会被一个协程完整的执行,那么就不需要锁了,即使demo
函数被两个asyncio.Task
并发驱动。 但是如果demo
函数被改为如下代码就不一样了:
1 2 3 4 5 6 async def demo (): cache = {} async def _update (key, value ): cache[key] = value asyncio.create(_update(1 , 1 )) asyncio.create(_update(1 , 2 ))
这时候出现了两个asyncio.Task
并发驱动到修改cache
的_update
函数,会出现资源冲突的情况,这时候就需要锁了。
所以判断资源需不需要锁的规则很简单,就是这个资源是否被多个asyncio.Task
并发驱动,如果是则需要加锁。
2.Event asyncio.Event
也是一个简单的同步原语,但它跟asyncio.Lock
不一样,asyncio.Lock
是确保每个资源只能被一个协程操作,而asyncio.Event
是确保某个资源何时可以被协程操作,可以认为asyncio.Lock
锁的是资源,asyncio.Event
锁的是协程,所以asyncio.Event
并不需要acquire
来锁资源,release
释放资源,所以也用不到async with
语法。
asyncio.Event
的简单使用示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import asyncioasync def sub (event: asyncio.Event ) -> None : await event.wait() print("I'm Done" )async def main () -> None : event = asyncio.Event() for _ in range (10 ): asyncio.create_task(sub(event)) await asyncio.sleep(1 ) event.set () asyncio.run(main())
在这个例子中会先创建10个asyncio.Task
来执行sub
函数,但是所有sub
函数都会在event.wait
处等待,直到main
函数中调用event.set
后,所有的sub
函数的event.wait
会放行,使sub
函数能继续执行。
可以看到asyncio.Event
功能比较简单,它的源码实现也很简单,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class Event (mixins._LoopBoundMixin ): def __init__ (self ): self._waiters = collections.deque() self._value = False def is_set (self ): return self._value def set (self ): if not self._value: self._value = True for fut in self._waiters: if not fut.done(): fut.set_result(True ) def clear (self ): self._value = False async def wait (self ): if self._value: return True fut = self._get_loop().create_future() self._waiters.append(fut) try : await fut return True finally : self._waiters.remove(fut)
通过源码可以看到wait
方法主要是创建了一个asyncio.Future
,并把它加入到deque
队列后就一直等待着,而set
方法被调用时会遍历整个deque
队列,并把处于peding
状态的asyncio.Future
设置为done
,这时其他在调用event.wait
方法的协程就会得到放行。
通过源码也可以看出,asyncio.Event
并没有继承于_ContextManagerMixin
,这是因为它锁的是协程,而不是资源。
asyncio.Event
的使用频率比asyncio.Lock
多许多,不过通常都会让asyncio.Event
和其他数据结构进行封装再使用,比如实现一个服务器的优雅关闭功能,这个功能会确保服务器在等待n秒后或者所有连接都关闭后才关闭服务器,这个功能就可以使用set
与asyncio.Event
结合,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import asyncioclass SetEvent (asyncio.Event ): def __init__ (self, *, loop=None ): self._set = set () super ().__init__(loop=loop) def add (self, value ): self._set.add(value) self.clear() def remove (self, value ): self._set.remove(value) if not self._set: self.set ()
这个SetEvent
结合了set
和SetEvent
的功能,当set
有数据的时候,会通过clear
方法使SetEvent
变为等待状态,而set
没数据的时候,会通过set
方法使SetEvent
变为无需等待的状态,所有调用wait
的协程都可以放行,通过这种结合,SetEvent
拥有了等待资源为空的功能。 接下来就可以用于服务器的优雅退出功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 async def mock_conn_io () -> None : await asyncio.sleep(1 )def conn_handle (set_event: SetEvent ): task: asyncio.Task = asyncio.create_task(mock_conn_io()) set_event.add(task) task.add_done_callback(lambda t: set_event.remove(t))async def main (): set_event: SetEvent = SetEvent() for _ in range (10 ): conn_handle(set_event) await asyncio.wait(set_event.wait(), timeout=9 ) asyncio.run(main())
在这个演示功能中,mock_conn_io
用于模拟服务器的连接正在处理中,而conn_handle
用于创建服务器连接,main
则是先创建10个连接,并模拟在收到退出信号后等待资源为空或者超时才退出服务。
这只是简单的演示,实际上的优雅关闭功能要考虑的东西不仅仅是这些。
4.Condition
condition只做简单介绍
asyncio.Condition
是同步原语中使用最少的一种,因为他使用情况很奇怪,而且大部分场景可以被其他写法代替,比如下面这个例子 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncioasync def task (condition, work_list ): await asyncio.sleep(1 ) work_list.append(33 ) print('Task sending notification...' ) async with condition: condition.notify()async def main (): condition = asyncio.Condition() work_list = list () print('Main waiting for data...' ) async with condition: _ = asyncio.create_task(task(condition, work_list)) await condition.wait() print(f'Got data: {work_list} ' ) asyncio.run(main())
在这个例子中可以看到,notify
和wait
方法只能在async with condition
中可以使用,如果没有在async with condition
中使用则会报错,同时这个示例代码有点复杂,没办法一看就知道执行逻辑是什么,其实这个逻辑可以转变成一个更简单的写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import asyncioasync def task (work_list ): await asyncio.sleep(1 ) work_list.append(33 ) print('Task sending notification...' ) return async def main (): work_list = list () print('Main waiting for data...' ) _task = asyncio.create_task(task(work_list)) await _task print(f'Got data: {work_list} ' ) asyncio.run(main())
通过这个代码可以看到这个写法更简单一点,而且更有逻辑性,而condition
的写法却更有点Go
协程写法/或者回调函数写法的感觉。 所以建议在认为自己的代码可能会用到asyncio.Conditon
时需要先考虑到是否需要asyncio.Codition
?是否有别的方案代替,如果没有才考虑去使用asyncio.Conditon
k。
5.Semaphore asyncio.Semaphore
–信号量是同步原语中被使用最频繁的,大多数都是用在限流场景中,比如用在爬虫中和客户端网关中限制请求频率。
asyncio.Semaphore
可以认为是一个延缓触发的asyncio.Lock
,asyncio.Semaphore
内部会维护一个计数器,无论何时进行获取或释放,它都会递增或者递减(但不会超过边界值),当计数器归零时,就会进入到锁的逻辑,但是这个锁逻辑会在计数器大于0的时候释放j,它的用法如下:`
1 2 3 4 5 6 7 8 9 10 import asyncioasync def main (): semaphore = asyncio.Semaphore(10 ): async with semaphore: pass asyncio.run(main())
示例中代码通过async with
来指明一个代码块(代码用pass
代替),这个代码块是被asyncio.Semaphore
管理的,每次协程在进入代码块时,asyncio.Semaphore
的内部计数器就会递减一,而离开代码块则asyncio.Semaphore
的内部计数器会递增一。 当有一个协程进入代码块时asyncio.Semaphore
发现计数器已经为0了,则会使当前协程进入等待状态,直到某个协程离开这个代码块时,计数器会递增一,并唤醒等待的协程,使其能够进入代码块中继续执行。
asyncio.Semaphore
的源码如下,需要注意的是由于asyncio.Semaphore
是一个延缓的asyncio.Lock
,所以当调用一次release
后可能会导致被唤醒的协程和刚进入代码块的协程起冲突,所以在acquire
方法中要通过一个while
循环来解决这个问题:`
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class Semaphore (_ContextManagerMixin, mixins._LoopBoundMixin ): def __init__ (self, value=1 ): if value < 0 : raise ValueError("Semaphore initial value must be >= 0" ) self._value = value self._waiters = collections.deque() self._wakeup_scheduled = False def _wake_up_next (self ): while self._waiters: waiter = self._waiters.popleft() if not waiter.done(): waiter.set_result(None ) self._wakeup_scheduled = True return def locked (self ): return self._value == 0 async def acquire (self ): while self._wakeup_scheduled or self._value <= 0 : fut = self._get_loop().create_future() self._waiters.append(fut) try : await fut self._wakeup_scheduled = False except exceptions.CancelledError: self._wake_up_next() raise self._value -= 1 return True def release (self ): self._value += 1 self._wake_up_next()
针对asyncio.Semaphore
进行修改可以实现很多功能,比如基于信号量可以实现一个简单的协程池,这个协程池可以限制创建协程的量,当协程池满的时候就无法继续创建协程,只有协程中的协程执行完毕后才能继续创建(当然无法控制在协程中创建新的协程),代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import asyncioimport timefrom typing import Coroutineclass Pool (object ): def __init__ (self, max_concurrency: int ): self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency) async def create_task (self, coro: Coroutine ) -> asyncio.Task: await self._semaphore.acquire() task: asyncio.Task = asyncio.create_task(coro) task.add_done_callback(lambda t: self._semaphore.release()) return taskasync def demo (cnt: int ) -> None : print(f"{int (time.time())} create {cnt} task..." ) await asyncio.sleep(cnt)async def main () -> None : pool: Pool = Pool(3 ) for i in range (10 ): await pool.create_task(demo(i)) asyncio.run(main())