跳转至内容
  • 版块
  • 最新
  • 标签
  • 热门
  • Online Tools
  • 用户
  • 群组
折叠
品牌标识

D2Learn Forums

  1. 主页
  2. Blogs | 博客
  3. xin
  4. 【 基于 io_uring 的 C++20 协程网络库】11 停止机制补完

【 基于 io_uring 的 C++20 协程网络库】11 停止机制补完

已定时 已固定 已锁定 已移动 xin
c++20协程
1 帖子 1 发布者 1 浏览
  • 从旧到新
  • 从新到旧
  • 最多赞同
登录后回复
此主题已被删除。只有拥有主题管理权限的用户可以查看。
  • DoomjustinD 在线
    DoomjustinD 在线
    Doomjustin
    编写于 最后由 编辑
    #1

    在第 01 篇里,我留了两个 stop 相关的问题。

    第一个是:stop() 调了,但 run() 可能还卡在 submit_and_wait 里出不来。
    第二个是:就算 run() 退了,内核里可能还有已提交但未完成的操作。

    第一个问题在第 02 篇已经用 eventfd 解决了。这篇只聊第二个:怎么把停机过程做成可验证的先取消、再排干、最后退出。

    1. 强行退出的Pending状态与资源泄漏隐患

    目前 run() 的退出条件为:

    while (!should_stop_.load(std::memory_order_relaxed) && outstanding_works_ > 0)
    

    当 should_stop_ 置为 true 时,循环会立刻终止。但这时协程可能还挂在 read、accept、sleep_for 上,对应 SQE 已交给内核,CQE 还没回来。

    如果事件循环直接退出,io_uring 被析构后,用户态后续完成路径就可能拿到历史的 user_data 并把它当作 Operation* 解引用。只要这个对象已经销毁,就会变成悬垂指针风险。靠析构顺序刚好没出事不是一个可靠方案。

    2. 确定的停机序列:取消与排干(Draining)

    正确的回收顺序是:退出前先把挂起操作都收回来。

    所以 stop() 触发后,不应该立刻跳出循环,而是要先向内核发取消请求,再等所有操作(包括被取消的操作)把 CQE 回完,最后再退出。

    io_uring 提供了 io_uring_prep_cancel 接口。通过传入 Operation* 作为 user_data,内核能够定位并中断对应的操作。随后,内核会投递一个结果为 -ECANCELED 的 CQE,触发该操作的 complete() 回调,使得挂起的协程能够沿着正常的异常或错误处理路径恢复。

    这样做的好处是,不管停机发生在什么时候,协程都还能沿正常路径恢复,栈帧能按预期展开和销毁。前提是:我们得准确知道当前还有多少活跃操作。

    3. 方案取舍:全局取消的局限性与侵入式追踪

    有一个更省代码的做法:提交一个特殊 Cancel SQE,带上 IORING_ASYNC_CANCEL_ANY | IORING_ASYNC_CANCEL_ALL,让内核尽可能全量取消。

    这条路看起来很短,但我最后没选,原因有三个。

    第一,状态不够确定。它是“尽力取消”,不是“给我精确活跃列表”。命令发出去后,用户态依然不知道还有多少 CQE 没回来。

    第二,语义太粗。像 timeout 组合子这种场景,一个 co_await 可能拆成多个底层请求(业务 I/O + timer)。全量取消能快速收敛,但会把上层状态机语义一起打平。

    第三,也是最关键的:我真正要解决的是“run() 什么时候能安全退出”。没有精确活跃计数,退出条件就没法验证。

    tips:

    1. 这类 ANY/ALL 取消属于能力驱动特性,使用前要确认内核与 liburing 版本是否支持对应标志位语义。
    2. 取消结果本身也会通过 CQE 回来,可能出现“目标已完成/未命中”等结果码,这些都意味着它不适合作为唯一退出判据。
    3. 在我的实现里,它可以作为“加速收敛”的辅助工具,但不能替代用户态的活跃操作追踪。

    所以我还是选了侵入式链表。链表不空,说明还有操作在等 CQE;链表清空,才允许退出。

    每个 awaiter 本来就继承自 Operation,把 prev/next 放进基类,插入删除都是 O(1),而且不需要额外分配。

    4. 零开销追踪:侵入式链表实现

    实现上我做得很直接:把侵入式指针放进 Operation 基类。

    struct Operation {
        Operation* prev{ nullptr };
        Operation* next{ nullptr };
        bool is_canceling_{ false };
    
        virtual ~Operation() = default;
        virtual void complete(int res, std::uint32_t flags) noexcept = 0;
    };
    

    IOContext 中维护链表指针及活跃操作计数:

    Operation* head_{ nullptr };
    Operation* tail_{ nullptr };
    std::size_t tracking_operations_{ 0 };
    

    track() 和 untrack() 分别负责挂入、摘除,同时更新计数:

    void IOContext::track(gsl::not_null<Operation*> operation) noexcept
    {
        if (!head_) {
            head_ = tail_ = operation;
        }
        else {
            tail_->next = operation;
            operation->prev = tail_;
            tail_ = operation;
        }
        add_work();
    }
    
    void IOContext::untrack(gsl::not_null<Operation*> operation) noexcept
    {
        // ... 链表节点摘除逻辑 ...
        operation->prev = operation->next = nullptr;
        drop_work();
    }
    

    5. 生命周期绑定:在 Awaiter 中管理注册

    以 ReadSomeAwaiter 为例,我这里遵循一个很朴素的约定:await_suspend 里注册,complete 里注销。

    void ReadSomeAwaiter::await_suspend(std::coroutine_handle<> handle) noexcept
    {
        handle_ = handle;
    
        auto* sqe = context_.sqe();
        prepare(sqe);
        ::io_uring_sqe_set_data(sqe, this);
    
        context().track(this);   // 提交 SQE 后显式声明生命周期开始
    }
    
    void ReadSomeAwaiter::complete(int result, std::uint32_t flags) noexcept
    {
        context().untrack(this);  // CQE 返回后第一优先注销生命周期
    
        set_result(result, flags);
        if (handle_) {
            handle_.resume();
        }
    }
    

    其他底层 awaiter 也都按这个模式来,这样挂起操作和链表节点始终是一对一关系。

    写路径里有个值得单独提一下的特例:WriteAllAwaiter。

    它和 WriteSomeAwaiter 不一样,不是一次 SQE 就结束,而是“分段发送直到 buffer 清空”的循环模型。也正因为这样,track/untrack 的时机要更小心:每一轮 complete() 先 untrack(this),再决定是 resume 还是继续 arm_write()。

    void WriteAllAwaiter::complete(int result, std::uint32_t flags) noexcept
    {
        context_.untrack(this);
        set_result(result, flags);
    
        if (is_canceling_ || error_code_ != 0 || buffer_.empty()) {
            if (is_canceling_ && error_code_ == 0)
                error_code_ = ECANCELED;
            resume(handle_, result, flags);
        }
        else {
            arm_write();
        }
    }
    

    这里还有一个细节:取消路径里,如果当前轮没有带出错误码,会主动归一到 ECANCELED。这样上层就不会看到“已经取消但结果像成功”的歧义状态。

    6. 复杂状态机处理:超时组合子与 sqe() 接口收敛

    普通 awaiter 基本是一对一:一个 Operation 对一个 SQE。到了 timeout 组合逻辑,就会出现多 CQE 的竞争。

    第一种是 TimeoutAwaiter。它用 IOSQE_IO_LINK 把业务操作和超时请求绑在一起。虽然内核会回两个 CQE,但它们共享同一个 Operation 节点,所以只需要追踪一次,等两个 CQE 都回来再摘除:

    void await_suspend(std::coroutine_handle<> handle) noexcept
    {
        // 发送两个 linked SQE,但在业务视角仅追踪一个逻辑单元
        context().track(this);
    }
    
    void complete(int result, std::uint32_t flags) noexcept override
    {
        set_result(result, flags);
    
        if (--pending_cqes_ == 0) {
            context().untrack(this);  // 确保底层事件全部终结
            handle_.resume();
        }
    }
    

    第二种是 TimeoutCombinator。这条路是解耦设计:业务操作和计时器作为两个独立 Operation 提交。

    void await_suspend(std::coroutine_handle<> handle) noexcept
    {
        auto* sqe = context().sqe();
        ::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
        ::io_uring_sqe_set_data(sqe, &timer_);
    
        context().track(&timer_);        // 追踪独立的计时器节点
        awaiter_.await_suspend(handle);  // 内部 Awaiter 自行管理追踪
    }
    

    在竞速过程中,谁先完成,就调用 context().cancel() 去中断另一方。这样 timeout 组合子和全局停机路径就复用了同一套取消逻辑,也把旧版 sqe(false) 这种隐式开关顺带收掉了。

    这其实是一个我早就想动的点了:sqe(bool tracking = true)。

    这个参数最初是我为了赶进度加的。当时想法很直接:业务请求默认 sqe(true),取消请求写 sqe(false),先把功能跑通。

    但写完我就觉得不舒服。调用点里一个裸 bool,语义全靠人脑补,false 到底是“不计数”还是“这是取消请求”,每次都得回到定义处确认。这是一种典型的bad smell了。

    旧接口是这样的:

    auto sqe(bool tracking = true) -> ::io_uring_sqe*;
    
    // 取消路径
    auto* sqe = context().sqe(false);
    ::io_uring_prep_cancel(sqe, &timer_, 0);
    ::io_uring_sqe_set_data(sqe, nullptr);
    

    这次停机重构正好给了我一个机会,把这个开关彻底拿掉:sqe() 只负责“拿一个 SQE”;计数交给 track/untrack;取消统一走 cancel()。

    auto sqe() -> ::io_uring_sqe*;
    
    // 取消路径
    context().cancel(&timer_);
    

    拆完之后,接口职责就清楚了:

    1. sqe() 只做资源分配。
    2. track/untrack 只做生命周期计数。
    3. cancel() 统一承载取消语义。

    所以这次看起来是在修停机,其实也顺手把一处历史包袱清掉了。sqe(bool) 这个临时方案当时确实帮我快速推进了实现,但到了这个阶段,是时候干掉他了。

    7. 重构事件循环:排干后退出

    事件循环的退出条件现在就一条:活跃计数归零。

    void IOContext::run()
    {
        while (tracking_operations_ > 0) {
            if (should_stop_.load(std::memory_order_relaxed)) {
                auto* current = head_;
                while (current) {
                    cancel(current);
                    current = current->next;
                }
            }
    
            scheduler_.schedule();
        }
    }
    

    cancel() 负责防重复,并投递取消指令:

    void IOContext::cancel(gsl::not_null<Operation*> operation) noexcept
    {
        if (operation->is_canceling_) return;  
    
        if (auto* sqe = scheduler_.sqe()) {
            ::io_uring_prep_cancel(sqe, operation, 0);
            ::io_uring_sqe_set_data(sqe, nullptr);  // 显式丢弃取消动作本身的 CQE
            operation->is_canceling_ = true;
        }
    }
    

    到这里,schedule() 就不需要再返回“这一轮完成了多少”。计数和生命周期都在 Operation 自己的 track/untrack 路径里闭环。

    8. 完整的停机时序

    停机流程如下:

    [async::stop() 触发]
          |
          | 置 should_stop_ = true
          | eventfd 写入信号,中断 submit_and_wait 阻塞
          v
    [run() 进入排干模式]
          |
          | 遍历侵入式链表
          | 调用 io_uring_prep_cancel 派发中断指令
          v
    [内核态收割]
          |
          | 中止 I/O 轮询
          | 投递包含 -ECANCELED 的 CQE
          v
    [schedule() 消费 CQE]
          |
          | 触发 op->complete(-ECANCELED, 0)
          | 节点自我 untrack(),递减 tracking_operations_
          | 协程带着 operation_canceled 错误流转并销毁
          v
    [tracking_operations_ 归零]
          |
          | run() 循环安全终止
          v
    [资源清理]
    

    通过这套机制,停机路径就变成了可验证流程:先唤醒、再取消、再排干、最后退出。在当前实现约束下,这能持续收敛悬垂指针风险,也让退出行为更可预期。

    1 条回复 最后回复
    0

    • 登录

    • 没有帐号? 注册

    • 登录或注册以进行搜索。
    d2learn forums Powered by NodeBB
    • 第一个帖子
      最后一个帖子
    0
    • 版块
    • 最新
    • 标签
    • 热门
    • Online Tools
    • 用户
    • 群组