【 基于 io_uring 的 C++20 协程网络库】14 取消机制的演进:从手动拼装到隐式注入
-
这是在学习了capy的隐式注入玩法后的更新。他的方案帮我梳理了我原本混乱的思路,感觉自己的目光都清撤了许多。
旧版本的困局:
stop_then手动包装旧版本就是让用户手工拼装,每个 awaitable 都得用
stop_then包一下:co_await stop_then(some_io_operation(), stop_token); // 得手动拼,十分繁琐这种操作确实实现了取消,但是所有的复杂性都暴露给了业务代码:
- 编写
co_await时需要考虑"这个操作是否需要取消?" - 如果需要,必须记得手动包裹
stop_then。 - 遗漏任何一个,就会产生"无法取消的操作",取消时挂起。
值得一提的是,原本的实现由于混乱的思路引入了各种多线程问题。但是在梳理之后发现都是不必要处理的操作,属于是自己把自己坑进去了。
新版本的改进:隐式注入取消
消除业务层污染
旧方案的致命缺陷不在技术复杂度,而在于污染。业务代码里到处是
stop_then,每个 I/O 函数签名都要添加std::stop_token参数,这直接增加了维护成本和开发者的认知负担。// 旧方案:取消把整个调用链都污染了 Task<> handle_client(IOContext& ctx, std::stop_token token) { auto res = co_await stop_then(async_read(fd, buffer), token); // 得手动拼 if (!res && res.error() == std::errc::operation_canceled) { // 处理被取消 } }我原本认为用户可能需要自己选择可取消和不可取消,所以通过stop_then包装器可以提供最大的自由度。但是实际上,我想不出什么操作是不需要被取消的,甚至更进一步的想,一旦出现了不可取消的挂起操作,某种意义上其实可以算作一种资源泄露bug了。
想法来源:从 capy 的实现中得到启发,我发现可以用 C++20 的
await_transform重新设计这一层。capy 为了通用性,通过 env 注入 executor、allocator、stop_token 等参数;而我们的框架强绑定 io_uring,所以能更直接地在 promise 层注入 IOContext 和 stop_token,并统一依赖 ASYNC_CANCEL 语义。正因为这些都能确定下来,所以取消机制才能被完全自动化。核心改进:三层隐式注入
第一层:Promise 环境绑定
每个
Task的 promise 自动持有当前的IOContext和stop_token,无需用户处理:struct StoppablePromise { IOContext& context_; std::stop_token stop_token_; // ... };第二层:await_transform 统一拦截
利用 C++20 的
await_transform机制,框架可以在业务代码的每一个co_await IOAwaiter处进行拦截,自动将其升级为"取消感知"版本:// 新模型:零污染,框架自动处理 Task<> handle_client() { auto res = co_await async_read(fd, buffer); // 框架自动注入取消机制 if (!res && res.error() == std::errc::operation_canceled) { // 处理取消情况 } }第三层:统一落地语义
StopTokenWrapper在 awaiter 挂起期间隐式注册取消回调。被取消操作的最终结果由原操作的 completion 决定:也是在重新设计这块时,我发现并不需要处理 ASYNC_CANCEL 的 cqe,直接丢掉,反而让整个实现简单起来了,没有复杂的生命周期问题,也没有了被取消操作和取消操作的时序问题,被 resume 时一定是被取消操作的结果返回了,而且被取消操作的结果也是精确地,result会被正确设置。
template<typename Promise> auto await_suspend(std::coroutine_handle<Promise> handle) noexcept -> std::coroutine_handle<> { auto inner_handle = inner_.await_suspend(handle, context()); // 核心:在挂起期隐式注册取消回调 if (promise_->stop_token.stop_possible()) stop_callback_.emplace(promise_->stop_token, CancelFn{ this }); return inner_handle; } // 取消触发时的统一路径 void CancelFn::operator()() noexcept { if constexpr (cancellable<Awaitable>) wrapper->inner_.cancel(wrapper->context()); // 若 Awaiter 实现 cancel,直接调用 else wrapper->context().cancel(wrapper->inner_); // 否则走 ASYNC_CANCEL }修改后
-
隐式注入:用户编写普通的
co_await,框架在后台自动升级为"取消感知"版本。 -
生命周期简化:由于 cancel CQE 直接扔掉,不再跟踪,
stop_callback的生命周期严格绑定在 awaiter 挂起这段期间。不需要shared_ptr<bool>守卫了。 -
awaiter 契约统一:
- 要是 awaiter 实现了
cancel(io_context)方法,框架就调它。 - 否则用默认的
io_uring ASYNC_CANCEL方式。
- 要是 awaiter 实现了
核心代码对比
旧版本(用户手动拼):
auto task = [](std::stop_token token) -> async::Task<> { co_await stop_then(some_io(), token); // 得手动传 token,手动调用 stop_then }; co_await async::any(task(group.stop_token()), ...);新版本(框架自动处理):
auto task = []() -> async::Task<> { co_await some_io(); // 啥都不用干,框架已经全搞定 }; // stop_token 框架内部在 Task::promise 里已经绑了,用户看不见
多线程下的行为对比
旧版本
主线程 (IOContext.run()) ↓ [处理 I/O 完成事件A] ↓ task 协程被唤醒 ↓ stop_then 析构(alive_ = false) ↓ 后台定时器线程 ↓ [发送 stop_token cancel] ↓ post 任务进入队列 ↓ [IOContext 取出 post 任务,检查 alive_] ↓ *alive_ == false,安全丢弃陷阱:如果 post 任务执行前,协程没析构,就访问了已 free 的内存。
新版本
主线程 (IOContext.run()) ↓ task 协程开始 co_await some_io() ↓ await_transform 自动把它包装成 StopTokenWrapper ↓ stop_callback 被注册在 wrapper 的栈帧上 ↓ 后台定时器线程 ↓ [stop_token 被触发] ↓ stop_callback 立刻在这个线程上执行(标准库保证线程安全) ↓ 调用 awaiter.cancel(context) 把 cancel 指令 post 回主线程 ↓ 主线程 (IOContext.run()) ↓ [从事件循环取出 cancel 指令] ↓ 发送 ASYNC_CANCEL SQE 给内核 ↓ 原操作的 CQE 返回 -ECANCELED 或原结果 ↓ awaiter 恢复协程,返回错误给上层如此一来:
stop_callback本身线程安全(标准库保证)。- cancel 动作在 IOContext 里执行,安全、可控。
- 栈帧生命周期自动保护,不需要
shared_ptr守卫。
结尾
由于这个修改算是撬了最底层的设计,所以导致绝大部分的模块都需要重写。不过在理清了混乱之后,所有的实现相较于旧版本都变得简单了很多,甚至绝大部分的Awaiter实现都变成了下面这种极简版本,也就是继承一下IOAwaiter,写个 prepare()和构造函数就完事了。
class SendAwaiter : public async::IOAwaiter<SendAwaiter, std::size_t> { private: int fd_; std::span<const std::byte> buffer_; public: SendAwaiter(int fd, std::span<const std::byte> buffer) : fd_{ fd } , buffer_{ buffer } {} void prepare(::io_uring_sqe* sqe) const noexcept { ::io_uring_prep_send(sqe, fd_, buffer_.data(), buffer_.size(), MSG_NOSIGNAL); } }; - 编写