跳转至内容
  • 版块
  • 最新
  • 标签
  • 热门
  • Online Tools
  • 用户
  • 群组
折叠
品牌标识

D2Learn Forums

  1. 主页
  2. Blogs | 博客
  3. xin
  4. 【 基于 io_uring 的 C++20 协程网络库】03 基于链式请求的零开销超时机制

【 基于 io_uring 的 C++20 协程网络库】03 基于链式请求的零开销超时机制

已定时 已固定 已锁定 已移动 xin
c++20协程
1 帖子 1 发布者 3 浏览
  • 从旧到新
  • 从新到旧
  • 最多赞同
登录后回复
此主题已被删除。只有拥有主题管理权限的用户可以查看。
  • DoomjustinD 离线
    DoomjustinD 离线
    Doomjustin
    编写于 最后由 编辑
    #1

    这一步要实现什么,我也很纠结。最好的当然是直奔socket的封装,然后实现async accept,async read,async write这些典型的协程调用。

    但是上文中留下一些坑又需要尽快填一下,否则后续再改会导致更大的重构。我们需要在这一篇中解释一下为什么要在 PollAwaiter 中开放set_result,prepare,context这些看起来毫无卵用的接口。

    最主要的目的就是能实现一个timeout接口。

    在传统的 Reactor 模型(如 epoll)中,实现超时通常需要在用户态维护一个独立的数据结构(如最小堆或时间轮)来管理定时器,这往往伴随着额外的动态内存分配(分配定时器节点)以及后台线程的唤醒开销。

    io_uring 提供了更底层的解法:链式请求(Linked Requests)。

    通过将 I/O 操作与定时器在内核态进行绑定,我们可以将状态同步的复杂度完全下沉至内核,从而实现真正的零动态分配超时机制。

    1. 接口约束:single_shot_only_operation

    在实现通用的超时包装器之前,我们需要界定“什么类型的操作允许被包装”。

    我们定义 single_shot_only_operation,要求目标类型不仅继承自 Operation 基类,还必须提供用于获取上下文、组装 SQE 以及处理结果的特定接口:

    #include <concepts>
    #include <coroutine>
    #include <expected>
    #include <system_error>
    
    template<typename T>
    concept single_shot_only_operation = requires (T& op, ::io_uring_sqe* sqe)
    {
        typename T::resume_type;
    
        requires std::is_lvalue_reference_v<decltype(op.context())>;
        op.context();
        op.prepare(sqe);
        op.set_result(0, 0);
        { op.await_resume() } -> std::same_as<std::expected<typename T::resume_type, std::error_code>>;
    } && std::derived_from<T, Operation>;
    

    基于此约束,无论底层的协程等待体是 Socket::async_read 还是 SignalSet::async_wait,只要满足条件,编译器即可保证其能被安全地赋予超时语义。

    2. 核心机制:IOSQE_IO_LINK 与内核竞速

    io_uring 的 SQE 链式调用是我们实现零开销超时的核心。当我们在一个 SQE 的标志位中设置 IOSQE_IO_LINK 时,内核会将其与紧随其后提交的下一个 SQE 绑定为一个原子链。

    配合专用的操作码 IORING_OP_LINK_TIMEOUT,内核会执行以下竞速逻辑:

    1. 内核并行处理业务 I/O 请求,并同时启动定时器。
    2. 如果业务请求先完成,内核自动取消挂载的定时器。
    3. 如果定时器先到期,内核自动强行取消业务请求,并使其返回 -ECANCELED。

    这种将同步状态机交由内核仲裁的设计,使得用户态代码无需介入复杂的取消流程。

    3. TimeoutAwaiter 的内存安全陷阱与实现

    构建 TimeoutAwaiter 时,面临的最大工程挑战是协程帧的生命周期管理。

    由于我们向内核一次性提交了两个 SQE(业务 I/O 与 Timeout),内核在执行完毕后,必然会返回两个对应的 CQE。如果采用“先到先得”的简单逻辑,在第一个 CQE 到达时立即调用 handle.resume() 恢复协程,会导致一个隐蔽且致命的 Use-After-Free (UAF) 漏洞:

    当协程被恢复后,包含在该协程帧内的 TimeoutAwaiter 对象可能会随着当前作用域的结束而立即析构。此时,内核中仍有一个被取消的 CQE 正在返回途中。当 IOContext 收割这个滞后的 CQE 并尝试调用 complete 时,其 user_data 指针已指向被释放的内存,导致进程崩溃。

    因此,必须在 complete 中引入一个无锁的计数器屏障,确保两个 CQE 全部落地后,再将控制权交还给协程。

    完整的 TimeoutAwaiter 实现如下:

    template<single_shot_only_operation InnerOperation>
    class TimeoutAwaiter : public Operation {
    public:
        using resume_type = typename InnerOperation::resume_type;
    
        template<typename Duration>
        TimeoutAwaiter(InnerOperation&& operation, Duration timeout) noexcept
          : inner_operation_{ std::forward<InnerOperation>(operation) }
        {
            using namespace std::chrono;
    
            timeout_.tv_sec = duration_cast<seconds>(timeout).count();
            timeout_.tv_nsec = duration_cast<nanoseconds>(timeout % 1s).count();
        }
    
        constexpr auto await_ready() const noexcept -> bool { return false; }
    
        void await_suspend(std::coroutine_handle<> handle) noexcept
        {
            handle_ = handle;
    
            auto* io_sqe = context().sqe();
            auto* timeout_sqe = context().sqe();
    
            // 组装业务 I/O 并设置链式标志
            inner_operation_.prepare(io_sqe);
            io_sqe->flags |= IOSQE_IO_LINK;
            ::io_uring_sqe_set_data(io_sqe, this);
    
            // 紧跟超时探测请求
            ::io_uring_prep_link_timeout(timeout_sqe, &timeout_, 0);
            ::io_uring_sqe_set_data(timeout_sqe, this);
        }
    
        auto await_resume() noexcept -> std::expected<resume_type, std::error_code>
        {
            if (is_timed_out_)
                return unexpected_system_error(std::errc::timed_out);
    
            inner_operation_.set_result(result_, 0);
            return inner_operation_.await_resume();
        }
    
        void set_result(int result, std::uint32_t flags) noexcept
        {
            if (result == -ETIME)
                is_timed_out_ = true;
            else if (result != -ECANCELED)
                result_ = result;
        }
    
        void complete(int result, std::uint32_t flags) noexcept override
        {
            set_result(result, flags);
    
            if (--pending_cqes_ == 0) {
                auto handle = std::exchange(handle_, {});
                handle.resume();
            }
        }
        
        auto context() noexcept -> decltype(std::declval<InnerOperation&>().context())
        {
            return inner_operation_.context();
        }
    
    private:
        InnerOperation inner_operation_;
        struct __kernel_timespec timeout_{};
    
        std::coroutine_handle<> handle_{ nullptr };
        
        int pending_cqes_{ 2 }; // 提交了 2 个 SQE,必然返回 2 个 CQE
        bool is_timed_out_{ false };
        int result_{ -ECANCELED };
    };
    
    template<single_shot_only_operation Operation, typename Duration>
    auto timeout(Operation&& awaitable, Duration t) noexcept -> TimeoutAwaiter<std::decay_t<Operation>>
    {
        return TimeoutAwaiter<std::decay_t<Operation>>{ std::forward<Operation>(awaitable), t };
    }
    

    架构收益:Core-Per-Thread 带来的无锁抽象

    细心的读者可能会发现,在处理跨越不同异步回调的生命周期同步时,我们仅仅使用了一个普通的内建整型变量 int pending_cqes_{ 2 };,而没有求助于 std::atomic<int> 或任何形式的互斥锁。

    这正是我们选择 Core-Per-Thread(单线程独立上下文) 架构的直接收益。在该模型下,底层 io_uring 队列的投递、事件的收割(IOContext::run)、complete 回调的触发,以及协程的恢复,全都被严格限制在单一线程的顺序执行流中。这种确定的串行化特征从根本上消除了数据竞争。因此,我们可以毫无顾忌地使用裸整型进行状态流转,彻底免除了原子操作带来的缓存行同步与内存屏障开销,将“零开销抽象”贯彻到了每一个微小的细节中。

    4. 正交设计:避免 API 表面积爆炸

    在上述实现中,std::expected 发挥了重要作用,我们将内核传回的 -ETIME 翻译为了标准的 std::errc::timed_out。但比类型安全更值得关注的,是这种基于泛型与组合语义带来的高层架构美学。

    在传统的网络库设计中,超时逻辑往往被直接硬编码进具体的 I/O 操作中。这意味着设计者不得不提供诸如 async_read_with_timeout、async_write_with_timeout、async_connect_with_timeout 等一系列冗余接口。假设系统存在 $N$ 种基础操作,未来又需要引入 $M$ 种类似于超时的修饰语义,API 的数量就会呈现 $N \times M$ 的指数级膨胀,最终导致表面积爆炸(API Surface Area Explosion)。

    而我们设计的 TimeoutAwaiter 与任何具体的业务操作是严格正交的。通过 C++20 的 Concept 约束,它充当了一个纯粹的通用修饰器,能够无缝叠加在任何满足规范的操作之上,将库的 API 复杂度完美控制在了 $N + M$。

    结合泛型的工厂函数,上层业务代码可以以极低侵入性的自然语序组合它们:

    auto network_read_task(IOContext& context, Socket& socket) -> Task<void>
    {
        using namespace std::chrono_literals;
        
        // 组合语义:以正交的方式为 async_read 附加 5 秒的超时约束
        auto result = co_await timeout(socket.async_read(buffer), 5s);
        
        if (!result) {
            if (result.error() == std::errc::timed_out)
                spdlog::warn("Read operation timed out.");
            else
                spdlog::error("Read failed: {}", result.error().message());
                
            co_return;
        }
        
        spdlog::info("Successfully read {} bytes.", result.value());
    }
    

    演示

    由于现有实现比较简陋,我们只能复用下02章中async_wait来测试下timeout的语义是否正确。实际的代码里,是不太可能组合async_wait和timeout的。

    
    auto shutdown_monitor(IOContext& context) -> Task<void>
    {
        using namespace std::chrono_literals;
    
        SignalSet sets{ context, signals::interrupt, signals::terminate };
    
        // 唯一改动点,测试一下timeout的行为是否正确
        co_await timeout(sets.async_wait(), 5s);
    
        spdlog::info("Received shutdown signal, stopping IOContext...");
        context.stop();
    }
    
    auto demo(IOContext& context) -> Task<void>
    {
        using namespace std::chrono_literals;
        spdlog::info("demo started");    
    
        // 模拟一些持续的异步工作,直到接收到退出信号
        while (true) 
            co_await sleep_for(context, 1s);
    
        spdlog::info("demo completed");
    }
    
    int main(int argc, char* argv[])
    {
        IOContext context{};
    
        co_spawn(context, demo(context));
        co_spawn(context, shutdown_monitor(context));
    
        context.run();
    
        spdlog::info("IOContext stopped, exiting...");
    
        return EXIT_SUCCESS;
    }
    

    执行结果

    [2026-04-20 02:28:23.356] [info] demo started
    [2026-04-20 02:28:28.792] [info] Received shutdown signal, stopping IOContext...
    [2026-04-20 02:28:28.792] [info] IOContext stopped, exiting...
    

    可以看到,5s之后,async_wait结束了等待,context.stop()触发,程序结束了。

    也可以按下Ctrl+C来触发SIGINT信号

    /home/doom/blog/build/demo/blog.timeout_v1
    [2026-04-20 10:25:32.885] [info] demo started
    ^C[2026-04-20 10:25:33.891] [info] Received shutdown signal, stopping IOContext...
    [2026-04-20 10:25:33.891] [info] IOContext stopped, exiting...
    

    完整代码

    1 条回复 最后回复
    0

    • 登录

    • 没有帐号? 注册

    • 登录或注册以进行搜索。
    d2learn forums Powered by NodeBB
    • 第一个帖子
      最后一个帖子
    0
    • 版块
    • 最新
    • 标签
    • 热门
    • Online Tools
    • 用户
    • 群组