在第 01 篇里,我留了两个 stop 相关的问题。
第一个是:stop() 调了,但 run() 可能还卡在 submit_and_wait 里出不来。
第二个是:就算 run() 退了,内核里可能还有已提交但未完成的操作。
第一个问题在第 02 篇已经用 eventfd 解决了。这篇只聊第二个:怎么把停机过程做成可验证的先取消、再排干、最后退出。
1. 强行退出的Pending状态与资源泄漏隐患
目前 run() 的退出条件为:
while (!should_stop_.load(std::memory_order_relaxed) && outstanding_works_ > 0)
当 should_stop_ 置为 true 时,循环会立刻终止。但这时协程可能还挂在 read、accept、sleep_for 上,对应 SQE 已交给内核,CQE 还没回来。
如果事件循环直接退出,io_uring 被析构后,用户态后续完成路径就可能拿到历史的 user_data 并把它当作 Operation* 解引用。只要这个对象已经销毁,就会变成悬垂指针风险。靠析构顺序刚好没出事不是一个可靠方案。
2. 确定的停机序列:取消与排干(Draining)
正确的回收顺序是:退出前先把挂起操作都收回来。
所以 stop() 触发后,不应该立刻跳出循环,而是要先向内核发取消请求,再等所有操作(包括被取消的操作)把 CQE 回完,最后再退出。
io_uring 提供了 io_uring_prep_cancel 接口。通过传入 Operation* 作为 user_data,内核能够定位并中断对应的操作。随后,内核会投递一个结果为 -ECANCELED 的 CQE,触发该操作的 complete() 回调,使得挂起的协程能够沿着正常的异常或错误处理路径恢复。
这样做的好处是,不管停机发生在什么时候,协程都还能沿正常路径恢复,栈帧能按预期展开和销毁。前提是:我们得准确知道当前还有多少活跃操作。
3. 方案取舍:全局取消的局限性与侵入式追踪
有一个更省代码的做法:提交一个特殊 Cancel SQE,带上 IORING_ASYNC_CANCEL_ANY | IORING_ASYNC_CANCEL_ALL,让内核尽可能全量取消。
这条路看起来很短,但我最后没选,原因有三个。
第一,状态不够确定。它是“尽力取消”,不是“给我精确活跃列表”。命令发出去后,用户态依然不知道还有多少 CQE 没回来。
第二,语义太粗。像 timeout 组合子这种场景,一个 co_await 可能拆成多个底层请求(业务 I/O + timer)。全量取消能快速收敛,但会把上层状态机语义一起打平。
第三,也是最关键的:我真正要解决的是“run() 什么时候能安全退出”。没有精确活跃计数,退出条件就没法验证。
tips:
- 这类 ANY/ALL 取消属于能力驱动特性,使用前要确认内核与 liburing 版本是否支持对应标志位语义。
- 取消结果本身也会通过 CQE 回来,可能出现“目标已完成/未命中”等结果码,这些都意味着它不适合作为唯一退出判据。
- 在我的实现里,它可以作为“加速收敛”的辅助工具,但不能替代用户态的活跃操作追踪。
所以我还是选了侵入式链表。链表不空,说明还有操作在等 CQE;链表清空,才允许退出。
每个 awaiter 本来就继承自 Operation,把 prev/next 放进基类,插入删除都是 O(1),而且不需要额外分配。
4. 零开销追踪:侵入式链表实现
实现上我做得很直接:把侵入式指针放进 Operation 基类。
struct Operation {
Operation* prev{ nullptr };
Operation* next{ nullptr };
bool is_canceling_{ false };
virtual ~Operation() = default;
virtual void complete(int res, std::uint32_t flags) noexcept = 0;
};
IOContext 中维护链表指针及活跃操作计数:
Operation* head_{ nullptr };
Operation* tail_{ nullptr };
std::size_t tracking_operations_{ 0 };
track() 和 untrack() 分别负责挂入、摘除,同时更新计数:
void IOContext::track(gsl::not_null<Operation*> operation) noexcept
{
if (!head_) {
head_ = tail_ = operation;
}
else {
tail_->next = operation;
operation->prev = tail_;
tail_ = operation;
}
add_work();
}
void IOContext::untrack(gsl::not_null<Operation*> operation) noexcept
{
// ... 链表节点摘除逻辑 ...
operation->prev = operation->next = nullptr;
drop_work();
}
5. 生命周期绑定:在 Awaiter 中管理注册
以 ReadSomeAwaiter 为例,我这里遵循一个很朴素的约定:await_suspend 里注册,complete 里注销。
void ReadSomeAwaiter::await_suspend(std::coroutine_handle<> handle) noexcept
{
handle_ = handle;
auto* sqe = context_.sqe();
prepare(sqe);
::io_uring_sqe_set_data(sqe, this);
context().track(this); // 提交 SQE 后显式声明生命周期开始
}
void ReadSomeAwaiter::complete(int result, std::uint32_t flags) noexcept
{
context().untrack(this); // CQE 返回后第一优先注销生命周期
set_result(result, flags);
if (handle_) {
handle_.resume();
}
}
其他底层 awaiter 也都按这个模式来,这样挂起操作和链表节点始终是一对一关系。
写路径里有个值得单独提一下的特例:WriteAllAwaiter。
它和 WriteSomeAwaiter 不一样,不是一次 SQE 就结束,而是“分段发送直到 buffer 清空”的循环模型。也正因为这样,track/untrack 的时机要更小心:每一轮 complete() 先 untrack(this),再决定是 resume 还是继续 arm_write()。
void WriteAllAwaiter::complete(int result, std::uint32_t flags) noexcept
{
context_.untrack(this);
set_result(result, flags);
if (is_canceling_ || error_code_ != 0 || buffer_.empty()) {
if (is_canceling_ && error_code_ == 0)
error_code_ = ECANCELED;
resume(handle_, result, flags);
}
else {
arm_write();
}
}
这里还有一个细节:取消路径里,如果当前轮没有带出错误码,会主动归一到 ECANCELED。这样上层就不会看到“已经取消但结果像成功”的歧义状态。
6. 复杂状态机处理:超时组合子与 sqe() 接口收敛
普通 awaiter 基本是一对一:一个 Operation 对一个 SQE。到了 timeout 组合逻辑,就会出现多 CQE 的竞争。
第一种是 TimeoutAwaiter。它用 IOSQE_IO_LINK 把业务操作和超时请求绑在一起。虽然内核会回两个 CQE,但它们共享同一个 Operation 节点,所以只需要追踪一次,等两个 CQE 都回来再摘除:
void await_suspend(std::coroutine_handle<> handle) noexcept
{
// 发送两个 linked SQE,但在业务视角仅追踪一个逻辑单元
context().track(this);
}
void complete(int result, std::uint32_t flags) noexcept override
{
set_result(result, flags);
if (--pending_cqes_ == 0) {
context().untrack(this); // 确保底层事件全部终结
handle_.resume();
}
}
第二种是 TimeoutCombinator。这条路是解耦设计:业务操作和计时器作为两个独立 Operation 提交。
void await_suspend(std::coroutine_handle<> handle) noexcept
{
auto* sqe = context().sqe();
::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
::io_uring_sqe_set_data(sqe, &timer_);
context().track(&timer_); // 追踪独立的计时器节点
awaiter_.await_suspend(handle); // 内部 Awaiter 自行管理追踪
}
在竞速过程中,谁先完成,就调用 context().cancel() 去中断另一方。这样 timeout 组合子和全局停机路径就复用了同一套取消逻辑,也把旧版 sqe(false) 这种隐式开关顺带收掉了。
这其实是一个我早就想动的点了:sqe(bool tracking = true)。
这个参数最初是我为了赶进度加的。当时想法很直接:业务请求默认 sqe(true),取消请求写 sqe(false),先把功能跑通。
但写完我就觉得不舒服。调用点里一个裸 bool,语义全靠人脑补,false 到底是“不计数”还是“这是取消请求”,每次都得回到定义处确认。这是一种典型的bad smell了。
旧接口是这样的:
auto sqe(bool tracking = true) -> ::io_uring_sqe*;
// 取消路径
auto* sqe = context().sqe(false);
::io_uring_prep_cancel(sqe, &timer_, 0);
::io_uring_sqe_set_data(sqe, nullptr);
这次停机重构正好给了我一个机会,把这个开关彻底拿掉:sqe() 只负责“拿一个 SQE”;计数交给 track/untrack;取消统一走 cancel()。
auto sqe() -> ::io_uring_sqe*;
// 取消路径
context().cancel(&timer_);
拆完之后,接口职责就清楚了:
sqe()只做资源分配。track/untrack只做生命周期计数。cancel()统一承载取消语义。
所以这次看起来是在修停机,其实也顺手把一处历史包袱清掉了。sqe(bool) 这个临时方案当时确实帮我快速推进了实现,但到了这个阶段,是时候干掉他了。
7. 重构事件循环:排干后退出
事件循环的退出条件现在就一条:活跃计数归零。
void IOContext::run()
{
while (tracking_operations_ > 0) {
if (should_stop_.load(std::memory_order_relaxed)) {
auto* current = head_;
while (current) {
cancel(current);
current = current->next;
}
}
scheduler_.schedule();
}
}
cancel() 负责防重复,并投递取消指令:
void IOContext::cancel(gsl::not_null<Operation*> operation) noexcept
{
if (operation->is_canceling_) return;
if (auto* sqe = scheduler_.sqe()) {
::io_uring_prep_cancel(sqe, operation, 0);
::io_uring_sqe_set_data(sqe, nullptr); // 显式丢弃取消动作本身的 CQE
operation->is_canceling_ = true;
}
}
到这里,schedule() 就不需要再返回“这一轮完成了多少”。计数和生命周期都在 Operation 自己的 track/untrack 路径里闭环。
8. 完整的停机时序
停机流程如下:
[async::stop() 触发]
|
| 置 should_stop_ = true
| eventfd 写入信号,中断 submit_and_wait 阻塞
v
[run() 进入排干模式]
|
| 遍历侵入式链表
| 调用 io_uring_prep_cancel 派发中断指令
v
[内核态收割]
|
| 中止 I/O 轮询
| 投递包含 -ECANCELED 的 CQE
v
[schedule() 消费 CQE]
|
| 触发 op->complete(-ECANCELED, 0)
| 节点自我 untrack(),递减 tracking_operations_
| 协程带着 operation_canceled 错误流转并销毁
v
[tracking_operations_ 归零]
|
| run() 循环安全终止
v
[资源清理]
通过这套机制,停机路径就变成了可验证流程:先唤醒、再取消、再排干、最后退出。在当前实现约束下,这能持续收敛悬垂指针风险,也让退出行为更可预期。
内核态的上下文切换,以及不可避免的时序问题。
️ 推荐)
深水区踩坑预警:为什么
️ 细节预警:在处理
完整资源指路: