跳转至内容
  • 版块
  • 最新
  • 标签
  • 热门
  • Online Tools
  • 用户
  • 群组
折叠
品牌标识

D2Learn Forums

  1. 主页
  2. Blogs | 博客
  3. xin
  4. 【 基于 io_uring 的 C++20 协程网络库】14 取消机制的演进:从手动拼装到隐式注入

【 基于 io_uring 的 C++20 协程网络库】14 取消机制的演进:从手动拼装到隐式注入

已定时 已固定 已锁定 已移动 xin
c++20协程
1 帖子 1 发布者 6 浏览
  • 从旧到新
  • 从新到旧
  • 最多赞同
登录后回复
此主题已被删除。只有拥有主题管理权限的用户可以查看。
  • DoomjustinD 离线
    DoomjustinD 离线
    Doomjustin
    编写于 最后由 编辑
    #1

    这是在学习了capy的隐式注入玩法后的更新。他的方案帮我梳理了我原本混乱的思路,感觉自己的目光都清撤了许多。


    旧版本的困局:stop_then 手动包装

    旧版本就是让用户手工拼装,每个 awaitable 都得用 stop_then 包一下:

    co_await stop_then(some_io_operation(), stop_token);  // 得手动拼,十分繁琐
    

    这种操作确实实现了取消,但是所有的复杂性都暴露给了业务代码:

    1. 编写 co_await 时需要考虑"这个操作是否需要取消?"
    2. 如果需要,必须记得手动包裹 stop_then。
    3. 遗漏任何一个,就会产生"无法取消的操作",取消时挂起。

    值得一提的是,原本的实现由于混乱的思路引入了各种多线程问题。但是在梳理之后发现都是不必要处理的操作,属于是自己把自己坑进去了。


    新版本的改进:隐式注入取消

    消除业务层污染

    旧方案的致命缺陷不在技术复杂度,而在于污染。业务代码里到处是 stop_then,每个 I/O 函数签名都要添加 std::stop_token 参数,这直接增加了维护成本和开发者的认知负担。

    // 旧方案:取消把整个调用链都污染了
    Task<> handle_client(IOContext& ctx, std::stop_token token) {
        auto res = co_await stop_then(async_read(fd, buffer), token);  // 得手动拼
        if (!res && res.error() == std::errc::operation_canceled) {
            // 处理被取消
        }
    }
    

    我原本认为用户可能需要自己选择可取消和不可取消,所以通过stop_then包装器可以提供最大的自由度。但是实际上,我想不出什么操作是不需要被取消的,甚至更进一步的想,一旦出现了不可取消的挂起操作,某种意义上其实可以算作一种资源泄露bug了。

    想法来源:从 capy 的实现中得到启发,我发现可以用 C++20 的 await_transform 重新设计这一层。capy 为了通用性,通过 env 注入 executor、allocator、stop_token 等参数;而我们的框架强绑定 io_uring,所以能更直接地在 promise 层注入 IOContext 和 stop_token,并统一依赖 ASYNC_CANCEL 语义。正因为这些都能确定下来,所以取消机制才能被完全自动化。

    核心改进:三层隐式注入

    第一层:Promise 环境绑定

    每个 Task 的 promise 自动持有当前的 IOContext 和 stop_token,无需用户处理:

    struct StoppablePromise {
        IOContext& context_;
        std::stop_token stop_token_;
        // ...
    };
    

    第二层:await_transform 统一拦截

    利用 C++20 的 await_transform 机制,框架可以在业务代码的每一个 co_await IOAwaiter 处进行拦截,自动将其升级为"取消感知"版本:

    //  新模型:零污染,框架自动处理
    Task<> handle_client() {
        auto res = co_await async_read(fd, buffer);  // 框架自动注入取消机制
        if (!res && res.error() == std::errc::operation_canceled) {
            // 处理取消情况
        }
    }
    

    第三层:统一落地语义

    StopTokenWrapper 在 awaiter 挂起期间隐式注册取消回调。被取消操作的最终结果由原操作的 completion 决定:

    也是在重新设计这块时,我发现并不需要处理 ASYNC_CANCEL 的 cqe,直接丢掉,反而让整个实现简单起来了,没有复杂的生命周期问题,也没有了被取消操作和取消操作的时序问题,被 resume 时一定是被取消操作的结果返回了,而且被取消操作的结果也是精确地,result会被正确设置。

    template<typename Promise>
    auto await_suspend(std::coroutine_handle<Promise> handle) noexcept
        -> std::coroutine_handle<>
    {
        auto inner_handle = inner_.await_suspend(handle, context());
    
        // 核心:在挂起期隐式注册取消回调
        if (promise_->stop_token.stop_possible())
            stop_callback_.emplace(promise_->stop_token, CancelFn{ this });
    
        return inner_handle;
    }
    
    // 取消触发时的统一路径
    void CancelFn::operator()() noexcept {
        if constexpr (cancellable<Awaitable>)
            wrapper->inner_.cancel(wrapper->context());  // 若 Awaiter 实现 cancel,直接调用
        else
            wrapper->context().cancel(wrapper->inner_);   // 否则走 ASYNC_CANCEL
    }
    

    修改后

    1. 隐式注入:用户编写普通的 co_await,框架在后台自动升级为"取消感知"版本。

    2. 生命周期简化:由于 cancel CQE 直接扔掉,不再跟踪,stop_callback 的生命周期严格绑定在 awaiter 挂起这段期间。不需要 shared_ptr<bool> 守卫了。

    3. awaiter 契约统一:

      • 要是 awaiter 实现了 cancel(io_context) 方法,框架就调它。
      • 否则用默认的 io_uring ASYNC_CANCEL 方式。

    核心代码对比

    旧版本(用户手动拼):

    auto task = [](std::stop_token token) -> async::Task<> {
        co_await stop_then(some_io(), token);  // 得手动传 token,手动调用 stop_then
    };
    co_await async::any(task(group.stop_token()), ...);
    

    新版本(框架自动处理):

    auto task = []() -> async::Task<> {
        co_await some_io();  // 啥都不用干,框架已经全搞定
    };
    // stop_token 框架内部在 Task::promise 里已经绑了,用户看不见
    

    多线程下的行为对比

    旧版本

    主线程 (IOContext.run())
        ↓
        [处理 I/O 完成事件A]
        ↓
        task 协程被唤醒
        ↓
        stop_then 析构(alive_ = false)
        ↓
    
    后台定时器线程
        ↓
        [发送 stop_token cancel]
        ↓
        post 任务进入队列
        ↓
        [IOContext 取出 post 任务,检查 alive_]
        ↓
        *alive_ == false,安全丢弃
    

    陷阱:如果 post 任务执行前,协程没析构,就访问了已 free 的内存。

    新版本

    主线程 (IOContext.run())
        ↓
        task 协程开始 co_await some_io()
        ↓
        await_transform 自动把它包装成 StopTokenWrapper
        ↓
        stop_callback 被注册在 wrapper 的栈帧上
        ↓
    
    后台定时器线程
        ↓
        [stop_token 被触发]
        ↓
        stop_callback 立刻在这个线程上执行(标准库保证线程安全)
        ↓
        调用 awaiter.cancel(context) 把 cancel 指令 post 回主线程
        ↓
    
    主线程 (IOContext.run())
        ↓
        [从事件循环取出 cancel 指令]
        ↓
        发送 ASYNC_CANCEL SQE 给内核
        ↓
        原操作的 CQE 返回 -ECANCELED 或原结果
        ↓
        awaiter 恢复协程,返回错误给上层
    

    如此一来:

    • stop_callback 本身线程安全(标准库保证)。
    • cancel 动作在 IOContext 里执行,安全、可控。
    • 栈帧生命周期自动保护,不需要 shared_ptr 守卫。

    结尾

    由于这个修改算是撬了最底层的设计,所以导致绝大部分的模块都需要重写。不过在理清了混乱之后,所有的实现相较于旧版本都变得简单了很多,甚至绝大部分的Awaiter实现都变成了下面这种极简版本,也就是继承一下IOAwaiter,写个 prepare()和构造函数就完事了。

    class SendAwaiter : public async::IOAwaiter<SendAwaiter, std::size_t> {
    private:
        int fd_;
        std::span<const std::byte> buffer_;
    
    public:
        SendAwaiter(int fd, std::span<const std::byte> buffer)
          : fd_{ fd }
          , buffer_{ buffer }
        {}
    
        void prepare(::io_uring_sqe* sqe) const noexcept
        {
            ::io_uring_prep_send(sqe, fd_, buffer_.data(), buffer_.size(), MSG_NOSIGNAL);
        }
    };
    
    1 条回复 最后回复
    0

    • 登录

    • 没有帐号? 注册

    • 登录或注册以进行搜索。
    d2learn forums Powered by NodeBB
    • 第一个帖子
      最后一个帖子
    0
    • 版块
    • 最新
    • 标签
    • 热门
    • Online Tools
    • 用户
    • 群组