跳转至内容
  • 版块
  • 最新
  • 标签
  • 热门
  • Online Tools
  • 用户
  • 群组
折叠
品牌标识

D2Learn Forums

  1. 主页
  2. Blogs | 博客
  3. xin
  4. 【 基于 io_uring 的 C++20 协程网络库】02:模块解耦与完备的退出机制

【 基于 io_uring 的 C++20 协程网络库】02:模块解耦与完备的退出机制

已定时 已固定 已锁定 已移动 xin
c++20协程
1 帖子 1 发布者 6 浏览
  • 从旧到新
  • 从新到旧
  • 最多赞同
登录后回复
此主题已被删除。只有拥有主题管理权限的用户可以查看。
  • DoomjustinD 在线
    DoomjustinD 在线
    Doomjustin
    编写于 最后由 Doomjustin 编辑
    #1

    在上文中,我们构建了 IOContext 的核心事件循环骨架。然而,随着组件的增加,我们需要解决两个实际的工程问题:

    一是如何对底层上下文进行合理的抽象与解耦;

    二是如何优雅、无阻塞地处理外部中断信号并终止事件循环。

    1. 模块解耦与泛型化设计

    考虑到未来我们可能会迭代出多个版本的 IOContext,为了最大化代码复用,将具体的协程 Awaiter(如 SleepAwaiter)与底层的 IOContext 实现解耦是必要的。

    首先,我们将 Operation 接口提取到独立的头文件中。

    其次,对于 SleepAwaiter,由于它依赖 Context::sqe(),若在头文件中硬编码 IOContext,必须要立刻知道IOContext的定义。因此,我们采用模板化设计,将 Context 泛型化,在调用点推导出确切的类型。

    template<typename Context>
    class SleepAwaiter: public Operation {
    public:
        template<chrono_duration Duration>
        SleepAwaiter(Context& context, Duration d)
          : context_{ context }
        {
            using namespace std::chrono;
            // 转换 std::chrono 时间为内核认识的 timespec
            timeout_.tv_sec = duration_cast<seconds>(d).count();
            timeout_.tv_nsec = duration_cast<nanoseconds>(d % seconds(1)).count();
        }
    
        [[nodiscard]]
        constexpr auto await_ready() const noexcept -> bool 
        { 
            return false; 
        }
    
        void await_suspend(std::coroutine_handle<> handle) noexcept
        {
            handle_ = handle;
            auto* sqe = context_.sqe();
    
            // 提交纯超时指令,count 设为 0 表示只受时间触发
            ::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
            ::io_uring_sqe_set_data(sqe, this);
        }
    
        auto await_resume() noexcept -> std::expected<void, std::error_code>
        {
            // io_uring 中,超时正常结束会返回 ETIME
            if (error_code_ == ETIME || error_code_ == 0)
                return {};
            
            // 其他错误(如 ECANCELED 被提前强杀)
            return unexpected_system_error(error_code_);
        }
    
        void complete(int res, std::uint32_t flags) noexcept override
        {
            error_code_ = -res;
            
            if (handle_) {
                auto handle = std::exchange(handle_, nullptr);
                handle.resume();
            }
        }
    
    private:
        Context& context_;
        struct __kernel_timespec timeout_{};
        std::coroutine_handle<> handle_{ nullptr };
        int error_code_{ 0 };
    };
    
    template<typename Context, typename Duration>
    auto sleep_for(Context& context, Duration duration) noexcept -> SleepAwaiter<Context>
    {
        return SleepAwaiter<Context>{ context, duration };
    }
    

    设计注记:
    通常情况下,过度泛型化(滥用模板)会劣化编译时长,并不值得推崇。但在基础设施库的设计中,静态多态(基于模板的 Duck Typing)能够做到零运行时开销(Zero-overhead),且调用方代码无需任何修改即可适配不同版本的 IOContext,这种妥协是极具工程价值的。

    2. 完善事件循环的终止机制 (eventfd)

    上个版本中,我们使用 std::atomic<bool> should_stop_ 标志来控制循环退出。但这存在一个死锁隐患:如果 IOContext::run() 正阻塞在 io_uring_submit_and_wait 系统调用上,单纯修改布尔变量是无法唤醒内核态线程的。

    我们需要一种跨越内核与用户态的唤醒机制。在传统的 Reactor 模式中,通常采用管道(pipe)或 eventfd,在 io_uring 体系下,eventfd 依然是开销极小且最适用的方案。

    核心状态重构:区分系统事件与业务逻辑

    在引入 eventfd 后,完成队列(CQ)中不仅会包含业务逻辑的事件(如网络 I/O、定时器),还会混入我们内部触发的 wakeup 事件。
    这就要求我们必须在状态追踪上做出严格区分:

    1. count:追踪当前批次取出的所有 CQE 数量,用于向前推进内核的共享环形缓冲区(io_uring_cq_advance)。

    2. workdone:追踪实际完成的业务任务数量,仅针对这些任务去扣减 outstanding_works_。如果不对二者加以区分,wakeup 信号会导致业务计数器异常递减,引发程序提前退出或触发断言失败。

    同时,我们利用 liburing 原生的内联辅助函数 io_uring_cqe_get_data64、io_uring_cqe_get_data 以及 io_uring_sqe_set_data64 来取代底层的直接字段访问,这消除了 reinterpret_cast 的滥用,确保了类型安全的边界。

    完整的 IOContext 实现如下:

    #include <sys/eventfd.h>
    #include <unistd.h>
    #include <limits>
    #include <cassert>
    
    class IOContext {
    public:
        explicit IOContext(unsigned entries = 1024)
        {
            if (auto res = ::io_uring_queue_init(entries, &ring_, 0); res < 0)
                throw_system_error(-res, "io_uring_queue_init");            
    
            wakeup_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
            if (wakeup_fd_ == -1)
                throw_system_error("Failed to create eventfd for stopping IOContext");
    
            arm_wakeup();
        }
    
        IOContext(const IOContext&) = delete;
        auto operator=(const IOContext&) -> IOContext& = delete;
        
        // 为了简化实现,我们不支持移动
        IOContext(IOContext&& other) noexcept = delete;
        auto operator=(IOContext&&) -> IOContext& = delete;
    
        ~IOContext()
        {
            ::io_uring_queue_exit(&ring_);
            ::close(wakeup_fd_);
        }
    
        void run()
        {
            ::io_uring_cqe* cqe{ nullptr };
    
            while (!should_stop_.load(std::memory_order_relaxed) && outstanding_works_ > 0)
            {
                auto res = ::io_uring_submit_and_wait(&ring_, 1);
                if (res < 0) {
                    if (res == -EINTR)
                        continue;
    
                    throw_system_error("io_uring_submit_and_wait");
                }
                    
                unsigned head;
                unsigned count{ 0 };
                unsigned workdone{ 0 };
    
                io_uring_for_each_cqe(&ring_, head, cqe) {
                    ++count;
    
                    // 探测到内部唤醒信号
                    if (io_uring_cqe_get_data64(cqe) == WAKEUP_MARKER) {
                        resume_wakeup();
                        arm_wakeup();
                        continue;
                    }
    
                    // 正常的业务逻辑完成事件
                    if (io_uring_cqe_get_data64(cqe) != 0) {
                        auto* op = static_cast<Operation*>(io_uring_cqe_get_data(cqe));
                        op->complete(cqe->res, cqe->flags);
    
                        ++workdone;
                    }
                }
    
                // 推进环形缓冲区必须使用总事件数 count
                if (count > 0)
                    ::io_uring_cq_advance(&ring_, count);
    
                // 扣减未决任务必须使用实际完成的业务数 workdone
                if (workdone > 0)
                    outstanding_works_ -= workdone;
            }
        }
    
        [[nodiscard]]
        auto sqe() -> ::io_uring_sqe*
        {
            auto* sqe = ::io_uring_get_sqe(&ring_);
            if (!sqe)
                throw_system_error("io_uring_get_sqe");
    
            add_work();
            return sqe;
        }
    
        void stop()
        {
            should_stop_.store(true, std::memory_order_relaxed);
            wakeup();
        }
    
        auto ring() noexcept -> ::io_uring* { return &ring_; }
        auto ring() const noexcept -> const ::io_uring* { return &ring_; }
    
        void add_work() noexcept { ++outstanding_works_; }
    
        void drop_work() noexcept
        {
            assert(outstanding_works_ > 0);
            --outstanding_works_;
        }
        
    private:
        static constexpr auto WAKEUP_MARKER = std::numeric_limits<std::uintptr_t>::max();
    
        ::io_uring ring_{};
        int wakeup_fd_{ -1 };
    
        // 只用来追踪 io_context 之外的操作,并不需要用户主动来使用相关的接口
        std::size_t outstanding_works_{ 0 };
        // stop 会被跨线程调用,所以需要使用原子变量来保证线程安全
        std::atomic<bool> should_stop_{ false };
    
        void arm_wakeup() noexcept
        {
            auto* sqe = ::io_uring_get_sqe(&ring_);
            if (!sqe)
                throw_system_error("io_uring_get_sqe failed when re-arming wakeup");
    
            ::io_uring_prep_poll_add(sqe, wakeup_fd_, POLLIN);
            // 采用 64 位专有 setter,避免指针转换警告
            ::io_uring_sqe_set_data64(sqe, WAKEUP_MARKER);
        }   
        
        void wakeup()
        {
            std::uint64_t val = 1;
            ::write(wakeup_fd_, &val, sizeof(val));
        }
    
        void resume_wakeup()
        {
            uint64_t val;
            ::read(wakeup_fd_, &val, sizeof(val));
        }
    };
    

    关于未处理完成的 CQE 的处置:
    触发 stop() 退出循环后,队列中如果还有积压的 CQE 怎么办?

    这正是 RAII 管理机制的优势所在。IOContext 的生命周期与系统资源严格绑定,当程序退出,IOContext 析构时,io_uring_queue_exit 会协同内核彻底销毁共享的环形缓冲区。由于事件循环已经终止,不会再有新的逻辑被触发,因此忽略未决的 CQE 是安全且合理的策略。

    如果用户确实有在停止后清理特定状态的需求,可以通过暴露的 ring() 接口自行干预。

    3. 基于 signalfd 的统一中断处理

    既然已经实现了安全的唤醒与停止语义,顺理成章地,我们应将操作系统的信号(如 SIGINT, SIGTERM)也纳入异步框架。在 Linux 平台上,signalfd 提供了一种将异步中断转化为文件描述符可读事件的机制,它能被完美地集成进 io_uring 轮询模型中。

    3.1 强类型 Signal 封装

    避免裸露的魔术整数:

    class Signal {
    public:
        explicit constexpr Signal(int signal) noexcept
          : signal_{ signal }
        {}
    
        auto operator==(const Signal&) const noexcept -> bool = default;
        // 这里的隐式转换是否提供看个人,我觉得不提供更好
        constexpr operator int() const noexcept { return signal_; }
        [[nodiscard]] constexpr auto value() const noexcept { return signal_; }
    
    private:
        int signal_;
    };
    
    struct signals {
        signals() = delete;
        
        static constexpr auto interrupt = Signal{ SIGINT };
        static constexpr auto terminate = Signal{ SIGTERM };
        static constexpr auto quit = Signal{ SIGQUIT };
        static constexpr auto hangup = Signal{ SIGHUP };
    };
    

    3.2 信号集 SignalSet 管理

    使用折叠表达式优雅地处理变参掩码。另外,为保障跨线程时的健壮性,此处采用了标准的 pthread_sigmask。

    #include <sys/signalfd.h>
    #include <signal.h>
    
    template<typename Context>
    class SignalSet {
    public:
        template<typename... Signals>
            requires (std::same_as<Signals, Signal> && ...)
        SignalSet(Context& io_context, Signals... signals)
          : io_context_{ io_context }
        {
            ::sigemptyset(&mask_);
            (::sigaddset(&mask_, signals.value()), ...);
    
            // 屏蔽这些信号的默认异步行为,交由 signalfd 同步读取
            if (::pthread_sigmask(SIG_BLOCK, &mask_, nullptr) == -1)
                throw_system_error("Failed to block signals");
    
            fd_ = ::signalfd(-1, &mask_, SFD_NONBLOCK | SFD_CLOEXEC);
            if (fd_ == -1)
                throw_system_error("Failed to create signalfd");
        }
    
        SignalSet(const SignalSet&) = delete;
        auto operator=(const SignalSet&) -> SignalSet& = delete;
    
        SignalSet(SignalSet&& other) noexcept
          : io_context_{ other.io_context_ }, 
            fd_{ std::exchange(other.fd_, -1) },
            mask_{ other.mask_ }
        {}
    
        auto operator=(SignalSet&& other) noexcept -> SignalSet& = delete;
    
        ~SignalSet()
        {
            if (fd_ != -1)
                ::close(fd_);
        }
    
    private:
        Context& io_context_;
        int fd_{ -1 };
        sigset_t mask_;
    };
    

    3.3 构建 PollAwaiter 与 async_wait

    既然是协程库,那我们理所应当的应该将监听行为设计成协程。为监听可读事件提供通用的 PollAwaiter:

    template<typename Context>
    class PollAwaiter : public Operation {
    public:
        using resume_type = void;
    
        PollAwaiter(Context& context, int fd, short events) noexcept
          : context_{ context }, 
            fd_{ fd }, 
            events_{ events }
        {}
    
        [[nodiscard]] constexpr auto await_ready() const noexcept -> bool { return false; }
    
        auto await_suspend(std::coroutine_handle<> handle) noexcept -> void
        {
            handle_ = handle;
            auto* sqe = context_.sqe();
            
            ::io_uring_prep_poll_add(sqe, fd_, events_);
            ::io_uring_sqe_set_data(sqe, this);
        }
    
        auto await_resume() const noexcept -> std::expected<void, std::error_code>
        {
            if (error_code_ != 0)
                return unexpected_system_error(error_code_);
                
            return {};
        }
    
        void complete(int res, [[maybe_unused]] std::uint32_t flags) noexcept override
        {
            error_code_ = res < 0 ? -res : 0;
            
            if (handle_) {
                auto handle = std::exchange(handle_, nullptr);
                handle.resume();
            }
        }
    
    private:
        Context& context_;
        int fd_;
        short events_;
        std::coroutine_handle<> handle_{ nullptr };
        int error_code_{ 0 };
    };
    
    // 在 SignalSet 外部实现
    template<typename Context>
    auto SignalSet<Context>::async_wait() noexcept -> PollAwaiter<Context>
    {
        return PollAwaiter<Context>{ io_context_, fd_, POLLIN };
    }   
    

    4. 系统集成演示

    至此,基础组件均已就位。我们可以轻松写出一个支持非阻塞延时、并通过 Ctrl+C 信号安全、优雅退出的并发模型。

    auto shutdown_monitor(IOContext& context) -> Task<void>
    {
        SignalSet sets{ context, signals::interrupt, signals::terminate };
    
        // 挂起协程,等待操作系统向底层派发 SIGINT 或 SIGTERM
        co_await sets.async_wait();
    
        spdlog::info("Received shutdown signal, stopping IOContext...");
        context.stop();
    }
    
    auto demo(IOContext& context) -> Task<void>
    {
        using namespace std::chrono_literals;
        spdlog::info("before sleep...");    
        
        // 模拟长耗时异步任务
        co_await sleep_for(context, 10min);
    
        spdlog::info("after sleep...");
    }
    
    int main(int argc, char* argv[])
    {
        IOContext context{};
    
        // 并发派发两个独立协程:一个执行业务,一个负责监听中断
        co_spawn(context, demo(context));
        co_spawn(context, shutdown_monitor(context));
    
        context.run();
    
        spdlog::info("IOContext stopped, exiting...");
        return EXIT_SUCCESS;
    }
    

    运行结果:
    在长达 10 分钟的 sleep 任务途中,我们通过 Ctrl+C 触发键盘中断,signalfd 捕获到信号,唤醒了沉睡的 shutdown_monitor 协程,随后成功中断事件循环,程序安全退出。

    blog.io_context_v2
    [2026-04-19 22:33:21.313] [info] before sleep...
    ^C[2026-04-19 22:33:22.954] [info] Received shutdown signal, stopping IOContext...
    [2026-04-19 22:33:22.954] [info] IOContext stopped, exiting...
    

    完整代码

    1 条回复 最后回复
    0

    • 登录

    • 没有帐号? 注册

    • 登录或注册以进行搜索。
    d2learn forums Powered by NodeBB
    • 第一个帖子
      最后一个帖子
    0
    • 版块
    • 最新
    • 标签
    • 热门
    • Online Tools
    • 用户
    • 群组