【 基于 io_uring 的 C++20 协程网络库】02:模块解耦与完备的退出机制
-
在上文中,我们构建了
IOContext的核心事件循环骨架。然而,随着组件的增加,我们需要解决两个实际的工程问题:一是如何对底层上下文进行合理的抽象与解耦;
二是如何优雅、无阻塞地处理外部中断信号并终止事件循环。
1. 模块解耦与泛型化设计
考虑到未来我们可能会迭代出多个版本的
IOContext,为了最大化代码复用,将具体的协程 Awaiter(如SleepAwaiter)与底层的IOContext实现解耦是必要的。首先,我们将
Operation接口提取到独立的头文件中。其次,对于
SleepAwaiter,由于它依赖Context::sqe(),若在头文件中硬编码IOContext,必须要立刻知道IOContext的定义。因此,我们采用模板化设计,将Context泛型化,在调用点推导出确切的类型。template<typename Context> class SleepAwaiter: public Operation { public: template<chrono_duration Duration> SleepAwaiter(Context& context, Duration d) : context_{ context } { using namespace std::chrono; // 转换 std::chrono 时间为内核认识的 timespec timeout_.tv_sec = duration_cast<seconds>(d).count(); timeout_.tv_nsec = duration_cast<nanoseconds>(d % seconds(1)).count(); } [[nodiscard]] constexpr auto await_ready() const noexcept -> bool { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); // 提交纯超时指令,count 设为 0 表示只受时间触发 ::io_uring_prep_timeout(sqe, &timeout_, 0, 0); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() noexcept -> std::expected<void, std::error_code> { // io_uring 中,超时正常结束会返回 ETIME if (error_code_ == ETIME || error_code_ == 0) return {}; // 其他错误(如 ECANCELED 被提前强杀) return unexpected_system_error(error_code_); } void complete(int res, std::uint32_t flags) noexcept override { error_code_ = -res; if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } private: Context& context_; struct __kernel_timespec timeout_{}; std::coroutine_handle<> handle_{ nullptr }; int error_code_{ 0 }; }; template<typename Context, typename Duration> auto sleep_for(Context& context, Duration duration) noexcept -> SleepAwaiter<Context> { return SleepAwaiter<Context>{ context, duration }; }设计注记:
通常情况下,过度泛型化(滥用模板)会劣化编译时长,并不值得推崇。但在基础设施库的设计中,静态多态(基于模板的 Duck Typing)能够做到零运行时开销(Zero-overhead),且调用方代码无需任何修改即可适配不同版本的IOContext,这种妥协是极具工程价值的。2. 完善事件循环的终止机制 (eventfd)
上个版本中,我们使用
std::atomic<bool> should_stop_标志来控制循环退出。但这存在一个死锁隐患:如果IOContext::run()正阻塞在io_uring_submit_and_wait系统调用上,单纯修改布尔变量是无法唤醒内核态线程的。我们需要一种跨越内核与用户态的唤醒机制。在传统的 Reactor 模式中,通常采用管道(pipe)或
eventfd,在io_uring体系下,eventfd依然是开销极小且最适用的方案。核心状态重构:区分系统事件与业务逻辑
在引入
eventfd后,完成队列(CQ)中不仅会包含业务逻辑的事件(如网络 I/O、定时器),还会混入我们内部触发的 wakeup 事件。
这就要求我们必须在状态追踪上做出严格区分:-
count:追踪当前批次取出的所有 CQE 数量,用于向前推进内核的共享环形缓冲区(io_uring_cq_advance)。 -
workdone:追踪实际完成的业务任务数量,仅针对这些任务去扣减outstanding_works_。如果不对二者加以区分,wakeup 信号会导致业务计数器异常递减,引发程序提前退出或触发断言失败。
同时,我们利用 liburing 原生的内联辅助函数
io_uring_cqe_get_data64、io_uring_cqe_get_data以及io_uring_sqe_set_data64来取代底层的直接字段访问,这消除了reinterpret_cast的滥用,确保了类型安全的边界。完整的
IOContext实现如下:#include <sys/eventfd.h> #include <unistd.h> #include <limits> #include <cassert> class IOContext { public: explicit IOContext(unsigned entries = 1024) { if (auto res = ::io_uring_queue_init(entries, &ring_, 0); res < 0) throw_system_error(-res, "io_uring_queue_init"); wakeup_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (wakeup_fd_ == -1) throw_system_error("Failed to create eventfd for stopping IOContext"); arm_wakeup(); } IOContext(const IOContext&) = delete; auto operator=(const IOContext&) -> IOContext& = delete; // 为了简化实现,我们不支持移动 IOContext(IOContext&& other) noexcept = delete; auto operator=(IOContext&&) -> IOContext& = delete; ~IOContext() { ::io_uring_queue_exit(&ring_); ::close(wakeup_fd_); } void run() { ::io_uring_cqe* cqe{ nullptr }; while (!should_stop_.load(std::memory_order_relaxed) && outstanding_works_ > 0) { auto res = ::io_uring_submit_and_wait(&ring_, 1); if (res < 0) { if (res == -EINTR) continue; throw_system_error("io_uring_submit_and_wait"); } unsigned head; unsigned count{ 0 }; unsigned workdone{ 0 }; io_uring_for_each_cqe(&ring_, head, cqe) { ++count; // 探测到内部唤醒信号 if (io_uring_cqe_get_data64(cqe) == WAKEUP_MARKER) { resume_wakeup(); arm_wakeup(); continue; } // 正常的业务逻辑完成事件 if (io_uring_cqe_get_data64(cqe) != 0) { auto* op = static_cast<Operation*>(io_uring_cqe_get_data(cqe)); op->complete(cqe->res, cqe->flags); ++workdone; } } // 推进环形缓冲区必须使用总事件数 count if (count > 0) ::io_uring_cq_advance(&ring_, count); // 扣减未决任务必须使用实际完成的业务数 workdone if (workdone > 0) outstanding_works_ -= workdone; } } [[nodiscard]] auto sqe() -> ::io_uring_sqe* { auto* sqe = ::io_uring_get_sqe(&ring_); if (!sqe) throw_system_error("io_uring_get_sqe"); add_work(); return sqe; } void stop() { should_stop_.store(true, std::memory_order_relaxed); wakeup(); } auto ring() noexcept -> ::io_uring* { return &ring_; } auto ring() const noexcept -> const ::io_uring* { return &ring_; } void add_work() noexcept { ++outstanding_works_; } void drop_work() noexcept { assert(outstanding_works_ > 0); --outstanding_works_; } private: static constexpr auto WAKEUP_MARKER = std::numeric_limits<std::uintptr_t>::max(); ::io_uring ring_{}; int wakeup_fd_{ -1 }; // 只用来追踪 io_context 之外的操作,并不需要用户主动来使用相关的接口 std::size_t outstanding_works_{ 0 }; // stop 会被跨线程调用,所以需要使用原子变量来保证线程安全 std::atomic<bool> should_stop_{ false }; void arm_wakeup() noexcept { auto* sqe = ::io_uring_get_sqe(&ring_); if (!sqe) throw_system_error("io_uring_get_sqe failed when re-arming wakeup"); ::io_uring_prep_poll_add(sqe, wakeup_fd_, POLLIN); // 采用 64 位专有 setter,避免指针转换警告 ::io_uring_sqe_set_data64(sqe, WAKEUP_MARKER); } void wakeup() { std::uint64_t val = 1; ::write(wakeup_fd_, &val, sizeof(val)); } void resume_wakeup() { uint64_t val; ::read(wakeup_fd_, &val, sizeof(val)); } };关于未处理完成的 CQE 的处置:
触发stop()退出循环后,队列中如果还有积压的 CQE 怎么办?这正是 RAII 管理机制的优势所在。
IOContext的生命周期与系统资源严格绑定,当程序退出,IOContext析构时,io_uring_queue_exit会协同内核彻底销毁共享的环形缓冲区。由于事件循环已经终止,不会再有新的逻辑被触发,因此忽略未决的 CQE 是安全且合理的策略。如果用户确实有在停止后清理特定状态的需求,可以通过暴露的
ring()接口自行干预。3. 基于 signalfd 的统一中断处理
既然已经实现了安全的唤醒与停止语义,顺理成章地,我们应将操作系统的信号(如
SIGINT,SIGTERM)也纳入异步框架。在 Linux 平台上,signalfd提供了一种将异步中断转化为文件描述符可读事件的机制,它能被完美地集成进io_uring轮询模型中。3.1 强类型 Signal 封装
避免裸露的魔术整数:
class Signal { public: explicit constexpr Signal(int signal) noexcept : signal_{ signal } {} auto operator==(const Signal&) const noexcept -> bool = default; // 这里的隐式转换是否提供看个人,我觉得不提供更好 constexpr operator int() const noexcept { return signal_; } [[nodiscard]] constexpr auto value() const noexcept { return signal_; } private: int signal_; }; struct signals { signals() = delete; static constexpr auto interrupt = Signal{ SIGINT }; static constexpr auto terminate = Signal{ SIGTERM }; static constexpr auto quit = Signal{ SIGQUIT }; static constexpr auto hangup = Signal{ SIGHUP }; };3.2 信号集 SignalSet 管理
使用折叠表达式优雅地处理变参掩码。另外,为保障跨线程时的健壮性,此处采用了标准的
pthread_sigmask。#include <sys/signalfd.h> #include <signal.h> template<typename Context> class SignalSet { public: template<typename... Signals> requires (std::same_as<Signals, Signal> && ...) SignalSet(Context& io_context, Signals... signals) : io_context_{ io_context } { ::sigemptyset(&mask_); (::sigaddset(&mask_, signals.value()), ...); // 屏蔽这些信号的默认异步行为,交由 signalfd 同步读取 if (::pthread_sigmask(SIG_BLOCK, &mask_, nullptr) == -1) throw_system_error("Failed to block signals"); fd_ = ::signalfd(-1, &mask_, SFD_NONBLOCK | SFD_CLOEXEC); if (fd_ == -1) throw_system_error("Failed to create signalfd"); } SignalSet(const SignalSet&) = delete; auto operator=(const SignalSet&) -> SignalSet& = delete; SignalSet(SignalSet&& other) noexcept : io_context_{ other.io_context_ }, fd_{ std::exchange(other.fd_, -1) }, mask_{ other.mask_ } {} auto operator=(SignalSet&& other) noexcept -> SignalSet& = delete; ~SignalSet() { if (fd_ != -1) ::close(fd_); } private: Context& io_context_; int fd_{ -1 }; sigset_t mask_; };3.3 构建 PollAwaiter 与 async_wait
既然是协程库,那我们理所应当的应该将监听行为设计成协程。为监听可读事件提供通用的
PollAwaiter:template<typename Context> class PollAwaiter : public Operation { public: using resume_type = void; PollAwaiter(Context& context, int fd, short events) noexcept : context_{ context }, fd_{ fd }, events_{ events } {} [[nodiscard]] constexpr auto await_ready() const noexcept -> bool { return false; } auto await_suspend(std::coroutine_handle<> handle) noexcept -> void { handle_ = handle; auto* sqe = context_.sqe(); ::io_uring_prep_poll_add(sqe, fd_, events_); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() const noexcept -> std::expected<void, std::error_code> { if (error_code_ != 0) return unexpected_system_error(error_code_); return {}; } void complete(int res, [[maybe_unused]] std::uint32_t flags) noexcept override { error_code_ = res < 0 ? -res : 0; if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } private: Context& context_; int fd_; short events_; std::coroutine_handle<> handle_{ nullptr }; int error_code_{ 0 }; }; // 在 SignalSet 外部实现 template<typename Context> auto SignalSet<Context>::async_wait() noexcept -> PollAwaiter<Context> { return PollAwaiter<Context>{ io_context_, fd_, POLLIN }; }4. 系统集成演示
至此,基础组件均已就位。我们可以轻松写出一个支持非阻塞延时、并通过
Ctrl+C信号安全、优雅退出的并发模型。auto shutdown_monitor(IOContext& context) -> Task<void> { SignalSet sets{ context, signals::interrupt, signals::terminate }; // 挂起协程,等待操作系统向底层派发 SIGINT 或 SIGTERM co_await sets.async_wait(); spdlog::info("Received shutdown signal, stopping IOContext..."); context.stop(); } auto demo(IOContext& context) -> Task<void> { using namespace std::chrono_literals; spdlog::info("before sleep..."); // 模拟长耗时异步任务 co_await sleep_for(context, 10min); spdlog::info("after sleep..."); } int main(int argc, char* argv[]) { IOContext context{}; // 并发派发两个独立协程:一个执行业务,一个负责监听中断 co_spawn(context, demo(context)); co_spawn(context, shutdown_monitor(context)); context.run(); spdlog::info("IOContext stopped, exiting..."); return EXIT_SUCCESS; }运行结果:
在长达 10 分钟的sleep任务途中,我们通过Ctrl+C触发键盘中断,signalfd捕获到信号,唤醒了沉睡的shutdown_monitor协程,随后成功中断事件循环,程序安全退出。blog.io_context_v2 [2026-04-19 22:33:21.313] [info] before sleep... ^C[2026-04-19 22:33:22.954] [info] Received shutdown signal, stopping IOContext... [2026-04-19 22:33:22.954] [info] IOContext stopped, exiting... -