在上文中,我们构建了 IOContext 的核心事件循环骨架。然而,随着组件的增加,我们需要解决两个实际的工程问题:
一是如何对底层上下文进行合理的抽象与解耦;
二是如何优雅、无阻塞地处理外部中断信号并终止事件循环。
1. 模块解耦与泛型化设计
考虑到未来我们可能会迭代出多个版本的 IOContext,为了最大化代码复用,将具体的协程 Awaiter(如 SleepAwaiter)与底层的 IOContext 实现解耦是必要的。
首先,我们将 Operation 接口提取到独立的头文件中。
其次,对于 SleepAwaiter,由于它依赖 Context::sqe(),若在头文件中硬编码 IOContext,必须要立刻知道IOContext的定义。因此,我们采用模板化设计,将 Context 泛型化,在调用点推导出确切的类型。
template<typename Context>
class SleepAwaiter: public Operation {
public:
template<chrono_duration Duration>
SleepAwaiter(Context& context, Duration d)
: context_{ context }
{
using namespace std::chrono;
// 转换 std::chrono 时间为内核认识的 timespec
timeout_.tv_sec = duration_cast<seconds>(d).count();
timeout_.tv_nsec = duration_cast<nanoseconds>(d % seconds(1)).count();
}
[[nodiscard]]
constexpr auto await_ready() const noexcept -> bool
{
return false;
}
void await_suspend(std::coroutine_handle<> handle) noexcept
{
handle_ = handle;
auto* sqe = context_.sqe();
// 提交纯超时指令,count 设为 0 表示只受时间触发
::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
::io_uring_sqe_set_data(sqe, this);
}
auto await_resume() noexcept -> std::expected<void, std::error_code>
{
// io_uring 中,超时正常结束会返回 ETIME
if (error_code_ == ETIME || error_code_ == 0)
return {};
// 其他错误(如 ECANCELED 被提前强杀)
return unexpected_system_error(error_code_);
}
void complete(int res, std::uint32_t flags) noexcept override
{
error_code_ = -res;
if (handle_) {
auto handle = std::exchange(handle_, nullptr);
handle.resume();
}
}
private:
Context& context_;
struct __kernel_timespec timeout_{};
std::coroutine_handle<> handle_{ nullptr };
int error_code_{ 0 };
};
template<typename Context, typename Duration>
auto sleep_for(Context& context, Duration duration) noexcept -> SleepAwaiter<Context>
{
return SleepAwaiter<Context>{ context, duration };
}
设计注记:
通常情况下,过度泛型化(滥用模板)会劣化编译时长,并不值得推崇。但在基础设施库的设计中,静态多态(基于模板的 Duck Typing)能够做到零运行时开销(Zero-overhead),且调用方代码无需任何修改即可适配不同版本的 IOContext,这种妥协是极具工程价值的。
2. 完善事件循环的终止机制 (eventfd)
上个版本中,我们使用 std::atomic<bool> should_stop_ 标志来控制循环退出。但这存在一个死锁隐患:如果 IOContext::run() 正阻塞在 io_uring_submit_and_wait 系统调用上,单纯修改布尔变量是无法唤醒内核态线程的。
我们需要一种跨越内核与用户态的唤醒机制。在传统的 Reactor 模式中,通常采用管道(pipe)或 eventfd,在 io_uring 体系下,eventfd 依然是开销极小且最适用的方案。
核心状态重构:区分系统事件与业务逻辑
在引入 eventfd 后,完成队列(CQ)中不仅会包含业务逻辑的事件(如网络 I/O、定时器),还会混入我们内部触发的 wakeup 事件。
这就要求我们必须在状态追踪上做出严格区分:
count:追踪当前批次取出的所有 CQE 数量,用于向前推进内核的共享环形缓冲区(io_uring_cq_advance)。
workdone:追踪实际完成的业务任务数量,仅针对这些任务去扣减 outstanding_works_。如果不对二者加以区分,wakeup 信号会导致业务计数器异常递减,引发程序提前退出或触发断言失败。
同时,我们利用 liburing 原生的内联辅助函数 io_uring_cqe_get_data64、io_uring_cqe_get_data 以及 io_uring_sqe_set_data64 来取代底层的直接字段访问,这消除了 reinterpret_cast 的滥用,确保了类型安全的边界。
完整的 IOContext 实现如下:
#include <sys/eventfd.h>
#include <unistd.h>
#include <limits>
#include <cassert>
class IOContext {
public:
explicit IOContext(unsigned entries = 1024)
{
if (auto res = ::io_uring_queue_init(entries, &ring_, 0); res < 0)
throw_system_error(-res, "io_uring_queue_init");
wakeup_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (wakeup_fd_ == -1)
throw_system_error("Failed to create eventfd for stopping IOContext");
arm_wakeup();
}
IOContext(const IOContext&) = delete;
auto operator=(const IOContext&) -> IOContext& = delete;
// 为了简化实现,我们不支持移动
IOContext(IOContext&& other) noexcept = delete;
auto operator=(IOContext&&) -> IOContext& = delete;
~IOContext()
{
::io_uring_queue_exit(&ring_);
::close(wakeup_fd_);
}
void run()
{
::io_uring_cqe* cqe{ nullptr };
while (!should_stop_.load(std::memory_order_relaxed) && outstanding_works_ > 0)
{
auto res = ::io_uring_submit_and_wait(&ring_, 1);
if (res < 0) {
if (res == -EINTR)
continue;
throw_system_error("io_uring_submit_and_wait");
}
unsigned head;
unsigned count{ 0 };
unsigned workdone{ 0 };
io_uring_for_each_cqe(&ring_, head, cqe) {
++count;
// 探测到内部唤醒信号
if (io_uring_cqe_get_data64(cqe) == WAKEUP_MARKER) {
resume_wakeup();
arm_wakeup();
continue;
}
// 正常的业务逻辑完成事件
if (io_uring_cqe_get_data64(cqe) != 0) {
auto* op = static_cast<Operation*>(io_uring_cqe_get_data(cqe));
op->complete(cqe->res, cqe->flags);
++workdone;
}
}
// 推进环形缓冲区必须使用总事件数 count
if (count > 0)
::io_uring_cq_advance(&ring_, count);
// 扣减未决任务必须使用实际完成的业务数 workdone
if (workdone > 0)
outstanding_works_ -= workdone;
}
}
[[nodiscard]]
auto sqe() -> ::io_uring_sqe*
{
auto* sqe = ::io_uring_get_sqe(&ring_);
if (!sqe)
throw_system_error("io_uring_get_sqe");
add_work();
return sqe;
}
void stop()
{
should_stop_.store(true, std::memory_order_relaxed);
wakeup();
}
auto ring() noexcept -> ::io_uring* { return &ring_; }
auto ring() const noexcept -> const ::io_uring* { return &ring_; }
void add_work() noexcept { ++outstanding_works_; }
void drop_work() noexcept
{
assert(outstanding_works_ > 0);
--outstanding_works_;
}
private:
static constexpr auto WAKEUP_MARKER = std::numeric_limits<std::uintptr_t>::max();
::io_uring ring_{};
int wakeup_fd_{ -1 };
// 只用来追踪 io_context 之外的操作,并不需要用户主动来使用相关的接口
std::size_t outstanding_works_{ 0 };
// stop 会被跨线程调用,所以需要使用原子变量来保证线程安全
std::atomic<bool> should_stop_{ false };
void arm_wakeup() noexcept
{
auto* sqe = ::io_uring_get_sqe(&ring_);
if (!sqe)
throw_system_error("io_uring_get_sqe failed when re-arming wakeup");
::io_uring_prep_poll_add(sqe, wakeup_fd_, POLLIN);
// 采用 64 位专有 setter,避免指针转换警告
::io_uring_sqe_set_data64(sqe, WAKEUP_MARKER);
}
void wakeup()
{
std::uint64_t val = 1;
::write(wakeup_fd_, &val, sizeof(val));
}
void resume_wakeup()
{
uint64_t val;
::read(wakeup_fd_, &val, sizeof(val));
}
};
关于未处理完成的 CQE 的处置:
触发 stop() 退出循环后,队列中如果还有积压的 CQE 怎么办?
这正是 RAII 管理机制的优势所在。IOContext 的生命周期与系统资源严格绑定,当程序退出,IOContext 析构时,io_uring_queue_exit 会协同内核彻底销毁共享的环形缓冲区。由于事件循环已经终止,不会再有新的逻辑被触发,因此忽略未决的 CQE 是安全且合理的策略。
如果用户确实有在停止后清理特定状态的需求,可以通过暴露的 ring() 接口自行干预。
3. 基于 signalfd 的统一中断处理
既然已经实现了安全的唤醒与停止语义,顺理成章地,我们应将操作系统的信号(如 SIGINT, SIGTERM)也纳入异步框架。在 Linux 平台上,signalfd 提供了一种将异步中断转化为文件描述符可读事件的机制,它能被完美地集成进 io_uring 轮询模型中。
3.1 强类型 Signal 封装
避免裸露的魔术整数:
class Signal {
public:
explicit constexpr Signal(int signal) noexcept
: signal_{ signal }
{}
auto operator==(const Signal&) const noexcept -> bool = default;
// 这里的隐式转换是否提供看个人,我觉得不提供更好
constexpr operator int() const noexcept { return signal_; }
[[nodiscard]] constexpr auto value() const noexcept { return signal_; }
private:
int signal_;
};
struct signals {
signals() = delete;
static constexpr auto interrupt = Signal{ SIGINT };
static constexpr auto terminate = Signal{ SIGTERM };
static constexpr auto quit = Signal{ SIGQUIT };
static constexpr auto hangup = Signal{ SIGHUP };
};
3.2 信号集 SignalSet 管理
使用折叠表达式优雅地处理变参掩码。另外,为保障跨线程时的健壮性,此处采用了标准的 pthread_sigmask。
#include <sys/signalfd.h>
#include <signal.h>
template<typename Context>
class SignalSet {
public:
template<typename... Signals>
requires (std::same_as<Signals, Signal> && ...)
SignalSet(Context& io_context, Signals... signals)
: io_context_{ io_context }
{
::sigemptyset(&mask_);
(::sigaddset(&mask_, signals.value()), ...);
// 屏蔽这些信号的默认异步行为,交由 signalfd 同步读取
if (::pthread_sigmask(SIG_BLOCK, &mask_, nullptr) == -1)
throw_system_error("Failed to block signals");
fd_ = ::signalfd(-1, &mask_, SFD_NONBLOCK | SFD_CLOEXEC);
if (fd_ == -1)
throw_system_error("Failed to create signalfd");
}
SignalSet(const SignalSet&) = delete;
auto operator=(const SignalSet&) -> SignalSet& = delete;
SignalSet(SignalSet&& other) noexcept
: io_context_{ other.io_context_ },
fd_{ std::exchange(other.fd_, -1) },
mask_{ other.mask_ }
{}
auto operator=(SignalSet&& other) noexcept -> SignalSet& = delete;
~SignalSet()
{
if (fd_ != -1)
::close(fd_);
}
private:
Context& io_context_;
int fd_{ -1 };
sigset_t mask_;
};
3.3 构建 PollAwaiter 与 async_wait
既然是协程库,那我们理所应当的应该将监听行为设计成协程。为监听可读事件提供通用的 PollAwaiter:
template<typename Context>
class PollAwaiter : public Operation {
public:
using resume_type = void;
PollAwaiter(Context& context, int fd, short events) noexcept
: context_{ context },
fd_{ fd },
events_{ events }
{}
[[nodiscard]] constexpr auto await_ready() const noexcept -> bool { return false; }
auto await_suspend(std::coroutine_handle<> handle) noexcept -> void
{
handle_ = handle;
auto* sqe = context_.sqe();
::io_uring_prep_poll_add(sqe, fd_, events_);
::io_uring_sqe_set_data(sqe, this);
}
auto await_resume() const noexcept -> std::expected<void, std::error_code>
{
if (error_code_ != 0)
return unexpected_system_error(error_code_);
return {};
}
void complete(int res, [[maybe_unused]] std::uint32_t flags) noexcept override
{
error_code_ = res < 0 ? -res : 0;
if (handle_) {
auto handle = std::exchange(handle_, nullptr);
handle.resume();
}
}
private:
Context& context_;
int fd_;
short events_;
std::coroutine_handle<> handle_{ nullptr };
int error_code_{ 0 };
};
// 在 SignalSet 外部实现
template<typename Context>
auto SignalSet<Context>::async_wait() noexcept -> PollAwaiter<Context>
{
return PollAwaiter<Context>{ io_context_, fd_, POLLIN };
}
4. 系统集成演示
至此,基础组件均已就位。我们可以轻松写出一个支持非阻塞延时、并通过 Ctrl+C 信号安全、优雅退出的并发模型。
auto shutdown_monitor(IOContext& context) -> Task<void>
{
SignalSet sets{ context, signals::interrupt, signals::terminate };
// 挂起协程,等待操作系统向底层派发 SIGINT 或 SIGTERM
co_await sets.async_wait();
spdlog::info("Received shutdown signal, stopping IOContext...");
context.stop();
}
auto demo(IOContext& context) -> Task<void>
{
using namespace std::chrono_literals;
spdlog::info("before sleep...");
// 模拟长耗时异步任务
co_await sleep_for(context, 10min);
spdlog::info("after sleep...");
}
int main(int argc, char* argv[])
{
IOContext context{};
// 并发派发两个独立协程:一个执行业务,一个负责监听中断
co_spawn(context, demo(context));
co_spawn(context, shutdown_monitor(context));
context.run();
spdlog::info("IOContext stopped, exiting...");
return EXIT_SUCCESS;
}
运行结果:
在长达 10 分钟的 sleep 任务途中,我们通过 Ctrl+C 触发键盘中断,signalfd 捕获到信号,唤醒了沉睡的 shutdown_monitor 协程,随后成功中断事件循环,程序安全退出。
blog.io_context_v2
[2026-04-19 22:33:21.313] [info] before sleep...
^C[2026-04-19 22:33:22.954] [info] Received shutdown signal, stopping IOContext...
[2026-04-19 22:33:22.954] [info] IOContext stopped, exiting...
完整代码