【 基于 io_uring 的 C++20 协程网络库】11 停止机制补完
-
在第 01 篇里,我留了两个 stop 相关的问题。
第一个是:
stop()调了,但run()可能还卡在submit_and_wait里出不来。
第二个是:就算run()退了,内核里可能还有已提交但未完成的操作。第一个问题在第 02 篇已经用
eventfd解决了。这篇只聊第二个:怎么把停机过程做成可验证的先取消、再排干、最后退出。1. 强行退出的Pending状态与资源泄漏隐患
目前
run()的退出条件为:while (!should_stop_.load(std::memory_order_relaxed) && outstanding_works_ > 0)当
should_stop_置为 true 时,循环会立刻终止。但这时协程可能还挂在read、accept、sleep_for上,对应 SQE 已交给内核,CQE 还没回来。如果事件循环直接退出,
io_uring被析构后,用户态后续完成路径就可能拿到历史的user_data并把它当作Operation*解引用。只要这个对象已经销毁,就会变成悬垂指针风险。靠析构顺序刚好没出事不是一个可靠方案。2. 确定的停机序列:取消与排干(Draining)
正确的回收顺序是:退出前先把挂起操作都收回来。
所以
stop()触发后,不应该立刻跳出循环,而是要先向内核发取消请求,再等所有操作(包括被取消的操作)把 CQE 回完,最后再退出。io_uring提供了io_uring_prep_cancel接口。通过传入Operation*作为user_data,内核能够定位并中断对应的操作。随后,内核会投递一个结果为-ECANCELED的 CQE,触发该操作的complete()回调,使得挂起的协程能够沿着正常的异常或错误处理路径恢复。这样做的好处是,不管停机发生在什么时候,协程都还能沿正常路径恢复,栈帧能按预期展开和销毁。前提是:我们得准确知道当前还有多少活跃操作。
3. 方案取舍:全局取消的局限性与侵入式追踪
有一个更省代码的做法:提交一个特殊 Cancel SQE,带上
IORING_ASYNC_CANCEL_ANY | IORING_ASYNC_CANCEL_ALL,让内核尽可能全量取消。这条路看起来很短,但我最后没选,原因有三个。
第一,状态不够确定。它是“尽力取消”,不是“给我精确活跃列表”。命令发出去后,用户态依然不知道还有多少 CQE 没回来。
第二,语义太粗。像
timeout组合子这种场景,一个co_await可能拆成多个底层请求(业务 I/O + timer)。全量取消能快速收敛,但会把上层状态机语义一起打平。第三,也是最关键的:我真正要解决的是“
run()什么时候能安全退出”。没有精确活跃计数,退出条件就没法验证。tips:
- 这类 ANY/ALL 取消属于能力驱动特性,使用前要确认内核与 liburing 版本是否支持对应标志位语义。
- 取消结果本身也会通过 CQE 回来,可能出现“目标已完成/未命中”等结果码,这些都意味着它不适合作为唯一退出判据。
- 在我的实现里,它可以作为“加速收敛”的辅助工具,但不能替代用户态的活跃操作追踪。
所以我还是选了侵入式链表。链表不空,说明还有操作在等 CQE;链表清空,才允许退出。
每个 awaiter 本来就继承自
Operation,把prev/next放进基类,插入删除都是 O(1),而且不需要额外分配。4. 零开销追踪:侵入式链表实现
实现上我做得很直接:把侵入式指针放进
Operation基类。struct Operation { Operation* prev{ nullptr }; Operation* next{ nullptr }; bool is_canceling_{ false }; virtual ~Operation() = default; virtual void complete(int res, std::uint32_t flags) noexcept = 0; };IOContext中维护链表指针及活跃操作计数:Operation* head_{ nullptr }; Operation* tail_{ nullptr }; std::size_t tracking_operations_{ 0 };track()和untrack()分别负责挂入、摘除,同时更新计数:void IOContext::track(gsl::not_null<Operation*> operation) noexcept { if (!head_) { head_ = tail_ = operation; } else { tail_->next = operation; operation->prev = tail_; tail_ = operation; } add_work(); } void IOContext::untrack(gsl::not_null<Operation*> operation) noexcept { // ... 链表节点摘除逻辑 ... operation->prev = operation->next = nullptr; drop_work(); }5. 生命周期绑定:在 Awaiter 中管理注册
以
ReadSomeAwaiter为例,我这里遵循一个很朴素的约定:await_suspend里注册,complete里注销。void ReadSomeAwaiter::await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); prepare(sqe); ::io_uring_sqe_set_data(sqe, this); context().track(this); // 提交 SQE 后显式声明生命周期开始 } void ReadSomeAwaiter::complete(int result, std::uint32_t flags) noexcept { context().untrack(this); // CQE 返回后第一优先注销生命周期 set_result(result, flags); if (handle_) { handle_.resume(); } }其他底层 awaiter 也都按这个模式来,这样挂起操作和链表节点始终是一对一关系。
写路径里有个值得单独提一下的特例:
WriteAllAwaiter。它和
WriteSomeAwaiter不一样,不是一次 SQE 就结束,而是“分段发送直到 buffer 清空”的循环模型。也正因为这样,track/untrack的时机要更小心:每一轮complete()先untrack(this),再决定是resume还是继续arm_write()。void WriteAllAwaiter::complete(int result, std::uint32_t flags) noexcept { context_.untrack(this); set_result(result, flags); if (is_canceling_ || error_code_ != 0 || buffer_.empty()) { if (is_canceling_ && error_code_ == 0) error_code_ = ECANCELED; resume(handle_, result, flags); } else { arm_write(); } }这里还有一个细节:取消路径里,如果当前轮没有带出错误码,会主动归一到
ECANCELED。这样上层就不会看到“已经取消但结果像成功”的歧义状态。6. 复杂状态机处理:超时组合子与
sqe()接口收敛普通 awaiter 基本是一对一:一个
Operation对一个 SQE。到了 timeout 组合逻辑,就会出现多 CQE 的竞争。第一种是
TimeoutAwaiter。它用IOSQE_IO_LINK把业务操作和超时请求绑在一起。虽然内核会回两个 CQE,但它们共享同一个Operation节点,所以只需要追踪一次,等两个 CQE 都回来再摘除:void await_suspend(std::coroutine_handle<> handle) noexcept { // 发送两个 linked SQE,但在业务视角仅追踪一个逻辑单元 context().track(this); } void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); if (--pending_cqes_ == 0) { context().untrack(this); // 确保底层事件全部终结 handle_.resume(); } }第二种是
TimeoutCombinator。这条路是解耦设计:业务操作和计时器作为两个独立Operation提交。void await_suspend(std::coroutine_handle<> handle) noexcept { auto* sqe = context().sqe(); ::io_uring_prep_timeout(sqe, &timeout_, 0, 0); ::io_uring_sqe_set_data(sqe, &timer_); context().track(&timer_); // 追踪独立的计时器节点 awaiter_.await_suspend(handle); // 内部 Awaiter 自行管理追踪 }在竞速过程中,谁先完成,就调用
context().cancel()去中断另一方。这样 timeout 组合子和全局停机路径就复用了同一套取消逻辑,也把旧版sqe(false)这种隐式开关顺带收掉了。这其实是一个我早就想动的点了:
sqe(bool tracking = true)。这个参数最初是我为了赶进度加的。当时想法很直接:业务请求默认
sqe(true),取消请求写sqe(false),先把功能跑通。但写完我就觉得不舒服。调用点里一个裸
bool,语义全靠人脑补,false到底是“不计数”还是“这是取消请求”,每次都得回到定义处确认。这是一种典型的bad smell了。旧接口是这样的:
auto sqe(bool tracking = true) -> ::io_uring_sqe*; // 取消路径 auto* sqe = context().sqe(false); ::io_uring_prep_cancel(sqe, &timer_, 0); ::io_uring_sqe_set_data(sqe, nullptr);这次停机重构正好给了我一个机会,把这个开关彻底拿掉:
sqe()只负责“拿一个 SQE”;计数交给track/untrack;取消统一走cancel()。auto sqe() -> ::io_uring_sqe*; // 取消路径 context().cancel(&timer_);拆完之后,接口职责就清楚了:
sqe()只做资源分配。track/untrack只做生命周期计数。cancel()统一承载取消语义。
所以这次看起来是在修停机,其实也顺手把一处历史包袱清掉了。
sqe(bool)这个临时方案当时确实帮我快速推进了实现,但到了这个阶段,是时候干掉他了。7. 重构事件循环:排干后退出
事件循环的退出条件现在就一条:活跃计数归零。
void IOContext::run() { while (tracking_operations_ > 0) { if (should_stop_.load(std::memory_order_relaxed)) { auto* current = head_; while (current) { cancel(current); current = current->next; } } scheduler_.schedule(); } }cancel()负责防重复,并投递取消指令:void IOContext::cancel(gsl::not_null<Operation*> operation) noexcept { if (operation->is_canceling_) return; if (auto* sqe = scheduler_.sqe()) { ::io_uring_prep_cancel(sqe, operation, 0); ::io_uring_sqe_set_data(sqe, nullptr); // 显式丢弃取消动作本身的 CQE operation->is_canceling_ = true; } }到这里,
schedule()就不需要再返回“这一轮完成了多少”。计数和生命周期都在Operation自己的track/untrack路径里闭环。8. 完整的停机时序
停机流程如下:
[async::stop() 触发] | | 置 should_stop_ = true | eventfd 写入信号,中断 submit_and_wait 阻塞 v [run() 进入排干模式] | | 遍历侵入式链表 | 调用 io_uring_prep_cancel 派发中断指令 v [内核态收割] | | 中止 I/O 轮询 | 投递包含 -ECANCELED 的 CQE v [schedule() 消费 CQE] | | 触发 op->complete(-ECANCELED, 0) | 节点自我 untrack(),递减 tracking_operations_ | 协程带着 operation_canceled 错误流转并销毁 v [tracking_operations_ 归零] | | run() 循环安全终止 v [资源清理]通过这套机制,停机路径就变成了可验证流程:先唤醒、再取消、再排干、最后退出。在当前实现约束下,这能持续收敛悬垂指针风险,也让退出行为更可预期。