def__enter__(self) -> "Deadline": # 进入with语句范围 if self._with_scope_future: # 一个实例同时只能调用一次, 多次调用会出错 raise RuntimeError("`with` can only be called once") if self._delay isnotNone: # 启动了超时机制
# 获取当前运行的task main_task: Optional[asyncio.Task] = asyncio.Task.current_task(self._loop) ifnot main_task: raise RuntimeError("Can not found current task") # 注册with语句所在的future self._with_scope_future = main_task return self
asyncdef__aenter__(self): # 限制只能调用一次 if self._entered: raise RuntimeError( f"TaskGroup {self!r} has been already entered") self._entered = True
if self._loop isNone: self._loop = events.get_running_loop()
# 获取当前的task self._parent_task = tasks.current_task(self._loop) if self._parent_task isNone: raise RuntimeError( f'TaskGroup {self!r} cannot determine the parent task')
return self
asyncdef__aexit__(self, et, exc, tb): self._exiting = True propagate_cancellation_error = None
if (exc isnotNoneand self._is_base_error(exc) and self._base_error isNone): self._base_error = exc
if et isnotNone: if et is exceptions.CancelledError: if self._parent_cancel_requested andnot self._parent_task.uncancel(): # Do nothing, i.e. swallow the error. pass else: # 如果有一个协程已经取消了,就设置取消的exc propagate_cancellation_error = exc
ifnot self._aborting: # 取消所有的task self._abort()
# 如果还有派生的协程来运行,就陷在这个逻辑中 while self._tasks: if self._on_completed_fut isNone: self._on_completed_fut = self._loop.create_future()
try: # 创建一个中间future来捕获所有派生协程的异常,并等待协程运行完毕 await self._on_completed_fut except exceptions.CancelledError as ex: # TaskGroup不会使_on_completed_fut抛出取消异常,但是如果main_task被取消时,会传播到_on_completed_fut ifnot self._aborting: # 与上面一样设置错误,并取消所有协程 propagate_cancellation_error = ex self._abort()
self._on_completed_fut = None
assertnot self._tasks
# 如果有异常,则抛出 if self._base_error isnotNone: raise self._base_error
if propagate_cancellation_error isnotNone: raise propagate_cancellation_error
if et isnotNoneand et isnot exceptions.CancelledError: self._errors.append(exc)
# 抛出所有运行期间的异常 if self._errors: errors = self._errors self._errors = None
me = BaseExceptionGroup('unhandled errors in a TaskGroup', errors) raise me fromNone
defcreate_task(self, coro, *, name=None, context=None): # 判断目前是否生效,如果不生效就无法派生协程 ifnot self._entered: raise RuntimeError(f"TaskGroup {self!r} has not been entered") if self._exiting andnot self._tasks: raise RuntimeError(f"TaskGroup {self!r} is finished") # 通过事件循环创建协程 if context isNone: task = self._loop.create_task(coro) else: task = self._loop.create_task(coro, context=context) tasks._set_task_name(task, name) # 添加task执行结果回调 task.add_done_callback(self._on_task_done) # 把task添加到对应的self._task,这样其它方法就会判断协程是否运行完毕了 self._tasks.add(task) return task
if self._on_completed_fut isnotNoneandnot self._tasks: ifnot self._on_completed_fut.done(): # 如果最后一个派生的协程运行结束,则设置中间future,这样TaskGroup.__aexit__的while循环就能继续执行了 self._on_completed_fut.set_result(True)
# 如果task已经取消或者没有异常,则不走下面的逻辑 if task.cancelled(): return exc = task.exception() if exc isNone: return
# 把异常添加到类中 self._errors.append(exc) if self._is_base_error(exc) and self._base_error isNone: self._base_error = exc
# 最后处理下当前task if self._parent_task.done(): # Not sure if this case is possible, but we want to handle # it anyways. self._loop.call_exception_handler({ 'message': f'Task {task!r} has errored out but its parent ' f'task {self._parent_task} is already completed', 'exception': exc, 'task': task, }) return ifnot self._aborting andnot self._parent_cancel_requested: self._abort() self._parent_cancel_requested = True self._parent_task.cancel()