前记 在文章《Python的可等待对象在Asyncio的作用》 中介绍了Python
的可等待对象作用,特别是Task
对象在启动的时候可以自我驱动,但是一个Task
对象只能驱动一条执行链,如果要多条链执行(并发),还是需要EventLoop
来安排驱动,接下来将通过Python.Asyncio
库的源码来了解EventLoop
是如何运作的。
1.基本介绍 Python.Asyncio
是一个大而全的库,它包括很多功能,而跟核心调度相关的逻辑除了三种可等待对象外,还有其它一些功能,它们分别位于runners.py
,base_event.py
,event.py
三个文件中。
runners.py
文件有一个主要的类–Runner
,它的主要职责是做好进入协程模式的事件循环等到初始化工作,以及在退出协程模式时清理还在内存的协程,生成器等对象。
协程模式只是为了能方便理解,对于计算机而言,并没有这样区分
event.py
文件除了存放着EventLoop
对象的接口以及获取和设置EventLoop
的函数外,还有两个EventLoop
可调度的对象,分别为Handler
和TimerHandler
,它们可以认为是EvnetLoop
调用其它对象的容器,用于连接待调度对象和事件循环的关系,不过它们的实现非常简单,对于Handler
,它的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class Handle : def __init__ (self, callback, args, loop, context=None ): if context is None : context = contextvars.copy_context() self._context = context self._loop = loop self._callback = callback self._args = args self._cancelled = False def cancel (self ): if not self._cancelled: self._cancelled = True self._callback = None self._args = None def cancelled (self ): return self._cancelled def _run (self ): try : self._context.run(self._callback, *self._args) except (SystemExit, KeyboardInterrupt): raise except BaseException as exc: cb = format_helpers._format_callback_source( self._callback, self._args) msg = f'Exception in callback {cb} ' context = { 'message' : msg, 'exception' : exc, 'handle' : self, } self._loop.call_exception_handler(context)
通过源码可以发现,Handle
功能十分简单,提供了可以被取消以及可以在自己所处的上下文执行的功能,而TimerHandle
继承于Handle
比Handle
多了一些和时间以及排序相关的参数,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 class TimerHandle (Handle ): def __init__ (self, when, callback, args, loop, context=None ): super ().__init__(callback, args, loop, context) self._when = when self._scheduled = False def __hash__ (self ): return hash (self._when) def __lt__ (self, other ): if isinstance (other, TimerHandle): return self._when < other._when return NotImplemented def __le__ (self, other ): if isinstance (other, TimerHandle): return self._when < other._when or self.__eq__(other) return NotImplemented def __gt__ (self, other ): if isinstance (other, TimerHandle): return self._when > other._when return NotImplemented def __ge__ (self, other ): if isinstance (other, TimerHandle): return self._when > other._when or self.__eq__(other) return NotImplemented def __eq__ (self, other ): if isinstance (other, TimerHandle): return (self._when == other._when and self._callback == other._callback and self._args == other._args and self._cancelled == other._cancelled) return NotImplemented def cancel (self ): if not self._cancelled: self._loop._timer_handle_cancelled(self) super ().cancel() def when (self ): return self._when
通过代码可以发现,这两个对象十分简单,而我们在使用Python.Asyncio
时并不会直接使用到这两个对象,而是通过loop.call_xxx
系列方法来把调用封装成Handle
对象,然后等待EventLoop
执行。 所以loop.call_xxx
系列方法可以认为是EventLoop
的注册操作,基本上所有非IO的异步操作都需要通过loop.call_xxx
方法来把自己的调用注册到EventLoop
中,比如Task
对象就在初始化后通过调用loop.call_soon
方法来注册到EventLoop
中,loop.call_sonn
的实现很简单,它的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class BaseEventLoop : ... def call_soon (self, callback, *args, context=None ): self._check_closed() handle = self._call_soon(callback, args, context) return handle def _call_soon (self, callback, args, context ): handle = events.Handle(callback, args, self, context) self._ready.append(handle) return handle
可以看到call_soon
真正相关的代码只有10几行,它负责把一个调用封装成一个Handle
,并添加到self._reday
中,从而实现把调用注册到事件循环之中。
loop.call_xxx
系列函数除了loop.call_soon
系列函数外,还有另外两个方法–loop.call_at
和loop.call_later
,它们类似于loop.call_soon
,不过多了一个时间参数,来告诉EventLoop
在什么时间后才可以调用,同时通过loop.call_at
和loop.call_later
注册的调用会通过Python
的堆排序模块headpq
注册到self._scheduled
变量中,具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class BaseEventLoop : ... def call_later (self, delay, callback, *args, context=None ): if delay is None : raise TypeError('delay must not be None' ) timer = self.call_at(self.time() + delay, callback, *args, context=context) return timer def call_at (self, when, callback, *args, context=None ): if when is None : raise TypeError("when cannot be None" ) self._check_closed() timer = events.TimerHandle(when, callback, args, self, context) heapq.heappush(self._scheduled, timer) timer._scheduled = True return timer
2.EventLoop的调度实现 在文章《Python的可等待对象在Asyncio的作用》 中已经分析到了runner
会通过loop.run_until_complete
来调用main
Task从而开启EventLoop
的调度,所以在分析EventLoop
的调度时,应该先从loop.run_until_complete
入手,对应的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 class BaseEventLoop : def run_until_complete (self, future ): ... new_task = not futures.isfuture(future) future = tasks.ensure_future(future, loop=self) if new_task: future._log_destroy_pending = False future.add_done_callback(_run_until_complete_cb) try : self.run_forever() except : if new_task and future.done() and not future.cancelled(): future.exception() raise finally : future.remove_done_callback(_run_until_complete_cb) if not future.done(): raise RuntimeError('Event loop stopped before Future completed.' ) return future.result() def run_forever (self ): self._check_closed() self._check_running() self._set_coroutine_origin_tracking(self._debug) self._thread_id = threading.get_ident() old_agen_hooks = sys.get_asyncgen_hooks() sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, finalizer=self._asyncgen_finalizer_hook) try : events._set_running_loop(self) while True : self._run_once() if self._stopping: break finally : self._stopping = False self._thread_id = None events._set_running_loop(None ) self._set_coroutine_origin_tracking(False ) sys.set_asyncgen_hooks(*old_agen_hooks)
这段源码并不复杂,它的主要逻辑是通过把Corotinue
转为一个Task
对象,然后通过Task
对象初始化时调用loop.call_sonn
方法把自己注册到EventLoop
中,最后再通过loop.run_forever
中的循环代码一直运行着,直到_stopping
被标记为True
:
1 2 3 4 5 while True : self._run_once() if self._stopping: break
可以看出,这段代码是确保事件循环能一直执行着,自动循环结束,而真正调度的核心是_run_once
函数,它的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 class BaseEventLoop : ... def _run_once (self ): sched_count = len (self._scheduled) if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION): new_scheduled = [] for handle in self._scheduled: if handle._cancelled: handle._scheduled = False else : new_scheduled.append(handle) heapq.heapify(new_scheduled) self._scheduled = new_scheduled self._timer_cancelled_count = 0 else : while self._scheduled and self._scheduled[0 ]._cancelled: self._timer_cancelled_count -= 1 handle = heapq.heappop(self._scheduled) handle._scheduled = False timeout = None if self._ready or self._stopping: timeout = 0 elif self._scheduled: when = self._scheduled[0 ]._when timeout = min (max (0 , when - self.time()), MAXIMUM_SELECT_TIMEOUT) event_list = self._selector.select(timeout) self._process_events(event_list) end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0 ] if handle._when >= end_time: break handle = heapq.heappop(self._scheduled) handle._scheduled = False self._ready.append(handle) ntodo = len (self._ready) for i in range (ntodo): handle = self._ready.popleft() if handle._cancelled: continue if self._debug: try : self._current_handle = handle t0 = self.time() handle._run() dt = self.time() - t0 if dt >= self.slow_callback_duration: logger.warning('Executing %s took %.3f seconds' , _format_handle(handle), dt) finally : self._current_handle = None else : handle._run() handle = None
通过源码分析,可以很明确的知道调度逻辑中第一步是先规整self._scheduled
,在规整的过程是使用堆排序来进行的,因为堆排序在调度的场景下效率是非常高的,不过这段规整代码分成两种,我猜测是当需要取消的数量过多时直接遍历的效率会更高。 在规整self._scheduled
后,就进入第二步,该步骤开始等待系统事件循环返回对应的事件,如果self._ready
中有数据,就不做等待了,需要马上到下一步骤,以便能赶紧安排调度。 在得到系统事件循环得到的事件后,就进入到了第三步,该步骤会通过self._process_events
方法处理对应的事件,并把事件对应的回调存放到了self._ready
中,最后再遍历self._ready
中的所有Handle
并逐一执行(执行时可以认为EventLoop
把控制权返回给对应的调用逻辑),至此一个完整的调度逻辑就结束了,并进入下一个调度逻辑。
3.网络IO事件的处理
注:由于系统事件循环的限制,所以文件IO一般还是使用多线程来执行,具体见:https://github.com/python/asyncio/wiki/ThirdParty#filesystem
在分析EventLoop
调度实现的时候忽略了self._process_events
的具体实现逻辑,因为_process_events
方法所在asyncio.base_event.py
文件中的BaseEventLoop
类并未有具体实现的,因为网络IO相关的需要系统的事件循环来帮忙处理,所以与系统事件循环相关的逻辑都在asyncio.selector_events.py
中的BaseSelectorEventLoop
类中。BaseSelectorEventLoop
类封装了selector
模块与系统事件循环交互,使调用者不需要去考虑sock的创建以及sock产生的文件描述符的监听与注销等操作,下面以BaseSelectorEventLoop
中自带的pipe为例子,分析BaseSelectorEventLoop
是如何进行网络IO事件处理的。
在分析之前,先看一个例子,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import asyncioimport threadingdef task (): print("task" )def run_loop_inside_thread (loop ): loop.run_forever() loop = asyncio.get_event_loop() threading.Thread(target=run_loop_inside_thread, args=(loop,)).start() loop.call_soon(task)
如果直接运行这个例子,它并不会输出task
(不过在IDE使用DEBUG模式下线程启动会慢一点,所以会输出的),因为在调用loop.run_forever
后EventLoop
会一直卡在这段逻辑中:
1 event_list = self._selector.select(timeout)
所以调用loop.call_soon
并不会使EventLoop
马上安排调度,而如果把call_soon
换成call_soon_threadsafe
则可以正常输出,这是因为call_soon_threadsafe
中多了一个self._write_to_self
的调用,它的源码如下:
1 2 3 4 5 6 7 8 9 class BaseEventLoop : ... def call_soon_threadsafe (self, callback, *args, context=None ): """Like call_soon(), but thread-safe.""" self._check_closed() handle = self._call_soon(callback, args, context) self._write_to_self() return handle
由于这个调用是涉及到IO相关的,所以需要到BaseSelectorEventLoop
类查看,接下来以pipe相关的网络IO操作来分析EventLoop
是如何处理IO事件的(只演示reader对象,writer对象操作与reader类似),对应的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 class BaseSelectorEventLoop (base_events.BaseEventLoop ): def __init__ (self, selector=None ): super ().__init__() if selector is None : selector = selectors.DefaultSelector() self._selector = selector self._make_self_pipe() self._transports = weakref.WeakValueDictionary() def _make_self_pipe (self ): self._ssock, self._csock = socket.socketpair() self._ssock.setblocking(False ) self._csock.setblocking(False ) self._internal_fds += 1 self._add_reader(self._ssock.fileno(), self._read_from_self) def _add_reader (self, fd, callback, *args ): self._check_closed() handle = events.Handle(callback, args, self, None ) try : key = self._selector.get_key(fd) except KeyError: self._selector.register(fd, selectors.EVENT_READ, (handle, None )) else : mask, (reader, writer) = key.events, key.data self._selector.modify(fd, mask | selectors.EVENT_READ, (handle, writer)) if reader is not None : reader.cancel() return handle def _read_from_self (self ): while True : try : data = self._ssock.recv(4096 ) if not data: break self._process_self_data(data) except InterruptedError: continue except BlockingIOError: break def _close_self_pipe (self ): self._remove_reader(self._ssock.fileno()) self._ssock.close() self._ssock = None self._csock.close() self._csock = None self._internal_fds -= 1 def _remove_reader (self, fd ): if self.is_closed(): return False try : key = self._selector.get_key(fd) except KeyError: return False else : mask, (reader, writer) = key.events, key.data mask &= ~selectors.EVENT_READ if not mask: self._selector.unregister(fd) else : self._selector.modify(fd, mask, (None , writer)) if reader is not None : reader.cancel() return True else : return False
通过源码中的创建部分可以看到,EventLoop
在启动的时候会创建一对建立通信的sock,并设置为非阻塞,然后把对应的回调封装成一个Handle
对象并注册到系统事件循环中(删除则进行对应的反向操作),之后系统事件循环就会一直监听对应的事件,也就是EventLoop
的执行逻辑会阻塞在下面的调用中,等待事件响应:
1 event_list = self._selector.select(timeout)
这时如果执行loop.call_soon_threadsafe
,那么会通过write_to_self
写入一点信息:
1 2 3 4 5 6 7 8 9 10 def _write_to_self (self ): csock = self._csock if csock is None : return try : csock.send(b'\0' ) except OSError: if self._debug: logger.debug("Fail to write a null byte into the self-pipe socket" , exc_info=True )
由于csock
被写入了数据,那么它对应的ssock
就会收到一个读事件,系统事件循环在收到这个事件通知后就会把数据返回,然后EventLoop
就会获得到对应的数据,并交给process_events
方法进行处理,它的相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 class BaseSelectorEventLoop : def _process_events (self, event_list ): for key, mask in event_list: fileobj, (reader, writer) = key.fileobj, key.data if mask & selectors.EVENT_READ and reader is not None : if reader._cancelled: self._remove_reader(fileobj) else : self._add_callback(reader) if mask & selectors.EVENT_WRITE and writer is not None : if writer._cancelled: self._remove_writer(fileobj) else : self._add_callback(writer) def _add_callback (self, handle ): assert isinstance (handle, events.Handle), 'A Handle is required here' if handle._cancelled: return assert not isinstance (handle, events.TimerHandle) self._ready.append(handle) def _remove_reader (self, fd ): if self.is_closed(): return False try : key = self._selector.get_key(fd) except KeyError: return False else : mask, (reader, writer) = key.events, key.data mask &= ~selectors.EVENT_READ if not mask: self._selector.unregister(fd) else : self._selector.modify(fd, mask, (None , writer)) if reader is not None : reader.cancel() return True else : return False
从代码中可以看出_process_events
会对事件对应的文件描述符进行处理,并从事件回调中获取到对应的Handle
对象添加到self._ready
中,由EventLoop
在接下来遍历self._ready
并执行。
可以看到网络IO事件的处理并不复杂,因为系统事件循环已经为我们做了很多工作了,但是用户所有与网络IO相关的操作都需要有一个类似的操作,这样是非常的繁琐的,幸好asyncio
库已经为我们做了封装,我们只要调用就可以了,方便了很多。