跳转至内容
  • A place to talk about whatever you want

    22 主题
    168 帖子
    没有新主题
  • 7 主题
    20 帖子
    SPeakS

    @dustchens 链表结构损坏, 不闭环了 (如果问题解决可以把帖子状态设置为已解决

  • 开源软件 | 开源社区 | 开源理念 | 开源与商业 | 开源可持续发展 等相关话的交流讨论
    注: 这里的"开源"是泛化的共建共享概念, 范围包含 OSI的范围、自由软件、CC等相关内容

    57 主题
    246 帖子
    MoYingJiM

    补一个 0BSD,这个许可证很多时候也被放在与 Unlicense 和 WTFPL 相提并论的(都是公共领域)

  • 59 主题
    252 帖子
    dustchensD

    @dustchens 在 从小白的视角探究 vector 中说:

    光讲套壳,这和 vector 又有什么关系呢?深水区来了。

    第一节的精髓在于这一段之后的内容。不过我没有给出栈和作用域等内容的讲解。这段内容本身是非常复杂的,但是副作用却很小,小到每个人编程一开始都能无障碍使用。

    拿函数来类比vector,就是因为我感觉缺少一个能够切入申请空间和管理生命周期的点,如果说每个人一开始就能写的main函数,是最简化了空间管理和生命周期管理,让你意识不到它的存在,那么vector就是需要我们把这套内容拿到明面上来,且二者的本质是一样的(从空间和生命周期管理的语义来看)

  • 35 主题
    73 帖子
    DoomjustinD

    在第 01 篇里,我留了两个 stop 相关的问题。

    第一个是:stop() 调了,但 run() 可能还卡在 submit_and_wait 里出不来。
    第二个是:就算 run() 退了,内核里可能还有已提交但未完成的操作。

    第一个问题在第 02 篇已经用 eventfd 解决了。这篇只聊第二个:怎么把停机过程做成可验证的先取消、再排干、最后退出

    1. 强行退出的Pending状态与资源泄漏隐患

    目前 run() 的退出条件为:

    while (!should_stop_.load(std::memory_order_relaxed) && outstanding_works_ > 0)

    当 should_stop_ 置为 true 时,循环会立刻终止。但这时协程可能还挂在 read、accept、sleep_for 上,对应 SQE 已交给内核,CQE 还没回来。

    如果事件循环直接退出,io_uring 被析构后,用户态后续完成路径就可能拿到历史的 user_data 并把它当作 Operation* 解引用。只要这个对象已经销毁,就会变成悬垂指针风险。靠析构顺序刚好没出事不是一个可靠方案。

    2. 确定的停机序列:取消与排干(Draining)

    正确的回收顺序是:退出前先把挂起操作都收回来。

    所以 stop() 触发后,不应该立刻跳出循环,而是要先向内核发取消请求,再等所有操作(包括被取消的操作)把 CQE 回完,最后再退出。

    io_uring 提供了 io_uring_prep_cancel 接口。通过传入 Operation* 作为 user_data,内核能够定位并中断对应的操作。随后,内核会投递一个结果为 -ECANCELED 的 CQE,触发该操作的 complete() 回调,使得挂起的协程能够沿着正常的异常或错误处理路径恢复。

    这样做的好处是,不管停机发生在什么时候,协程都还能沿正常路径恢复,栈帧能按预期展开和销毁。前提是:我们得准确知道当前还有多少活跃操作。

    3. 方案取舍:全局取消的局限性与侵入式追踪

    有一个更省代码的做法:提交一个特殊 Cancel SQE,带上 IORING_ASYNC_CANCEL_ANY | IORING_ASYNC_CANCEL_ALL,让内核尽可能全量取消。

    这条路看起来很短,但我最后没选,原因有三个。

    第一,状态不够确定。它是“尽力取消”,不是“给我精确活跃列表”。命令发出去后,用户态依然不知道还有多少 CQE 没回来。

    第二,语义太粗。像 timeout 组合子这种场景,一个 co_await 可能拆成多个底层请求(业务 I/O + timer)。全量取消能快速收敛,但会把上层状态机语义一起打平。

    第三,也是最关键的:我真正要解决的是“run() 什么时候能安全退出”。没有精确活跃计数,退出条件就没法验证。

    tips:

    这类 ANY/ALL 取消属于能力驱动特性,使用前要确认内核与 liburing 版本是否支持对应标志位语义。 取消结果本身也会通过 CQE 回来,可能出现“目标已完成/未命中”等结果码,这些都意味着它不适合作为唯一退出判据。 在我的实现里,它可以作为“加速收敛”的辅助工具,但不能替代用户态的活跃操作追踪。

    所以我还是选了侵入式链表。链表不空,说明还有操作在等 CQE;链表清空,才允许退出。

    每个 awaiter 本来就继承自 Operation,把 prev/next 放进基类,插入删除都是 O(1),而且不需要额外分配。

    4. 零开销追踪:侵入式链表实现

    实现上我做得很直接:把侵入式指针放进 Operation 基类。

    struct Operation { Operation* prev{ nullptr }; Operation* next{ nullptr }; bool is_canceling_{ false }; virtual ~Operation() = default; virtual void complete(int res, std::uint32_t flags) noexcept = 0; };

    IOContext 中维护链表指针及活跃操作计数:

    Operation* head_{ nullptr }; Operation* tail_{ nullptr }; std::size_t tracking_operations_{ 0 };

    track() 和 untrack() 分别负责挂入、摘除,同时更新计数:

    void IOContext::track(gsl::not_null<Operation*> operation) noexcept { if (!head_) { head_ = tail_ = operation; } else { tail_->next = operation; operation->prev = tail_; tail_ = operation; } add_work(); } void IOContext::untrack(gsl::not_null<Operation*> operation) noexcept { // ... 链表节点摘除逻辑 ... operation->prev = operation->next = nullptr; drop_work(); } 5. 生命周期绑定:在 Awaiter 中管理注册

    以 ReadSomeAwaiter 为例,我这里遵循一个很朴素的约定:await_suspend 里注册,complete 里注销。

    void ReadSomeAwaiter::await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); prepare(sqe); ::io_uring_sqe_set_data(sqe, this); context().track(this); // 提交 SQE 后显式声明生命周期开始 } void ReadSomeAwaiter::complete(int result, std::uint32_t flags) noexcept { context().untrack(this); // CQE 返回后第一优先注销生命周期 set_result(result, flags); if (handle_) { handle_.resume(); } }

    其他底层 awaiter 也都按这个模式来,这样挂起操作和链表节点始终是一对一关系。

    写路径里有个值得单独提一下的特例:WriteAllAwaiter。

    它和 WriteSomeAwaiter 不一样,不是一次 SQE 就结束,而是“分段发送直到 buffer 清空”的循环模型。也正因为这样,track/untrack 的时机要更小心:每一轮 complete() 先 untrack(this),再决定是 resume 还是继续 arm_write()。

    void WriteAllAwaiter::complete(int result, std::uint32_t flags) noexcept { context_.untrack(this); set_result(result, flags); if (is_canceling_ || error_code_ != 0 || buffer_.empty()) { if (is_canceling_ && error_code_ == 0) error_code_ = ECANCELED; resume(handle_, result, flags); } else { arm_write(); } }

    这里还有一个细节:取消路径里,如果当前轮没有带出错误码,会主动归一到 ECANCELED。这样上层就不会看到“已经取消但结果像成功”的歧义状态。

    6. 复杂状态机处理:超时组合子与 sqe() 接口收敛

    普通 awaiter 基本是一对一:一个 Operation 对一个 SQE。到了 timeout 组合逻辑,就会出现多 CQE 的竞争。

    第一种是 TimeoutAwaiter。它用 IOSQE_IO_LINK 把业务操作和超时请求绑在一起。虽然内核会回两个 CQE,但它们共享同一个 Operation 节点,所以只需要追踪一次,等两个 CQE 都回来再摘除:

    void await_suspend(std::coroutine_handle<> handle) noexcept { // 发送两个 linked SQE,但在业务视角仅追踪一个逻辑单元 context().track(this); } void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); if (--pending_cqes_ == 0) { context().untrack(this); // 确保底层事件全部终结 handle_.resume(); } }

    第二种是 TimeoutCombinator。这条路是解耦设计:业务操作和计时器作为两个独立 Operation 提交。

    void await_suspend(std::coroutine_handle<> handle) noexcept { auto* sqe = context().sqe(); ::io_uring_prep_timeout(sqe, &timeout_, 0, 0); ::io_uring_sqe_set_data(sqe, &timer_); context().track(&timer_); // 追踪独立的计时器节点 awaiter_.await_suspend(handle); // 内部 Awaiter 自行管理追踪 }

    在竞速过程中,谁先完成,就调用 context().cancel() 去中断另一方。这样 timeout 组合子和全局停机路径就复用了同一套取消逻辑,也把旧版 sqe(false) 这种隐式开关顺带收掉了。

    这其实是一个我早就想动的点了:sqe(bool tracking = true)。

    这个参数最初是我为了赶进度加的。当时想法很直接:业务请求默认 sqe(true),取消请求写 sqe(false),先把功能跑通。

    但写完我就觉得不舒服。调用点里一个裸 bool,语义全靠人脑补,false 到底是“不计数”还是“这是取消请求”,每次都得回到定义处确认。这是一种典型的bad smell了。

    旧接口是这样的:

    auto sqe(bool tracking = true) -> ::io_uring_sqe*; // 取消路径 auto* sqe = context().sqe(false); ::io_uring_prep_cancel(sqe, &timer_, 0); ::io_uring_sqe_set_data(sqe, nullptr);

    这次停机重构正好给了我一个机会,把这个开关彻底拿掉:sqe() 只负责“拿一个 SQE”;计数交给 track/untrack;取消统一走 cancel()。

    auto sqe() -> ::io_uring_sqe*; // 取消路径 context().cancel(&timer_);

    拆完之后,接口职责就清楚了:

    sqe() 只做资源分配。 track/untrack 只做生命周期计数。 cancel() 统一承载取消语义。

    所以这次看起来是在修停机,其实也顺手把一处历史包袱清掉了。sqe(bool) 这个临时方案当时确实帮我快速推进了实现,但到了这个阶段,是时候干掉他了。

    7. 重构事件循环:排干后退出

    事件循环的退出条件现在就一条:活跃计数归零。

    void IOContext::run() { while (tracking_operations_ > 0) { if (should_stop_.load(std::memory_order_relaxed)) { auto* current = head_; while (current) { cancel(current); current = current->next; } } scheduler_.schedule(); } }

    cancel() 负责防重复,并投递取消指令:

    void IOContext::cancel(gsl::not_null<Operation*> operation) noexcept { if (operation->is_canceling_) return; if (auto* sqe = scheduler_.sqe()) { ::io_uring_prep_cancel(sqe, operation, 0); ::io_uring_sqe_set_data(sqe, nullptr); // 显式丢弃取消动作本身的 CQE operation->is_canceling_ = true; } }

    到这里,schedule() 就不需要再返回“这一轮完成了多少”。计数和生命周期都在 Operation 自己的 track/untrack 路径里闭环。

    8. 完整的停机时序

    停机流程如下:

    [async::stop() 触发] | | 置 should_stop_ = true | eventfd 写入信号,中断 submit_and_wait 阻塞 v [run() 进入排干模式] | | 遍历侵入式链表 | 调用 io_uring_prep_cancel 派发中断指令 v [内核态收割] | | 中止 I/O 轮询 | 投递包含 -ECANCELED 的 CQE v [schedule() 消费 CQE] | | 触发 op->complete(-ECANCELED, 0) | 节点自我 untrack(),递减 tracking_operations_ | 协程带着 operation_canceled 错误流转并销毁 v [tracking_operations_ 归零] | | run() 循环安全终止 v [资源清理]

    通过这套机制,停机路径就变成了可验证流程:先唤醒、再取消、再排干、最后退出。在当前实现约束下,这能持续收敛悬垂指针风险,也让退出行为更可预期。

  • 一个技术知识分享、学习、交流的社区

    15 主题
    50 帖子
    sunrisepeakS

    @Doomjustin 版块已创建, 可以检查确认一下是否有话题贴/Topic工具的权限

    https://forum.d2learn.org/category/26/xin
  • Got a question? Ask away!

    4 主题
    14 帖子
    SPeakS

    备注一下使数学公式的使用语法

    单行公式语法 - $ 你的公式 $

    $ log_2^n $

    $ log_2^n $

    多行公式语法 - $$ 你的公式 $$

    $$ log_2^n => log_2^9 = 3 , n = 9 $$

    $$
    log_2^n =>
    log_2^9 = 3, n = 9
    $$

公告栏 | Bulletin Board

欢迎加入d2learn社区 - 社区指南
Welcome to the d2learn Community - Community Guide

一个以 [知识、技术、代码、项目、想法、开源] 相关话题为主导的社区
A community focused on topics related to [knowledge, technology, code, projects, ideas, and open source].


在线用户