@SPeak 以后可能会独立出一个用module实现的库
Doomjustin
-
【 基于 io_uring 的 C++20 协程网络库】10 API重构:让用户不再关心IOContext -
【 基于 io_uring 的 C++20 协程网络库】10 API重构:让用户不再关心IOContext前几篇一路写下来,我们已经把
io_uring的关键能力都接进了库:awaiter、timeout、socket、acceptor、recv_multishot、ReceiveStream。功能越来越完整,但也暴露出一个更现实的问题:API 是否还在逼用户理解本不该由他承担的运行时细节。最典型的细节,就是
IOContext。用户似乎不太需要理解IOContext的概念。这里给出一个对比
// 以前:业务函数签名一路携带 IOContext auto server(async::IOContext& context) -> async::Task<> { auto endpoint = net::ip::tcp::endpoint{ net::ip::AddressV6::loopback(), 12345 }; auto acceptor = net::ip::tcp::acceptor{ endpoint, true, context }; while (true) { auto client = co_await acceptor.async_accept(); if (!client) continue; async::co_spawn(session(std::move(*client), context)); } } // 现在:业务代码只表达连接逻辑,context 退回运行时内部 auto server() -> async::Task<> { auto endpoint = net::ip::tcp::endpoint{ net::ip::AddressV6::loopback(), 12345 }; auto acceptor = net::ip::tcp::acceptor{ endpoint, true }; while (true) { auto client = co_await acceptor.async_accept(); if (!client) continue; async::co_spawn(session(std::move(*client))); } }IOContext在框架内部当然是核心,事件循环、SQE/CQE 调度、work 计数都离不开它。真正让人犹豫的是另一件事: 用户代码里要不要处处显式传IOContext&。一开始看,显式传参很"透明",但写着写着就会发现它并没有给业务层带来真正自由。我们想支持的并发模型其实从一开始就是确定的:每个线程一个 context,每个线程各自跑事件循环,线程数量由启动参数决定。既然目标范式是固定的,继续把 context 暴露成日常 API,用户就很容易产生错觉,以为自己在做线程编排,实际上只是重复一个框架本可以帮他完成的路径。
Asio 和 io_uring-cpp 之类的库之所以把
io_context/io_uring_context作为显式参数暴露出来,是因为它们需要支持更复杂的并发拓扑——同一进程里可以有多个独立的 context,同一个 socket 可以在不同 context 之间迁移,线程与 context 的对应关系可以由用户自由配置。为了支持这种自由度,context 就必须出现在每一个需要它的地方:socket 构造时要绑定 context,每次异步操作要知道往哪个 context 提交 SQE。这个代价不是设计失误,而是为了满足通用性的合理取舍。我们的情况不同。这个库从一开始就只打算支持一种并发模型:每线程一个 context,所有线程跑同样的协程入口,由
async::run统一管理。这个约束是主动选择的,不是偷懒,而是认为对于绝大多数网络服务场景,这个模型就已经够用,而且更难被误用。正因为并发模型是确定的,context 就可以从参数里消失,退回到线程局部存储里待命。这个选择当然有代价。隐藏 context 之后,用户也就失去了"把这个连接交给某个特定线程处理"的能力。当前的连接分发完全依赖内核的
SO_REUSEPORT——多个线程各自 accept,内核负责把新连接均匀分配过去。如果业务需要最少连接数、一致性哈希这类应用层调度策略,默认路径就不够用了。但这不是无路可走。需要自定义负载均衡的用户,可以绕过
async::run的封装,直接操作IOContext——手动创建线程、绑定 context、管理连接归属。这条路更复杂,要求用户真正理解 context 的生命周期和调度语义,但接口是开放的。默认隐藏 context,不是在堵死这条路,而是认为大多数场景不需要走它。所以这轮重构想解决的,是让用户不再需要关心
IOContext。框架内部继续依赖它,只是不再要求用户把它带进日常代码里。入口上,这个意图非常直接。
async::run的实现把范式固定得很彻底:template<typename Awaiter, typename... Args> void run(std::integral auto thread_count, Awaiter&& awaiter, Args&&... args) { std::vector<std::jthread> threads; for (int i = 1; i < thread_count; ++i) { threads.emplace_back([awaiter, args...]() mutable -> void { detail::push(this_coroutine::context()); co_spawn(std::invoke(awaiter, args...)); this_coroutine::context().run(); detail::erase(this_coroutine::context()); }); } run(std::forward<Awaiter>(awaiter), std::forward<Args>(args)...); }用户传入的是一个协程函数和它的参数,
run内部完成线程创建、context 绑定、事件循环启动、退出后 context 注销这整条链路。不同线程之间没有共享 context,也不需要用户来决定线程与 context 的对应关系——这条路只有一种走法。
这个模式能成立,依赖的是线程局部 context:auto context() -> IOContext& { thread_local auto context = std::make_unique<IOContext>(); return *context; }第一次访问时创建,之后同线程复用。这样一来,很多接口就可以把 context 作为默认参数内收。比如
StreamSocket、BasicAcceptor:// 之前:context 必须显式传入 BasicAcceptor(Context& context, const endpoint_type& endpoint, bool enable_reuse_port = false); // 现在:context 带了默认值,来自线程局部存储 BasicAcceptor(const endpoint_type& endpoint, bool enable_reuse_port = false, context_type& context = async::this_coroutine::context());StreamSocket的各个构造函数同样如此:explicit StreamSocket(context_type& context = async::this_coroutine::context()); StreamSocket(const Protocol& protocol, context_type& context = async::this_coroutine::context()); StreamSocket(int fd, context_type& context = async::this_coroutine::context());用户代码自然回到"写网络协程"本身,而不是在每层函数签名里搬运
IOContext&。这一点在调用体验上的变化非常明显。过去写 server 逻辑,常常会先铺一层 context plumbing,再进入 accept/read/write;现在可以直接围绕连接、会话、超时去组织代码。
co_spawn(session(...))看起来更轻,不是因为框架放松了约束,而是因为约束被移回了运行时内部。有了这些铺垫,我们可以给出一个完整的 echo server demo。
echo()启动时调用一次setup_buffer_ring,这是给recv_multishot在内核里注册 buffer ring 的初始化步骤,内部同样通过this_coroutine::context()拿到当前线程的 context,用户不需要显式传入。之后就是纯粹的业务逻辑:auto echo() -> async::Task<> { async::setup_buffer_ring(128, 4096); auto endpoint = net::ip::tcp::endpoint{ net::ip::AddressV6::loopback(), 12345 }; auto acceptor = net::ip::tcp::acceptor{ endpoint, true }; net::ip::tcp::endpoint client_endpoint{}; while (true) { auto client = co_await acceptor.async_accept(client_endpoint); if (!client) continue; async::co_spawn(session(std::move(*client))); } } auto shutdown_monitor() -> async::Task<void> { async::SignalSet sets{ async::signals::interrupt, async::signals::terminate }; co_await sets.async_wait(); async::stop(); } int main(int argc, char* argv[]) { auto worker_count = std::thread::hardware_concurrency() == 0 ? 4 : std::thread::hardware_concurrency(); async::co_spawn(shutdown_monitor()); async::run(worker_count, echo); }session、echo、main都不持有IOContext,但调度、超时、收发和停机链路一个都没少。业务层看到的是服务器如何处理连接,而不是运行时如何编排 context。session()更能说明这一点。它直接从 socket 拿receive_stream,在循环里消费数据,写回时套上超时——整个函数签名和函数体里没有 context 参数的任何踪迹:auto session(net::ip::tcp::socket client) -> async::Task<> { auto stream = client.receive_stream(); while (true) { auto read_result = co_await stream.next(); if (!read_result || read_result->data().empty()) co_return; auto received = read_result->data(); using namespace std::literals::chrono_literals; auto write_result = co_await async::timeout(async::write(client, received), 1ms); if (!write_result) co_return; } }这是用户不再需要关心
IOContext之后真正的书写状态:从 accept 到 session、从读取到超时写回,context 在这条路上完全透明。实际跑起来,20 个 worker 线程同时监听,各自持有独立的 context,accept 和 session 由内核按连接分配到不同线程处理:
[2026-04-30 15:59:49.301] [9833] [info] Server listening on ::1:12345 [2026-04-30 15:59:49.301] [9832] [info] Server listening on ::1:12345 [2026-04-30 15:59:49.302] [9835] [info] Server listening on ::1:12345 [2026-04-30 15:59:49.301] [9834] [info] Server listening on ::1:12345 ... [2026-04-30 15:59:49.306] [9850] [info] Server listening on ::1:12345 [2026-04-30 15:59:57.419] [9838] [info] Accepted connection from ::1:52762 [2026-04-30 15:59:58.467] [9838] [info] Received 8 bytes from client 64 [2026-04-30 15:59:58.467] [9838] [info] Data: dasfasf [2026-04-30 15:59:58.980] [9838] [info] Client 64 disconnected [2026-04-30 15:59:59.950] [9849] [info] Accepted connection from ::1:52770 [2026-04-30 16:00:00.568] [9849] [info] Client 64 disconnected [2026-04-30 16:00:01.591] [9841] [info] Accepted connection from ::1:52786 [2026-04-30 16:00:02.043] [9841] [info] Client 64 disconnected日志里每行的线程 ID 不同,但业务代码里根本没有出现过"把这个 session 分配给哪个线程"的逻辑——这正是
SO_REUSEPORT加上每线程独立 context 共同决定的结果,不是用户在代码里手工安排的。DetachedTask::promise_type仍然在内部绑定 context,并在生命周期里维护 work 计数:template<typename Awaitable> promise_type(Awaitable&& awaitable, IOContext& ctx = this_coroutine::context()) : context{ &ctx } { context->add_work(); } ~promise_type() { context->drop_work(); }也就是说,调用侧虽然不再显式传 context,但任务归属、退出条件、调度正确性都还在,而且更一致。
到这里,另一个经常被忽略的问题也一起被收口了: 生命周期结束时,谁来负责停机和取消。
如果 context 长期暴露在业务层,停机往往会变成"每处都能 stop 一下"的分散控制,最后很难判断到底谁在主导退出。现在把入口收敛后,这件事也能回到统一语义。
run在每条线程上都会向detail::active_contexts注册当前 context,退出时再注销;async::stop()遍历这个集合,对每个 context 广播停机:void stop() { std::scoped_lock lock{ detail::contexts_mutex }; std::ranges::for_each(detail::active_contexts, [](IOContext* context) { context->stop(); } ); }正在阻塞在
submit_and_wait的事件循环会被唤醒,检查should_stop_后走退出路径;任务侧依赖 work 计数和协程收尾自然回落。shutdown_monitor()只需要等信号、调一次async::stop(),其余全部由框架收尾。这样 stop/cancel 不再是额外补丁,而是和调度模型放在同一条主线上。用户写网络服务,只需要关心连接、读写和超时,不需要关心背后跑了几个 context、谁在驱动事件循环、停机时谁负责收尾。
IOContext还在,只是不再出现在用户需要阅读和维护的代码里。这一步做完之后,ReceiveStream、timeout、buffer ring 这些能力继续扩展,API 也不会再被迫回到"先传 context,再谈业务"的旧写法。 -
【 基于 io_uring 的 C++20 协程网络库】09 读路径优化:recv_multishot与ReceiveStream在上一篇里,我们解决了"怎么把数据发出去"。这一篇转到读路径:把
recv_multishot接进来,减少高频读取时的重复提交。如果沿用
async_read_some,每次读取都要先准备一块可写 buffer,再提交一次recv,等 CQE 回来后再决定要不要继续下一次。这个模型本身没错,但放到 Echo、代理、网关这类长连接里,会很快变成重复劳动:准备 buffer、提交 SQE、等待 CQE、再补上下一次读取。recv_multishot能直接解决这件事:一次提交,对应后续多次完成。在开始之前,先确认一下当前库的状态。前几篇一路写下来,
IOContext一直是个比较扁平的类:持有一个io_uringring 句柄,提供 SQE 获取和事件循环驱动,加上 work 计数和 stop 信号:class IOContext { ::io_uring ring_; int event_fd_; std::size_t outstanding_works_{ 0 }; std::atomic<bool> should_stop_{ false }; void run(); auto sqe() -> ::io_uring_sqe*; void wakeup(); // CQE 分发 ... };socket 层的每个异步操作,把自己包装成
Operation提交进 ring,CQE 回来时由IOContext分发回去。至于 buffer,一直是由调用方临时传入,操作完成后调用方自己处理生命周期。这个结构目前是干净的,但
recv_multishot要求的东西比这多一层。
1. 直接加进去会碰到什么
recv_multishot需要 buffer ring:内核不再接受调用方临时传入的单块 buffer,而是要求预先注册一组固定大小的槽位。每次有数据到达,内核从 ring 里借一个槽位写入,CQE 里带上这次借出的槽位编号,调用方读完数据后再把槽位还回去。最直接的想法是往
IOContext里加几个字段:class IOContext { ::io_uring ring_; int event_fd_; std::size_t outstanding_works_{ 0 }; std::atomic<bool> should_stop_{ false }; // 新加的 buffer ring 管理 std::unordered_map<unsigned, BufferRing> buffer_rings_; unsigned next_bgid_{ 0 }; std::optional<unsigned> default_bgid_; };功能上能跑,但麻烦随之而来。
第一,
IOContext现在要同时处理两类完全不同的事情:一类是"这一次 submit_and_wait 怎么跑、CQE 怎么分发",另一类是"有哪些 buffer 组被注册在内核里、槽位怎么借出和归还"。这两件事没有天然的耦合关系,混在一起只会让两侧的逻辑都难以单独修改。第二,更直接的问题出在 CQE 分发上。之前"一个 CQE 对应一次操作完成"的判断,在 multishot 里不再成立——事件循环必须识别
IORING_CQE_F_MORE,在这个 flag 还存在时不能把这笔 work 从计数里扣掉。这不是细节调整,而是分发逻辑本身语义的变化。如果这块逻辑和 buffer ring 管理搅在同一段代码里,两边会互相干扰。第三,socket 层如果要用 buffer ring,就必须拿到
bgid和bid,然后到处传。调用方写业务代码时不该感知这些细节,但没有合适的封装层,这些细节就只能往上漏。所以在写 multishot awaiter 之前,要先把
IOContext的职责拆开:- 和 ring 驱动、唤醒、CQE 分发有关的逻辑,收进独立的
Scheduler。 - buffer ring 的建立、槽位借出和归还,收进独立的
BufferRingGroup。 IOContext自己只保留对外协调接口和 work tracking。- socket 层只暴露消费接口,不让业务层直接碰 buffer slot。
这不是为了"代码好看",而是为了把复杂度放回正确位置。不做这一步,后面无论接收流、buffer 池复用还是取消清理,都会越来越难理顺。
2. 重构后的
IOContext拆分后的
IOContext把两类职责分别交给两个内部类:class IOContext { private: class Scheduler { // ring 初始化、SQE 获取、submit_and_wait、wakeup }; class BufferRingGroup { // setup buffer ring、release buffer slot、default bgid }; Scheduler scheduler_; BufferRingGroup buffers_; std::size_t outstanding_works_{ 0 }; std::atomic<bool> should_stop_{ false }; };Scheduler和BufferRingGroup的职责本来就不是一回事。Scheduler关注的是"这一次事件循环怎么跑":- 初始化和销毁
io_uringring。 - 提供 SQE。
submit_and_wait后遍历 CQE,分发给对应Operation。- 通过
eventfd做跨线程 stop 唤醒。
尤其是第 3 点,在引入 multishot 之后已经不能再按"一个 CQE 对应一个完整操作"来理解了。
Scheduler::schedule()现在必须识别IORING_CQE_F_MORE,只有在 multishot 真正结束时才能把这笔 work 从计数里扣掉。这意味着recv_multishot不只是给 socket 层加了个新能力,它也顺手改写了事件循环对"完成"这件事的理解。而
BufferRingGroup关心的是另一件事:"有哪些 buffer 组被长期注册在内核里,以及它们怎么被重复借出和归还"。这部分如果混进Scheduler,后者就会一边跑 CQE 批调度,一边操心 buffer 池资源生命周期,边界很快就会糊掉。接口本身也能看出这种分层:
auto setup_buffer_ring(unsigned entries, unsigned size) -> unsigned { return buffers_.setup(scheduler_.ring(), entries, size); } void release_buffer_ring(unsigned bgid, unsigned bid) { buffers_.release(bgid, bid); }IOContext在这里做的事情很克制:buffer ring 的建立确实需要底层 ring 句柄,但建立完成之后,后续使用者不必再感知这些细节。有了这层结构,
ReceiveStream才真正有了落脚点。否则它一边要操心 multishot 请求,一边还得自己解决 buffer group 的分配和归还,那就不是 socket 接口该承担的事情。它在
StreamSocket上的入口很轻:auto receive_stream() -> ReceiveStream<Context> { auto default_bgid = this->context().default_buffer(); if (!default_bgid) throw std::runtime_error{ "No default buffer ring available for receive stream" }; return ReceiveStream<Context>{ this->context(), this->native_handle(), *default_bgid }; }ReceiveStream不是独立工作的,它默认依赖IOContext里已经准备好的 buffer ring。在当前实现里,第一次setup_buffer_ring()创建出来的组会自动成为默认组,所以 demo 里只做一次初始化就够了。
3. 重构落地:把 buffer ring 收进
IOContextbuffer ring 进入
IOContext之后,下一步才轮到它自己的实现。BufferRingGroup::setup()做了三件事:分配一整块连续内存、向内核注册一个 buf ring、把每个 slot 预先填进 ring。auto IOContext::BufferRingGroup::setup(::io_uring* ring, unsigned entries, unsigned size) -> unsigned { auto bgid = next_bgid_++; auto& buffer_ring = group_[bgid]; buffer_ring.size = size; buffer_ring.entries = entries; buffer_ring.mask = ::io_uring_buf_ring_mask(entries); const auto alloc_size = static_cast<std::size_t>(entries * size); buffer_ring.base_address = memory_resource_->allocate(alloc_size, ALIGNMENT); int res = 0; buffer_ring.buffer = ::io_uring_setup_buf_ring(ring, entries, bgid, 0, &res); auto* base = static_cast<std::byte*>(buffer_ring.base_address); for (unsigned i = 0; i < entries; ++i) ::io_uring_buf_ring_add(buffer_ring.buffer, base + i * size, size, i, buffer_ring.mask, i); ::io_uring_buf_ring_advance(buffer_ring.buffer, entries); buffer_ring.tail = entries; if (!default_buffer_bgid_) default_buffer_bgid_ = bgid; return bgid; }这部分有三个会直接影响行为的选择。
第一,所有 buffer slot 放在一整块连续内存里,而不是单独
new一堆小块。这么做不是图省事,而是因为 buf ring 的典型使用方式本来就是"固定大小、重复借用、按槽位编号回收"。连续布局让bid -> 地址的映射退化成简单的指针偏移,后面 CQE 回来时只要base + bid * size就能找回数据。第二,
bgid的管理被封装在BufferRingGroup内部。调用方只拿到一个逻辑编号,不直接接触底层注册细节;默认组也在这里顺手建立起来,方便 socket 层提供一个无参的receive_stream()。第三,归还路径同样应该放在这里,而不是散在各处调用
io_uring_buf_ring_add。因为释放 slot 本质上是 buffer ring 自己的状态变更,和具体是哪条连接、哪次读操作用过这个 slot 没关系。归还逻辑如下:
void IOContext::BufferRingGroup::release(unsigned bgid, unsigned bid) { auto& buffer_ring = group_[bgid]; auto* base = static_cast<std::byte*>(buffer_ring.base_address); const int offset = buffer_ring.tail & buffer_ring.mask; ::io_uring_buf_ring_add( buffer_ring.buffer, base + bid * buffer_ring.size, buffer_ring.size, bid, buffer_ring.mask, offset ); ::io_uring_buf_ring_advance(buffer_ring.buffer, 1); ++buffer_ring.tail; }一个 slot 被借走时,
bid会随 CQE 一起回来;一个 slot 被归还时,BufferRingGroup只需要根据同样的bid把它重新挂回 ring。这样 buffer 的借出和回收就闭上了环,之后PooledBuffer才有可能用 RAII 去包装它。
4. 地基理顺后,再把 multishot 接上
ReceiveStream真正的提交动作在arm_operation()里:void arm_operation() { if (!operation_) operation_ = new MutishotReceiveOperation{ this, context_, bgid_ }; auto* sqe = context_->sqe(); ::io_uring_prep_recv_multishot(sqe, fd_, nullptr, 0, 0); sqe->flags |= IOSQE_BUFFER_SELECT; sqe->buf_group = bgid_; ::io_uring_sqe_set_data(sqe, static_cast<Operation*>(operation_)); operation_armed_ = true; }这个提交动作有两个关键信息。
第一,
io_uring_prep_recv_multishot只提交一次,但不是只收一次。只要内核认为这个 multishot 请求还有效,后面每次有数据到达,它都可以继续产出新的 CQE。第二,
IOSQE_BUFFER_SELECT告诉内核:接收缓冲区不由这次 SQE 显式给出,而是从buf_group对应的 buffer ring 里挑一个空闲槽位来写。用户态这次不传void* buf,而是预先把一组固定大小的 buffer 注册给内核,后面由内核按需借用。CQE 回来时,
res是本次收到的字节数,flags里则可能带上两个额外信息:IORING_CQE_F_BUFFER:说明这次结果对应某个 buffer ring 里的槽位。IORING_CQE_F_MORE:说明这个 multishot 请求还活着,后面还会继续收。
这两个 flag 基本就是
ReceiveStream最关心的信息:一个告诉它"数据落在哪块 buffer 上",一个告诉它"这条流还要不要继续等下去"。
5. 最后才是用户侧 API:为什么返回
PooledBuffer如果只是为了把数据交给调用方,返回
std::span<std::byte>看起来已经够了。但对ReceiveStream来说,这远远不够,因为 span 只是一段视图,不携带任何归还语义。内核从 buffer ring 里借出的槽位,最终必须还回去,否则 ring 只会越用越少,直到耗尽。这个归还动作不能指望业务层记得手工调用,所以这里必须做成 RAII。
PooledBuffer的职责正是这个:template<typename Context> class PooledBuffer { public: PooledBuffer(Context& context, unsigned bgid, unsigned bid, std::span<std::byte> buffer) : context_{ &context }, bgid_{ bgid }, bid_{ bid }, buffer_{ buffer } {} ~PooledBuffer() { release(); } auto data() const noexcept -> std::span<std::byte> { return buffer_; } private: void release() { if (valid()) context_->release_buffer_ring(bgid_, bid_); } };调用方拿到的是一块"带归还能力的 buffer 句柄"。它可以读这段数据,也可以把这段数据直接交给后续写操作;一旦这个对象离开作用域,对应槽位就自动回到 buffer ring,可供下一次接收复用。
这也是
ReceiveStream最重要的体验差异:上层拿到的是一块已经装好、生命周期清晰、离开作用域后能自动回收的结果,而不是一段裸内存视图。
6.
next()为什么需要 ready queueReceiveStream对外只暴露一个动作:auto next() -> NextAwaiter { return NextAwaiter{ *this }; }next()能不能用得顺,关键在NextAwaiter和ready_results_的配合。class NextAwaiter { public: auto await_ready() const noexcept -> bool { return !stream_.ready_results_.empty(); } void await_suspend(std::coroutine_handle<> handle) noexcept { stream_.handle_ = handle; if (!stream_.operation_armed_) stream_.arm_operation(); } auto await_resume() -> std::expected<PooledBuffer<Context>, std::error_code> { if (stream_.ready_results_.empty()) return unexpected_system_error(std::errc::operation_canceled); auto result = std::move(stream_.ready_results_.front()); stream_.ready_results_.pop_front(); return result; } };这里的
ready_results_不是装饰品,而是必须存在的一层缓冲。recv_multishot有一个天然特征:协程还没来得及再次co_await next(),它就可能已经连续产出了多个 CQE。如果没有队列,后到的结果只能覆盖先到的结果,或者逼着handle_cqe()当场恢复协程并同步消化所有数据,这两种都不对。把这段行为画成时序会直观很多:
[Kernel multishot] [ReceiveStream] [User Coroutine] | | | |-- CQE #1 ---------------->| push ready_results_ | |-- CQE #2 ---------------->| push ready_results_ | |-- CQE #3 ---------------->| push ready_results_ | | | | | |<----------- co_await next() ---| | | pop #1 and resume | | |<----------- co_await next() ---| | | pop #2 and resume |也就是说,
ready_results_不是优化项,而是recv_multishot能以“生产者-消费者节奏解耦”方式暴露给上层 API 的必要条件。用
std::deque<result_type>把结果先存起来,问题就顺了:- CQE 到达时,先转成
PooledBuffer或 error,推进队列。 - 如果此刻真的有协程挂在
next()上,就恢复它。 - 如果协程暂时还没来取,结果就老实待在队列里,等下一次
await_ready()直接命中。
这样一来,
ReceiveStream就有了一点异步生成器的感觉:底层持续产出,上层按自己的节奏消费,中间靠一个轻量队列解耦。
7. CQE 到达后如何变成可消费结果
handle_cqe()做的事情,说到底就是把内核语义翻译成库语义。void handle_cqe(int result, std::uint32_t flags) noexcept { std::optional<PooledBuffer<Context>> pooled_buffer; if (flags & IORING_CQE_F_BUFFER) { const auto bid = static_cast<std::uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT); auto& buffer_ring = context_->buffer_ring(bgid_); auto* base = static_cast<std::byte*>(buffer_ring.base_address); std::span<std::byte> data{ base + bid * buffer_ring.size, static_cast<std::size_t>(result) }; pooled_buffer.emplace(*context_, bgid_, bid, data); } if (result == -ECANCELED) { ready_results_.emplace_back(unexpected_system_error(std::errc::operation_canceled)); } else if (result >= 0) { if (pooled_buffer) ready_results_.emplace_back(std::move(*pooled_buffer)); else ready_results_.emplace_back(PooledBuffer<Context>{}); } else { ready_results_.emplace_back(unexpected_system_error(-result)); } if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } }这一步分三层:
第一层,从
flags里提取bid,再结合bgid_找回 buffer ring,算出这次数据实际落在了哪一段内存上。第二层,把这段内存包装成
PooledBuffer。从这一刻开始,buffer 的归还责任被显式绑定到了对象生命周期上。第三层,把内核返回码整理成
std::expected:正常数据走 value,取消和错误走 error。这样上层的消费方式就统一了。还有一个细节:
result >= 0但没有IORING_CQE_F_BUFFER时,代码会塞一个默认构造的PooledBuffer。这给上层留出了处理空结果的空间,比如把空 buffer 视为连接关闭。这个约定在 demo 里也直接用到了。
8. 真正难的是收尾
ReceiveStream最容易写错的,不是提交 multishot,而是收尾。问题在于:当
ReceiveStream析构时,内核里那笔 multishot 请求不一定已经结束。你当然可以提交一个 cancel SQE,但 cancel 也是异步的,在 cancel 的 CQE 真正回来之前,原来的 multishot CQE 仍然可能再到一次。这个窗口期一旦处理不好,要么 use-after-free,要么 buffer 泄漏。解决办法是:析构时不直接删 operation,而是先
detach()。void destroy() noexcept { if (operation_) { operation_->detach(); auto* sqe = context_->sqe(false); ::io_uring_prep_cancel(sqe, static_cast<Operation*>(operation_), 0); ::io_uring_sqe_set_data(sqe, nullptr); operation_ = nullptr; } if (context_) context_ = nullptr; }detach()的效果是把operation和ReceiveStream本体断开。之后如果晚到的 CQE 还落到这笔 operation 上,complete()会走另一条路径:不再访问已经析构的 stream,而是仅仅把 buffer 槽位补回 buffer ring。void complete(int res, unsigned flags) override { const bool has_more = (flags & IORING_CQE_F_MORE) != 0; if (stream_) { if (!has_more) { stream_->operation_armed_ = false; stream_->operation_ = nullptr; } stream_->handle_cqe(res, flags); } else if (flags & IORING_CQE_F_BUFFER) { // stream 已不存在,只负责把槽位补回 buffer ring // ... } if (!has_more) delete this; }这样一来,
ReceiveStream的析构和 multishot 请求的最终死亡就不再需要严格同步,晚到的 CQE 也不会污染用户态状态,只会做最必要的资源回收。另外,move 构造和 move 赋值里也有一个对应的修正动作:如果 operation 还活着,就把
operation_->stream_改指向新的对象。这保证了ReceiveStream被移动后,后续 CQE 仍然能回到正确实例上。
9. 实际用法
在 demo 里,Echo Server 的 session 代码已经很干净了:
auto session(ip::tcp::socket<IOContext> client) -> Task<> { auto stream = client.receive_stream(); while (true) { auto read_result = co_await stream.next(); if (!read_result) { spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message()); co_return; } auto received = read_result->data(); if (received.empty()) { spdlog::info("Client {} disconnected", client.native_handle()); co_return; } auto write_result = co_await timeout(async_write(client, received), 1ms); if (!write_result) co_return; } }这段代码最直观的变化是:读路径里已经看不到"准备 buffer -> 提交 recv -> 处理 buffer 生命周期"这些样板动作了。业务协程眼里只剩一件事:下一块数据什么时候到。
这正是
ReceiveStream的价值。它不是单纯给recv_multishot套了个 awaiter,而是顺手把 buffer ring、生命周期、结果排队和析构期收尾这些麻烦一起包掉了。上层得到的是一个可以连续next()的接收流,底层保留的则是 io_uring 多次投递的吞吐优势。 - 和 ring 驱动、唤醒、CQE 分发有关的逻辑,收进独立的
-
【 基于 io_uring 的 C++20 协程网络库】08 写路径优化:Scatter-Gather与writev在构建了全异步 Echo Server 之后,我们的网络库已经能够处理一来一回的字节流通信。但工程实践中,一个真实的应用往往不会只发送一块连续内存。
考虑一个 HTTP/1.1 响应:响应头(
std::string)和响应体(std::vector<char>)通常分布在两块不相关的内存区域。最朴素的写法是连续调用两次async_write_some,但这意味着两次io_uring提交、两次协程挂起恢复——这在高并发场景下代价不菲。更糟糕的是,两次独立的写操作并非原子的。在 Nagle 算法被关闭的情况下(
TCP_NODELAY),内核可能将头部和体部拆成两个 TCP 报文分别发送,给对端解析器制造不必要的复杂度。本篇将探讨如何通过 Scatter/Gather I/O 机制,在单次系统调用中完成对多块内存的原子性聚合写操作。
1. 问题根源:内存布局与系统调用边界
标准的
write(2)系统调用接受的是一块连续的(void* buf, size_t count)。要发送分散在多处的数据,传统上有两种方案:- 拼接再发送:将所有数据
memcpy到一块连续缓冲区,再调用一次write。代价是额外的内存分配与拷贝,延迟增加,CPU 缓存命中率下降。 - 多次调用:依次对每块数据各调用一次
write。代价是多次用户态
内核态的上下文切换,以及不可避免的时序问题。
POSIX 标准的答案是
writev(2):ssize_t writev(int fd, const struct iovec *iov, int iovcnt);调用方构造一个
iovec数组,每个元素描述一块内存区域,内核在内部完成聚合,对外表现为单次、原子的写操作。这种模式被称为 Gather Write(聚合写),与之对应的readv称为 Scatter Read(分散读),统称 Scatter/Gather I/O。struct iovec { void *iov_base; // 缓冲区起始地址 size_t iov_len; // 缓冲区长度 };
2. Concept 先行:定义 Buffer 序列的语言契约
C++ 标准库没有"一组只读 buffer 的 range"这个抽象,我们用两个 Concept 来定义它:
template<typename T> concept const_buffer = requires(const T& t) { { buffer(t) } -> std::same_as<std::span<const std::byte>>; }; template <typename T> concept sequence_buffer = std::ranges::range<T> && const_buffer<std::ranges::range_reference_t<T>>;std::string、std::vector<char>、std::string_view等连续存储类型本身就满足const_buffer。因此std::vector<std::string>或std::array<std::string_view, N>这类 range 可以直接传给async_write_some,不需要用户手动做任何转换。
3. 异步实现:
WriteSequenceAwaiter的内存安全设计异步化
writev有一个绕不开的内存安全问题:iov数组的生命周期。template<sequence_buffer Buffer> class WriteSequenceAwaiter: public Operation { public: WriteSequenceAwaiter(context_type& context, int socket, const Buffer& buffers) : context_(context), socket_(socket) { iov_.reserve(std::ranges::size(buffers)); for (const auto& chunk : buffers) { auto data = buffer(chunk); iov_.push_back(::iovec{ .iov_base = const_cast<std::byte*>(data.data()), .iov_len = data.size() }); } } void prepare(::io_uring_sqe* sqe) noexcept override { ::io_uring_prep_writev(sqe, socket_, iov_.data(), iov_.size(), 0); } // ... private: std::vector<::iovec> iov_; // ... };SQE 提交后,内核在某个未知的时间点才真正执行 I/O,等待期间
iov数组必须保持有效。WriteSequenceAwaiter的做法是在构造函数中把iovec元数据全部拷贝进iov_成员向量。这里有一个关键的区分:
iovec元数据(指针 + 长度):占用空间极小(每个 16 字节),在构造时拷贝,由 Awaiter 对象自身持有。- 实际的 payload 字节:不拷贝,
iov_base直接指向调用方提供的原始内存。
由于 Awaiter 对象驻留在协程帧中,而协程帧的生命周期与
co_await表达式完全绑定——co_await在整个异步操作期间协程不会销毁帧,因此:iov_向量本身由协程帧持有,直到complete()恢复后才释放——内核读取iov数组时内存有效。- payload 字节由调用方保证在
co_await期间有效,iov_base指针始终合法。
iov_元数据由 Awaiter 自己持有,payload 字节一字节不动——零拷贝,正确性靠协程帧的生命周期保证。
4. 暴露给用户的 API:
async_write_some的重载决议WriteSequenceAwaiter最终通过StreamSocket上的重载函数暴露给用户:// 重载一:单块 buffer auto async_write_some(std::span<const std::byte> buffer) noexcept -> async::WriteSomeAwaiter; // 重载二:buffer 序列 template<sequence_buffer Buffer> auto async_write_some(const Buffer& buffer) noexcept -> async::WriteSequenceAwaiter<Buffer>;两个重载共享同一个函数名,编译器依据参数类型自动选择正确的路径:
- 传入
std::span<const std::byte>(或满足隐式转换的单一连续 range)→ 走WriteSomeAwaiter,底层是io_uring_prep_write - 传入满足
sequence_buffer的 range(如std::vector<std::string_view>)→ 走WriteSequenceAwaiter,底层是io_uring_prep_writev
5. 实战:发送分散的 HTTP 响应
以发送一个 HTTP/1.1 响应为例,头部和体部分别存储在两块独立内存中:
auto send_response(net::ip::tcp::socket& conn) -> async::Task<> { std::string header = "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: 13\r\n" "\r\n"; std::string body = "Hello, World!"; // std::string 直接满足 const_buffer,std::array<std::string, 2> 满足 sequence_buffer std::array parts = { header, body }; // 一次 co_await → 一次 io_uring 提交 → 一次内核 writev auto result = co_await conn.async_write_some(parts); if (!result) { spdlog::warn("send failed: {}", result.error().message()); co_return; } spdlog::info("sent {} bytes", *result); }与朴素的两次
async_write_some调用相比,这里只有一次协程挂起恢复,且头部与体部保证在同一次writev中原子发送——在TCP_NODELAY的场景下尤为重要。
6. "写完为止"与"读完为止":WriteAll 与 ReadAll
async_write_some和async_read_some只保证单次内核调用——它们返回的是"这次实际传输了多少字节",而非"请求的字节是否全部完成"。在流式协议下,这种部分传输(partial I/O)是完全合法的内核行为。"写完为止"在网络编程中足够常见,值得在库层面直接封装。
WriteAllAwaiter和ReadAllAwaiter将"重试直到完成"的逻辑封装在 Awaiter 内部,业务层无需感知部分传输的存在:6.1 WriteAllAwaiter
class WriteAllAwaiter: public CancelableOperation { // ... void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); if (error_code_ != 0 || buffer_.empty()) resume(handle_, result, flags); // 完成或出错,恢复协程 else arm_write(); // 还有剩余,重新提交 } void arm_write() noexcept { auto* sqe = context_.sqe(); ::io_uring_prep_send(sqe, socket_, buffer_.data(), buffer_.size(), 0); ::io_uring_sqe_set_data(sqe, this); } void set_result(int result, std::uint32_t flags) noexcept { if (result > 0) { bytes_written_ += static_cast<std::size_t>(result); buffer_ = buffer_.subspan(result); // 滑动视图,指向剩余部分 } else if (result == 0) { error_code_ = ECONNABORTED; // 对端关闭连接 } else { error_code_ = -result; } } };buffer_是std::span<const std::byte>,只是视图,不持有数据。每次部分写入后,set_result把视图头部前移;complete()随即检查:还有剩余就调arm_write()重新提交 SQE,写完了或出错了才调resume()恢复协程。这里有个值得留意的细节:重试不能用循环实现。
complete()是内核 CQE 的回调路径,不能在里面等待下一次 I/O 完成——唯一的方式是重新提交一个 SQE,让内核完成后再次回调。这也是arm_write()会在complete()里再次出现的原因。协程从头到尾只挂起一次,中间所有的重试都在回调链里发生,外面看不到任何细节。ReadAllAwaiter与之对称,换成io_uring_prep_recv和可写的std::span<std::byte>。result == 0时返回ECONNABORTED——对端已关闭,已读字节不足期望长度,继续等下去不会再有新数据。6.2 取消机制:为什么不能用 link_timeout?
在第三篇博客中,我们为一次性的 I/O 操作(
async_read_some、async_write_some)实现了超时机制,其底层是 io_uring 的IOSQE_IO_LINK+io_uring_prep_link_timeout方案:将一个 timeout SQE 通过IOSQE_IO_LINK链接到 I/O SQE 后面,内核自动保证"哪个先完成,另一个被取消"。但这一方案对
WriteAllAwaiter/ReadAllAwaiter完全失效,原因在于它们是多次提交的操作——每次部分写入后,complete()回调里会再次调用arm_write()提交新的 SQE。而IOSQE_IO_LINK只能链接相邻的两个 SQE,第一次提交时的链接在第一个 CQE 到达后就已经消耗完毕,无法覆盖后续的重试 SQE。为此,这里引入了一套针对多次提交操作的取消机制,核心是
CancelableOperation里的一个parent指针:struct CancelableOperation : public Operation { CancelableOperation* parent{ nullptr }; void resume(std::coroutine_handle<> handle, int result, std::uint32_t flags) noexcept { if (parent) parent->complete(result, flags); // 路由给包装层 else if (handle) std::exchange(handle, nullptr).resume(); } };CancelableOperation在Operation基础上新增了一个parent指针。当操作被一个超时组合器包裹时,parent被设置为该组合器;否则为nullptr,行为与普通Operation相同。WriteAllAwaiter继承自CancelableOperation,并在complete()中通过resume()而非直接调用handle_.resume()。关键在于resume()只在整个操作最终完成或出错时才被调用——中间每次部分写入完成后,complete()直接调用arm_write()重新提交,不经过 parent。只有当buffer_.empty()(写完)或error_code_ != 0(出错/被取消)时,才通过resume()将结果路由出去。这样 parent 只需处理一次最终事件,而不是每次重试。这段逻辑如果只看文字会比较绕,可以把它压成一个事件时序:
[User Coroutine] [TimeoutCombinator] [io_uring] | | | |-- co_await timeout() -->| | | |-- submit timer SQE ------->| | |-- submit write_all SQE --->| | | | | |<-- CQE(write partial) ---- | (继续 arm_write) | |<-- CQE(write done) ---- ---| (或 error) | |-- cancel(timer) ---------->| | |<-- CQE(timer canceled) ----| |<------------------------| resume once |另一条分支是 timer 先到期:
TimeoutCombinator会反向 cancel 当前飞行中的 write/read SQE,同样等两边 CQE 都收干净后再恢复协程。核心目标只有一个:恢复一次,但把飞行中的请求收尾做完整。有了这个基础,
TimeoutCombinator就可以实现真正的独立定时器了:template<cancelable_operation Awaiter> class TimeoutCombinator: public CancelableOperation { void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; // 独立提交一个定时器 SQE auto* sqe = context().sqe(); ::io_uring_prep_timeout(sqe, &timeout_, 0, 0); ::io_uring_sqe_set_data(sqe, &timer_); // 再提交内层操作(内层操作的 parent 已在构造时设为 this) awaiter_.await_suspend(handle); } void complete(int result, std::uint32_t flags) noexcept override { // 内层操作(某次重试)先完成了——取消定时器 if (state_ == State::Pending) { state_ = State::AwaiterCompleted; auto* sqe = context().sqe(false); ::io_uring_prep_cancel(sqe, &timer_, 0); } if (--pending_cqes_ == 0) std::exchange(handle_, {}).resume(); } void on_timer_completed(int result) noexcept { // 定时器先到——取消内层操作当前正在飞行的 SQE if (state_ == State::Pending) { state_ = State::TimerCompleted; auto* sqe = context().sqe(false); ::io_uring_prep_cancel(sqe, &awaiter_, 0); } if (--pending_cqes_ == 0) std::exchange(handle_, {}).resume(); } };与
TimeoutAwaiter的IOSQE_IO_LINK不同,TimeoutCombinator将定时器 SQE 和内层操作 SQE 独立提交,两者在 io_uring 中是平等的并发请求:- 内层操作先完成:通过
parent->complete()进入TimeoutCombinator::complete(),发出io_uring_prep_cancel取消定时器,等待定时器的 CQE 到达后恢复协程。 - 定时器先到期:
Timer::complete()调用on_timer_completed(),发出io_uring_prep_cancel取消当前正在飞行的awaiter_SQE,等待其 CQE 到达后以timed_out恢复协程。
两种情况都要求
pending_cqes_(初值为 2)减到零才恢复协程——这保证了无论竞争结果如何,所有飞行中的 SQE 的 CQE 最终都被消耗掉,不会残留在完成队列中干扰后续操作。从调用方视角来看,两套机制的接口完全相同:
// async_read_some:单次提交 → 走 TimeoutAwaiter(link_timeout) auto r1 = co_await timeout(socket.async_read_some(buf), 5s); // async_read_all:多次提交 → 走 TimeoutCombinator(独立定时器 + cancel) auto r2 = co_await timeout(socket.async_read_all(buf), 5s);timeout()函数通过两个重载,依据single_shot_only_operation和cancelable_operation两个 Concept 在编译期自动分发,调用方无需关心底层选择了哪套机制。
6.3 为什么不需要序列版 WriteAll
看到这里,你可能会问:我们有
WriteSequenceAwaiter(序列版write_some),是否也需要一个序列版write_all?答案是不需要。
writev的关键语义是原子性:内核保证整个iovec数组作为一个整体提交给协议栈。对于流式套接字(SOCK_STREAM),内核要么接受全部数据进入发送缓冲区,要么在缓冲区不足时只接受一部分。但这里有一个根本性的约束:
writev的部分写入发生后,你无法简单地"重试剩余部分"。每次writev写入 N 字节后,你需要遍历iovec数组,跳过已完整写入的 chunk,并修剪部分写入那个 chunk 的iov_base/iov_len,然后以剩余的iovec子集重新提交:// 如果要实现序列版 write_all,必须处理这种修剪逻辑 void advance(std::size_t n) { while (n > 0 && !iov_.empty()) { if (n >= iov_.front().iov_len) { n -= iov_.front().iov_len; iov_.erase(iov_.begin()); // O(n) erase,或改用 index } else { auto* base = static_cast<char*>(iov_.front().iov_base); iov_.front().iov_base = base + n; iov_.front().iov_len -= n; n = 0; } } }这并非不可实现,但代价是显著的复杂度提升,而实际收益却微乎其微——实践中发送缓冲区充足时
writev极少发生部分写入。更重要的是,
writev的使用场景本身就决定了调用方通常不关心"是否全部写完":当你用writev拼装一个 HTTP 响应时,你的目标是原子地将头部和体部交给内核,至于内核何时真正通过 TCP 发出去,不是这一层要操心的。这与write_all的语义("确保用户层的所有字节都离开应用缓冲区")本质上是两个不同的问题。因此,我们的设计决策是:
场景 API 选择 一次性发送多块分散内存 async_write_some(sequence_buffer)→writev确保单块内存完整写入 async_write_all(span)→ 循环send既要分散又要保证全部写完 希望没有这种需求 - 拼接再发送:将所有数据
-
【 基于 io_uring 的 C++20 协程网络库】07 实现Acceptor在上一篇文章中,我们构建了
StreamSocket,它作为面向连接的流式套接字,完美解决了客户端的主动连接(connect)与边界安全的字节流传输问题。然而,网络通信是双向的。对于服务端而言,我们需要一种截然不同的实体:它不负责读写数据,只负责被动监听并生产新的连接。在 POSIX 网络栈中,这就是监听套接字(Listening Socket)。
本篇文章将探讨服务端组件
BasicAcceptor的设计,并在文章的最后,利用我们迄今为止构建的所有基础设施,跑通一个完整的、零运行时开销的全异步 Echo Server。1. 架构修正:为什么将
bind下沉至BaseSocket?在着手编写
Acceptor之前,我们必须先纠正一个在上一章中犯的经验主义错误。在许多入门教程中,
bind似乎永远是服务端的专利(配合listen使用),而客户端只需要connect。如果按照这个逻辑,bind应该被封装在Acceptor的初始化中。但这在真实的工业级场景中是行不通的,考虑以下场景:
- 多网卡流量隔离: 客户端在
connect之前,常常需要显式bind到特定的源 IP(例如万兆专线网卡)以控制出口路由。 - NAT 穿透与端口复用: 客户端可能需要绑定固定端口来配合 P2P 打洞。
从操作系统的视角看,任何初始状态的文件描述符都具备被
bind的物理能力。因此,在开发Acceptor之前,我们将bind能力从特定组件中剥离,下沉到了最底层的BaseSocket中。2. 核心定位:强类型约束的“连接工厂”
明确了基础设施后,我们来看
BasicAcceptor的定义。它继承自BaseSocket,剥离了一切读写接口,其唯一的职责是作为连接的生产工厂。template<typename Protocol, typename Context> class BasicAcceptor: public BaseSocket<Protocol, Context> { public: // 强制静态推导:产出物必须与协议完全匹配 using socket_type = typename Protocol::template socket<Context>; using endpoint_type = typename Protocol::endpoint; using base_type = BaseSocket<Protocol, Context>; // ... };这里的Type Traits至关重要。
socket_type确保了ip::tcp::acceptor产出的永远是被强类型保护的ip::tcp::socket(即StreamSocket),从编译器层面切断了将 TCP 连接误用为 UDP 套接字的可能。3. 固化初始化序列与 SO_REUSEADDR
服务端的启动逻辑必须遵循严格的物理规约:配置选项 ->
bind->listen。其中最隐蔽的陷阱是SO_REUSEADDR选项的时序问题。当服务端进程崩溃或更新重启时,旧的 TCP 连接可能仍处于
TIME_WAIT状态。此时如果直接bind,内核会抛出EADDRINUSE(Address already in use) 错误。为了提升系统的重启弹性,SO_REUSEADDR必须在bind之前设置。我们在
BasicAcceptor的构造函数中,强制实行了这套安全的初始化流程:using reuse_address = BooleanOption<SOL_SOCKET, SO_REUSEADDR>; using reuse_port = BooleanOption<SOL_SOCKET, SO_REUSEPORT>; BasicAcceptor(Context& context, const endpoint_type& endpoint, bool enable_reuse_port = false) : base_type{ context, endpoint.protocol() } { this->option(reuse_address{ true }); if (enable_reuse_port) this->option(reuse_port{ true }); this->bind(endpoint); listen(MAX_LISTEN_CONNECTIONS); }对于需要横向扩展的多 Worker 进程架构,我们还提供了一个额外的配置项,允许显式开启
SO_REUSEPORT,将底层的连接负载均衡交由 Linux 内核的 TCP/IP 协议栈处理。除此之外,用户也可以通过延迟打开的方式,自己配置相对应的option
auto acceptor = ip::tcp::accptor{ io_contex }; // 手动打开socket acceptor.open(ep.protocol()); // 配置需要的option acceptor.option(reuse_address{ true }); // 注意bind的先后顺序 acceptor.bind(ep); acceptor.listen();4. 异步抽象:获取对端元数据
在很多业务场景(如访问控制、日志审计)中,服务端需要知道新连接的源 IP 和端口。此时,我们需要提供一个接收
endpoint_type引用的async_accept重载版本。为了支持这一特性,我们需要稍微扩展底层的
AcceptAwaiter。内核的accept系统调用要求传入一个sockaddr*指针和一个保存结构体长度的socklen_t*指针作为输入输出(In/Out)参数。template<typename Protocol, typename Context> class AcceptAwaiter: public Operation { public: using socket_type = typename Protocol::template socket<Context>; using endpoint_type = typename Protocol::endpoint; // ... AcceptAwaiter(Context& context, int fd, endpoint_type* peer = nullptr) : context_{ context }, fd_{ fd }, peer_{ peer } { if (peer_) addrlen_ = peer_->capacity(); // 获取底层缓冲区的最大安全容量 } void prepare(::io_uring_sqe* sqe) noexcept override { // addrlen_ 将由内核覆写为实际的地址长度 ::io_uring_prep_accept(sqe, fd_, peer_ ? peer_->data() : nullptr, peer_ ? &addrlen_ : nullptr, 0); } void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); // 不要忘了在返回前resize endpoint,虽然在ip::BasicEndpoint中我们使用的定长类型,但别的协议就需要resize了。 // 查看ip::BasicEndpoint的resize实现的话,你会发现那是一个空函数 if (peer_ && result >= 0) peer_->resize(addrlen_); if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } // ... private: Context& context_; int fd_; endpoint_type* peer_; socklen_t addrlen_; // ... };借由
Endpoint内部基于sockaddr_storage构建的内存缓冲区,无论对端是 IPv4 还是 IPv6,我们都能安全地承载内核写入的地址信息。至此,
Acceptor可以在协程流中优雅地捕获对端信息:auto async_accept(endpoint_type& endpoint) noexcept -> AcceptAwaiter<Protocol, Context> { return AcceptAwaiter<Protocol, Context>{ context(), native_handle(), &endpoint }; }5. 全异步 Echo Server 实战
基础设施拼图现已全部齐备(
IOContext、Endpoint、StreamSocket、Acceptor、协程调度机制)。是时候用几行极其简练的 C++20 代码,检验这套系统的真实战力了。我们将编写一个全异步的 TCP Echo Server。它包含两个核心的协程流:会话流(Session)与分发流(Echo)。
5.1 业务逻辑:Session 协程
session协程负责处理单一的客户端连接。在协程的作用下,原本复杂的异步状态机被拉平为直观的while(true)同步循环流。我们使用std::expected的返回值配合co_await,在局部范围内完成了非阻塞的 I/O 与异常处理。#include <cstdlib> #include <iostream> #include <spdlog/spdlog.h> #include "co_spawn.h" #include "io_context.h" #include "ip/tcp.h" #include "task.h" #include "timeout.h" #include "buffer.h" auto session(ip::tcp::socket<IOContext> client) -> Task<> { auto data = std::string(1024, '\0'); while (true) { // 1. 异步读取,协程挂起,零线程阻塞 // 如果需要的话,你也可以套一个timeout。不过不要忘了除了timedout错误 auto read_result = co_await client.async_read_some(buffer(data)); if (!read_result) { spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message()); co_return; } auto bytes_read = *read_result; if (bytes_read == 0) { spdlog::info("Client {} disconnected", client.native_handle()); co_return; } spdlog::info("Data from client {}: {}", client.native_handle(), data); auto write_buffer = buffer(data.substr(0, bytes_read)); // 2. 利用 std::span 提取有效数据视图,异步写回 using namespace std::literals::chrono_literals; auto write_result = co_await timeout(client.async_write_some(write_buffer), 5s); if (!write_result) { if (write_result.error() == std::errc::timed_out) spdlog::warn("Write to client {} timed out", client.native_handle()); else spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message()); co_return; } } }5.2 监听派发:Echo 协程与主循环
echo协程使用Acceptor监听本地端口。当内核将新连接投递到io_uring的完成队列(CQE)时,echo协程被唤醒,并通过co_spawn将接收到的强类型 Socket 所有权移交给新的session协程。auto echo(IOContext& context) -> Task<> { // auto endpoint = ip::tcp::endpoint::from_string("127.0.0.1", 12345); auto endpoint = ip::tcp::endpoint{ ip::AddressV6::loopback(), 12345 }; std::cout << "Server listening on " << endpoint << "\n"; auto acceptor = ip::tcp::acceptor{ context, endpoint }; auto client_endpoint = ip::tcp::endpoint{}; while (true) { auto client = co_await acceptor.async_accept(client_endpoint); if (!client) { spdlog::warn("Failed to accept client connection: {}", client.error().message()); continue; } spdlog::info("Accepted connection from {}:{}", client_endpoint.address().to_string(), client_endpoint.port()); co_spawn(context, session(std::move(*client))); } } int main(int argc, char* argv[]) { IOContext context; // 启动监听协程 co_spawn(context, echo(context)); // 启动 io_uring 事件循环 context.run(); return EXIT_SUCCESS; }运行结果
[2026-04-23 15:49:48.030] [info] Accepted connection from ::1:37696 [2026-04-23 15:49:49.471] [warning] Data from client 7: dsaf [2026-04-23 15:49:51.885] [warning] Data from client 7: dasaasdd [2026-04-23 15:49:54.124] [info] Accepted connection from ::1:48684 [2026-04-23 15:49:55.462] [warning] Data from client 8: fasasdasdfas [2026-04-23 15:49:55.987] [info] Client 8 disconnected [2026-04-23 15:49:57.515] [info] Client 7 disconnected ^C[2026-04-23 15:49:59.360] [info] Received shutdown signal, stopping IOContext...结语
我们利用 C++20 的协程机制彻底抹平了异步 I/O 的认知鸿沟;利用
io_uring将系统调用的开销降至极限;更重要的是,利用现代 C++ 的类型萃取与所有权语义(RAII & Move Semantics),我们将资源泄漏、类型混用等致命的系统级并发问题,统统拦截在了编译期。别的类型的socket我们不做介绍,和stream socket大体上都是相同的。后续会回归主线,介绍writev以及uring buffer环形队列的协程接口封装。
- 多网卡流量隔离: 客户端在
-
【 基于 io_uring 的 C++20 协程网络库】06 socket层次化封装在构建了底层的异步轮询引擎(
IOContext)、协程机制以及端点(Endpoint)的内存布局后,我们进入网络编程的核心实体:套接字(Socket)。在早期的概念验证阶段(第四篇博客中),为了快速验证系统可行性,我们曾实现过一个简陋的
Socket类,将描述符的创建、bind、listen、accept、read和write糅合在一个结构中。作为原型,它是合格的。但作为工业级的基础设施,这种设计存在致命的类型安全隐患:如果一个用于 UDP 的数据报套接字暴露了
listen()和accept()接口,编译器并不会报错,逻辑谬误只会在运行时以-EOPNOTSUPP(Operation not supported)的形式暴露。本篇的目标是从宏观架构出发,构建一个职责单一、零运行时开销,且受 C++20 强类型系统严格保护的套接字层级体系。
1. 宏观架构
POSIX 系统为网络通信提供了极度灵活但也极其松散的 C API。在现代 C++ 中,核心接口设计原则是:让接口易于正确使用,难以被误用。为此,我们必须根据网络协议的物理行为特征,将 Socket 拆解为层次分明的类簇:
-
BaseSocket:
所有套接字的基类。其唯一职责是:基于 RAII 原则管理文件描述符(fd)的生命周期,并提供底层统一的套接字选项(Socket Options)配置接口。 -
Acceptor(被动接收器):
继承自BaseSocket。专门用于服务端监听。仅开放bind、listen和accept接口。它剥离了数据读写能力,因为监听套接字本身不应参与数据载荷的收发。 -
StreamSocket(流式套接字):
继承自BaseSocket。代表面向连接、可靠的字节流通信(如ip::tcp或本机流式 IPClocal::stream_protocol)。开放connect、read_some和write_some接口。 -
DatagramSocket(数据报套接字):
继承自BaseSocket。代表无连接、不可靠的数据报通信(如ip::udp)。无connect语义,仅开放send_to和receive_from接口。
通过这一分层,若业务代码试图在 UDP Socket 上调用
accept,编译器将在编译阶段直接抛出“找不到该成员函数”的错误。我们将潜在的运行时崩溃彻底转化为编译期约束。同时,该架构也为未来扩充不同的协议栈保留了正交性。2. 契约先行:Protocol 的 Concept 约束
在泛型编程中,必须确保传入的模板参数是合法的协议类型。根据上文对 TCP 的设计,协议需要提供调用
::socket()系统调用所需的三要素:domain、type和protocol。我们使用 C++20 的 Concept 来定义这一显式契约:
#include <concepts> // src/socket.h template <typename T> concept has_domain = requires (const T& t) { { t.domain() } -> std::convertible_to<int>; }; template <typename T> concept has_type = requires (const T& t) { { t.type() } -> std::convertible_to<int>; }; template <typename T> concept has_protocol = requires (const T& t) { { t.protocol() } -> std::convertible_to<int>; }; template <typename T> concept socket_protocol = has_domain<T> && has_type<T> && has_protocol<T>;引入
socket_protocol约束后,任何不满足规范的自定义协议类型在实例化 Socket 时,编译器都会提供精确的诊断信息。3. 第一步:RAII 资源基类 BaseSocket
接下来构建套接字的基类
BaseSocket。其核心是实现严格的所有权(Ownership)和移动语义。在系统级编程中,文件描述符是独占资源。尽管可以通过
dup复制文件描述符,但在基础套接字封装中,拷贝往往意味着所有权语义的混乱。因此,我们明确拒绝拷贝语义。#include <system_error> #include <utility> #include <unistd.h> #include "exceptions.h" template<socket_protocol Protocol, typename Context> class BaseSocket { public: using context_type = Context; using protocol_type = Protocol; // 1. 基于协议类型创建底层 fd BaseSocket(Context& context, const Protocol& protocol) : context_{ &context }, fd_{ create(protocol) } {} // 2. 彻底禁用拷贝语义 BaseSocket(const BaseSocket&) = delete; auto operator=(const BaseSocket&) -> BaseSocket& = delete; // 3. 完美的移动语义:交接控制权,并将源对象的 fd 置为无效 BaseSocket(BaseSocket&& other) noexcept : context_{ std::exchange(other.context_, nullptr) }, fd_{ std::exchange(other.fd_, INVALID_SOCKET) } {} auto operator=(BaseSocket&& other) noexcept -> BaseSocket& { if (this == &other) return *this; close(); // 覆盖前必须先关闭自己现有的 fd context_ = std::exchange(other.context_, nullptr); fd_ = std::exchange(other.fd_, INVALID_SOCKET); return *this; } // 4. RAII 析构 virtual ~BaseSocket() { close(); } [[nodiscard]] constexpr auto is_valid() const noexcept -> bool { return fd_ != INVALID_SOCKET; } auto close() noexcept -> std::expected<void, std::error_code> { if (is_valid()) { auto res = ::close(fd_); fd_ = INVALID_SOCKET; if (res == -1) return unexpected_system_error(); } return {}; } [[nodiscard]] constexpr auto native_handle() const noexcept -> int { return fd_; } [[nodiscard]] auto context() noexcept -> Context& { return *context_; } protected: // 供 Acceptor 接收新连接后直接接管已存在的 fd BaseSocket(Context& context, int fd) : context_{ &context }, fd_{ fd } {} private: static constexpr int INVALID_SOCKET = -1; Context* context_; int fd_ = INVALID_SOCKET; static auto create(const Protocol& protocol) -> int { auto res = ::socket(protocol.domain(), protocol.type(), protocol.protocol()); if (res == -1) throw_system_error("Failed to create socket"); return res; } };这里模板参数的声明顺序(
Protocol,Context)具有其实际意义。通过让Context作为第二个模板参数,我们可以在提供Protocol的前提下,利用 C++17 的类模板参数推导(CTAD)简化客户端代码:// 固定协议别名 template<typename Context> using socket = stream_socket<tcp, Context>; IOContext context{}; // 编译器自动推导 Context 为 IOContext,无需显式指定模板参数 auto s = socket{ context };4. 第二步:构建面向连接的 StreamSocket
确立了资源管理基线后,我们可以派生出
StreamSocket。流式套接字的核心特征在于点对点连接(提供
connect)及无边界的字节流传输(提供read_some和write_some)。这里我们摒弃了传统的void* buffer配合size_t length,全面使用 C++20 的std::span<std::byte>。它携带连续内存的边界信息,能在编译期和运行期最大限度地避免内存越界。#ifndef BLOG_IP_STREAM_SOCKET_H #define BLOG_IP_STREAM_SOCKET_H #include <span> #include <expected> #include "socket.h" #include "operations.h" #include "readsome_awaiter.h" // 协程 Awaiter #include "writesome_awaiter.h" namespace ip { template<typename Protocol, typename Context> class StreamSocket: public BaseSocket<Protocol, Context> { public: using base_socket_type = BaseSocket<Protocol, Context>; using endpoint_type = typename Protocol::endpoint; explicit StreamSocket(Context& context) : base_socket_type{ context, Protocol{} } {} // 接收内核已完成的连接 (用于 Acceptor 生成) StreamSocket(Context& context, int fd) : base_socket_type{ context, fd } {} StreamSocket(StreamSocket&&) = default; StreamSocket& operator=(StreamSocket&&) = default; ~StreamSocket() = default; // --- 同步阻塞接口 --- void connect(const endpoint_type& peer) { // 配合 Endpoint 的 data() 和 size() 语义 operations::connect(this->native_handle(), peer.data(), peer.size()); } auto read_some(std::span<std::byte> buffer) noexcept -> std::expected<std::size_t, std::error_code> { return operations::read_some(this->native_handle(), buffer); } auto write_some(std::span<const std::byte> buffer) noexcept -> std::expected<std::size_t, std::error_code> { return operations::write_some(this->native_handle(), buffer); } // --- 异步协程接口 (对接 io_uring) --- auto async_read_some(std::span<std::byte> buffer) noexcept -> ReadSomeAwaiter<Context> { return ReadSomeAwaiter<Context>{ this->context(), this->native_handle(), buffer }; } auto async_write_some(std::span<const std::byte> buffer) noexcept -> WriteSomeAwaiter<Context> { return WriteSomeAwaiter<Context>{ this->context(), this->native_handle(), buffer }; } }; } // namespace ip #endif // BLOG_IP_STREAM_SOCKET_H通过继承,
StreamSocket天然获取了生命周期管理能力,仅需专注具体的 I/O 逻辑。值得探讨的是,为何
StreamSocket位于namespace ip中?在纯抽象层面,流式套接字似乎应是一个全局泛型概念:只要能读写字节流,即为 Stream Socket。然而,底层协议之间天然存在物理特征的不兼容。例如,IP 协议栈的流式套接字拥有专属于 IP 层的控制选项(如控制 Nagle 算法的
TCP_NODELAY)。如果将其强制抽象为全局通用的
StreamSocket,会导致这些特有配置选项失去编译期类型保护,进而增加运行时决议的复杂度和错误风险。因此,利用namespace ip进行物理与语义双重隔离,是维持零开销抽象的必要设计:namespace ip { template<typename Protocol, typename Context> class StreamSocket: public BaseSocket<Protocol, Context> { public: // ... // IP 协议簇专属的类型安全选项 using keep_alive_idle = ValueOption<IPPROTO_TCP, TCP_KEEPIDLE>; using keep_alive_interval = ValueOption<IPPROTO_TCP, TCP_KEEPINTVL>; using keep_alive_count = ValueOption<IPPROTO_TCP, TCP_KEEPCNT>; using no_delay = BooleanOption<IPPROTO_TCP, TCP_NODELAY>; // ... }; }缓冲区适配:优雅的调用体验 (Ergonomics)
与此同时,我们必须正视一个工程体验问题:虽然将底层的 I/O 接口固化为
std::span<std::byte>确保了绝对的内存边界安全,但这会给上层业务代码带来不适。我们显然不能强制用户在每次读写时,都笨拙地手动强转指针,或者仅仅为了网络传输而将所有业务层容器都声明为std::array<std::byte, N>。为了优化调用侧的体验,同时不向底层 I/O 方法中引入任何多余的模板复杂度,我们利用 C++20 的 Concept,提供一个轻量级的视图转换工厂函数
buffer()。对于只读的连续内存容器,我们将其零开销转换为
std::span<const std::byte>,用于write_some:template<std::ranges::contiguous_range T> auto buffer(const T& range) noexcept -> std::span<const std::byte> { return std::as_bytes(std::span{ range }); }这样一来,用户可以顺畅地写出如下代码,而不必亲自干预指针转换:
std::string msg = "Hello, io_uring!"; // 自动推导并转换,安全且零拷贝 client_sock.write_some(buffer(msg));对于可写的连续内存容器,我们将其转换为
std::span<std::byte>,用于read_some这样的输出调用:template<std::ranges::contiguous_range T> requires (!std::is_const_v<std::remove_reference_t<std::ranges::range_reference_t<T>>>) auto buffer(T& range) noexcept -> std::span<std::byte> { return std::as_writable_bytes(std::span{ range }); }同样地,读取操作也变得极其自然:
std::vector<char> recv_buf(1024); // 安全地提取底层连续内存块供内核填充 client_sock.read_some(buffer(recv_buf));这一层薄薄的抽象,大大优化了调用体验,且没有在核心的 I/O 方法中引入多余的模板膨胀。
5. 协议的装配:Type Traits 工厂
如何将泛型的
StreamSocket与具体的协议(如 TCP)优雅绑定?在我们的设计中,
Protocol类(例如ip::tcp)不仅提供静态常量,还充当类型特征(Type Traits)的装配枢纽。将上述组件装配进去:
namespace ip { class tcp { public: // 强制约束 tcp 的 socket 实现类型 template<typename Context> using socket = StreamSocket<tcp, Context>; // ... 其他静态特征 ... }; } // namespace ip结语:零开销与强类型的交响曲
至此,让我们审视业务代码的最终形态:
ip::tcp::socket<IOContext> client{ context }; ip::tcp::endpoint target = ip::tcp::endpoint::from_string("127.0.0.1", 8080); client.connect(target); std::vector<char> recv_buf(1024); client.read_some(buffer(recv_buf))在这寥寥数行代码中,类型系统在幕后默默完成了以下推导与约束:
ip::tcp::socket自动推导为StreamSocket<ip::tcp, IOContext>。- 编译器确保
connect仅接受ip::tcp::endpoint,如果误传udp::endpoint会导致编译立刻失败。 StreamSocket的基类构造函数静态提取ip::tcp的SOCK_STREAM和IPPROTO_TCP以发起安全的系统调用。- 对象离开作用域时,
BaseSocket的 RAII 机制确保文件描述符被安全释放。
我们彻底隐藏了底层的状态机转移与裸露的 C API,以层次分明、类型严格的现代 C++ 接口取而代之。由于高度模板化和内联优化,这一系列抽象的运行期开销,严格等价于手写裸 C 语言代码。
在下一篇文章中,我们将补齐拼图的最后一块:
Acceptor(被动接收器)的封装。届时,便可利用这套基础设施,跑通完整的基于io_uring的全异步协程服务器。 -
-
【 基于 io_uring 的 C++20 协程网络库】05 Protocol与Endpoint的封装在构建了底层的异步 I/O 引擎(
IOContext)与核心的 Awaiter 机制后,我们的网络库已经具备了处理并发事件的能力。但要让它真正与网络世界通信,我们必须跨越网络编程的第一道门槛:地址与端点(Endpoint)的封装。在原生的 POSIX C API 中,网络地址的表示极其繁琐。开发者需要手动处理
sockaddr_in(IPv4)、sockaddr_in6(IPv6)甚至sockaddr_un(Unix Domain Socket),并充斥着各种宏与危险的指针强制转换。本章的目标是:从零开始,一步步利用 C++20 的语言特性,构建出一套强类型、内存安全且零运行时开销的 Endpoint 体系。(也可以看做对asio的cosplay)
我们最终期望的业务层 API 形态,应该是极致简洁的:
// 期望的现代 C++ 用法:自动推导,透明解析 auto ep1 = ip::tcp::endpoint::from_string("127.0.0.1", 12345); auto ep2 = ip::tcp::endpoint::from_string("::1", 12345);要实现这一目标,我们需要拆解三个核心概念:协议(Protocol)、IP 地址(Address)与端点(Endpoint)。
1. 协议抽象:静态特征与运行期状态的权衡
任何网络通信都需要指定协议。在 C++ 中,为了追求性能,我们通常倾向于将一切可能的信息在编译期固化。对于
Protocol,第一时间,我们可能会设计出一个如下的纯模板类:// 纯静态协议封装(存在局限性) template<int Domain, int Type, int Protocol> struct BasicProtocol { static constexpr int domain = Domain; static constexpr int type = Type; static constexpr int protocol = Protocol; };对于
Type(如SOCK_STREAM代表 TCP)和Protocol(如IPPROTO_TCP),它们确实是静态不变的。然而,Domain(地址族,即AF_INET或AF_INET6)在现代网络编程中,并非总能在编译期绝对固化。考虑一个支持 双栈(Dual-Stack) 的服务器:当你创建一个监听
::的 Acceptor 时,它在运行期需要同时处理 IPv4 和 IPv6 的接入。如果Domain被彻底写死在模板参数中,我们将无法用单一的泛型类型来描述这个 Acceptor。因此,正确的设计哲学是:固化协议的本原特征,保留地址族的运行期决议能力。
以下是我们对
ip::tcp的完整实现:namespace ip { class tcp { public: // 1. 本原特征:使用 consteval 强制在编译期求值,拒绝任何运行时状态的介入 [[nodiscard]] consteval auto type() const noexcept -> int { return SOCK_STREAM; } [[nodiscard]] consteval auto protocol() const noexcept -> int { return IPPROTO_TCP; } // 2. 运行期特征:使用 constexpr,允许在运行时根据双栈需求进行动态切换 [[nodiscard]] constexpr auto domain() const noexcept -> int { return domain_; } // 具名构造器,语义清晰 static auto v4() noexcept -> tcp { return tcp{ AF_INET }; } static auto v6() noexcept -> tcp { return tcp{ AF_INET6 }; } private: int domain_ = AF_INET; explicit tcp(int domain) : domain_{ domain } {} }; } // namespace ip通过将
type()和protocol()声明为consteval,我们在编译器层面建立了一条严格的契约,这在后续作为 Type Traits 提取协议参数时,提供了与静态常量完全一致的零开销保证。2. IP 地址封装
端点是由 IP 地址和端口组成的。在封装端点之前,我们需要先解决繁琐的 IP 地址解析。
在 POSIX 中,IPv4 被存储为 4 字节的
in_addr,IPv6 被存储为 16 字节的in6_addr。我们首先利用 RAII 将它们分别封装,并隐藏丑陋的
inet_pton(字符串转网络字节序)系统调用。以
AddressV4为例:struct AddressV4 { using address_type = in_addr; address_type address{}; // 字符串解析工厂 static auto from_string(std::string_view address) -> AddressV4 { AddressV4 result; if (::inet_pton(AF_INET, address.data(), &result.address) != 1) throw_system_error("Failed to convert string to IPv4 address"); return result; } // 格式化输出 [[nodiscard]] auto to_string() const -> std::string { std::string buffer(INET_ADDRSTRLEN, '\0'); if (::inet_ntop(AF_INET, &address, buffer.data(), INET_ADDRSTRLEN) == nullptr) throw_system_error("Failed to convert IPv4 address to string"); return buffer; } }; // AddressV6 的实现高度对称,此处省略统一抽象:构建 Address 类
在业务代码中,我们不希望用户去手动
if/else判断当前字符串是 V4 还是 V6。我们需要一个统一的Address类来容纳它们。此时,
std::variant成为了最完美的工具。我们可以给出如下实现:class Address { public: using address_type = std::variant<AddressV4, AddressV6>; Address() = default; Address(const AddressV4& ipv4) : address_{ ipv4 } {} Address(const AddressV6& ipv6) : address_{ ipv6 } {} [[nodiscard]] constexpr auto is_v4() const noexcept -> bool { return std::holds_alternative<AddressV4>(address_); } [[nodiscard]] auto to_string() const -> std::string { // 优雅的多态调用 return std::visit([](const auto& addr) { return addr.to_string(); }, address_); } static auto from_string(std::string_view address) -> Address { AddressV4 ipv4{}; if (::inet_pton(AF_INET, address.data(), &ipv4.address) == 1) return { ipv4 }; AddressV6 ipv6{}; if (::inet_pton(AF_INET6, address.data(), &ipv6.address) == 1) return { ipv6 }; throw_system_error("Invalid IP address format"); std::unreachable(); } private: address_type address_; };通过
from_string,我们实现了一个健壮的解析器:它会依次尝试按 IPv4 和 IPv6 解析字符串,并将成功的结果打包进安全的variant容器中。3. Endpoint 封装:跨越 ABI 边界的内存博弈
现在,我们来到了最核心的部件
BasicEndpoint。它需要将前面实现的Protocol、Address与端口(Port)结合起来,并最终生成底层的sockaddr结构供内核使用。3.1 为什么引入 Protocol 模板参数?
你可能会疑惑:既然 IP 层的底层表示都是
sockaddr,为什么我们要设计成模板类BasicEndpoint<Protocol>,而不是一个通用的Endpoint类?这是出于 强类型安全 的考量。
TCP 的127.0.0.1:80和 UDP 的127.0.0.1:80在底层字节上完全一致,但在物理逻辑上是截然不同的通道。如果它们是同一个类型,开发者极易将 UDP 的端点传给 TCP 的 Socket 进行connect,这种谬误只能在运行时由内核抛出异常。
通过Protocol模板,endpoint<tcp>和endpoint<udp>在 C++ 编译器眼中变成了绝对正交的两种类型,任何混用都会在编译期被拦截,这是零开销抽象的典范。3.2 致命陷阱:为何 Endpoint 内部必须摒弃 std::variant?
在封装
Address时,我们使用了std::variant。但在Endpoint内部存储底层结构时,却不能继续使用std::variant<sockaddr_in, sockaddr_in6>了,Endpoint的内存不仅用于读取,更要以裸指针(sockaddr*)的形式交给内核 API(如accept或recvfrom)。这些 API 具有 Overwrite 语义:内核会直接根据实际接收到的连接,向这块内存灌入 IPv4 或 IPv6 的字节流。如果底层是
std::variant:- 内存布局破坏:
variant内部存在一个用于记录当前类型的index标记(以及可能的对齐 Padding)。内核如果从首地址开始写sa_family,会直接破坏这个标记。 - 状态脱节(UB):假设
variant当前为 IPv4(16字节),内核写入了 IPv6(28字节)的数据。内核无从知晓 C++ 的机制,绝不会去更新variant的index。当 C++ 代码再次读取时,将发生严重的未定义行为(Undefined Behavior)。
3.3 解决方案:Union 与 sockaddr_storage
为了在确保 C 兼容性的同时提供 C++ 视图,最标准的解决方案是:使用
union配合sockaddr_storage。 也借此机会,强调一下C++的底层哲学:程序的世界没有银弹。对于不同场景选择适合的方式,所以C++提供了大量特性。template<typename Protocol> class BasicEndpoint { public: using protocol_type = Protocol; using address_type = Address; // ... 构造函数见下文 ... // 提供给内核 API 的多态强转接口 auto data() noexcept -> sockaddr* { return reinterpret_cast<sockaddr*>(&data_.storage); } private: // 经典的多态内存视图 (Aliased Views) union AddressType { sockaddr_storage storage; // 128字节,最严格对齐,提供绝对安全的物理容量兜底 sockaddr_in v4; // 提供给 C++ 侧的 IPv4 具象化读写视图 sockaddr_in6 v6; // 提供给 C++ 侧的 IPv6 具象化读写视图 } data_; };这里巧妙利用了 C/C++ 语言规范中的公共初始序列机制:所有
sockaddr家族结构体的头两个字节都是sa_family_t。因此,即便内核粗暴地覆盖了
storage,我们依然可以通过data_.storage.ss_family安全且合法地获知当前内存中实际装载的协议。4. 严守边界:Size 与 Capacity 的严格隔离
底层封装中最容易触发“缓冲区截断”Bug 的,是如何向内核报告这块
union的长度。我们必须显式隔离size()和capacity()的语义:// 专供 输出型 API (如 accept, recvfrom) 使用 [[nodiscard]] constexpr auto capacity() const noexcept -> socklen_t { return sizeof(sockaddr_storage); // 永远返回最大容量 128 字节 } // 专供 输入型 API (如 bind, connect) 使用 [[nodiscard]] constexpr auto size() const noexcept -> socklen_t { if (data_.storage.ss_family == AF_INET) return sizeof(sockaddr_in); // 16 字节 return sizeof(sockaddr_in6); // 28 字节 }- Input 操作(
bind/connect):内核要求精确匹配。如果你调用bind时传入了 128 字节(capacity),内核会因为长度不符合 IPv4(16)或 IPv6(28)的规约而直接返回-EINVAL。必须严格使用动态计算的size()。 - Output 操作(
accept):内核要求提供最大安全缓冲。在双栈监听模式下,随时可能接入 28 字节的 IPv6 客户端。如果此时你传入的是size()(若端点默认初始化为 v4,则为 16),内核会直接截断写入,导致提取到的客户端地址完全损坏。必须严格使用capacity()。
5. 拼图闭环:优雅的构造过程
有了上述坚实的底层基础,我们可以将
Address对象转换为底层的union表示,最终兑现文章开头“一键构造”的承诺:BasicEndpoint(const address_type& address, in_port_t port) { std::memset(&data_, 0, sizeof(data_)); // 彻底清空,防止残留垃圾数据 if (address.is_v4()) { data_.storage.ss_family = AF_INET; data_.v4.sin_family = AF_INET; data_.v4.sin_port = ::htons(port); data_.v4.sin_addr = address.to_v4().address; } else { data_.storage.ss_family = AF_INET6; data_.v6.sin6_family = AF_INET6; data_.v6.sin6_port = ::htons(port); data_.v6.sin6_addr = address.to_v6().address; } } static auto from_string(std::string_view address, in_port_t port) -> BasicEndpoint { // 委托给 Address 的 variant 解析引擎,解析完成后交由本类的构造函数填充物理内存 return BasicEndpoint{ address_type::from_string(address), port }; }至此,我们的
Endpoint彻底打通了从上层字符串抽象到底层 C 语言裸内存的通路。它对外提供了严格的类型契约,对内完美化解了 ABI 边界的内存博弈。对了,不要忘了,在tcp中,导出我们的endpoint
class tcp { public: using address = Address; using endpoint = BasicEndpoint<tcp>; }; - 内存布局破坏:
-
【 基于 io_uring 的 C++20 协程网络库】04 核心IO操作的协程实现终于,我们的基础设施已经足以支撑起真正的网络交互,是时候着手实现
async_read、async_write以及async_accept等核心操作了。在本文中,我们会以一个相对简练的
Socket封装作为承载这些操作的起点。这显然不是它的最终形态(我们的最终目标是实现类似 Boost.Asio 的现代化 API 结构),但如果一上来就陷入对底层套接字选项、地址解析等细节的繁琐封装中,将严重偏离探讨协程并发模型的主线。因此,第一版的Socket仅是一个将底层系统调用简单捏合的产物。本文的焦点将完全聚集于底层的协程流转机制,以及我们如何通过确立公开的 Concept 契约,赋予网络库极其强大的组合与扩展能力。
1. 过渡期的 Socket 封装
第一版的
Socket核心职责是基于 RAII 管理文件描述符的生命周期,并利用 C++ 模板将底层的setsockopt等操作强类型化,消灭裸露的void*强转。在错误处理策略上,我们确立了一个清晰的工程边界:
- 初始化与配置阶段(如
socket创建、bind、listen、setsockopt):这些操作如果失败,通常意味着系统资源耗尽或配置存在致命错误,因此直接抛出异常(Throw Exception)。 - 网络 I/O 交互阶段(如
read、write、accept):在分布式系统中,超时、对端重置连接等属于常规的运行时分支,不应引发代价高昂的栈展开(Stack Unwinding)。因此,这部分操作必须返回std::expected<T, std::error_code>。
基础代码如下:
#include <sys/socket.h> #include <fcntl.h> #include <unistd.h> #include <utility> #include <span> template<typename Context> class Socket { public: using context_type = Context; using reuse_address = BooleanOption<SOL_SOCKET, SO_REUSEADDR>; #ifdef SO_REUSEPORT using reuse_port = BooleanOption<SOL_SOCKET, SO_REUSEPORT>; #endif using error = BooleanOption<SOL_SOCKET, SO_ERROR>; using receive_buffer_size = ValueOption<SOL_SOCKET, SO_RCVBUF>; using send_buffer_size = ValueOption<SOL_SOCKET, SO_SNDBUF>; using non_blocking = FlagOption<F_GETFL, F_SETFL, O_NONBLOCK>; using close_on_exec = FlagOption<F_GETFD, F_SETFD, FD_CLOEXEC>; Socket(Context& context, int domain, int type, int protocol) : context_{ context } { fd_ = ::socket(domain, type, protocol); if (fd_ == -1) throw_system_error("Failed to create socket"); } Socket(Context& context, int fd) : context_{ context }, fd_(fd) {} Socket(const Socket&) = delete; auto operator=(const Socket&) -> Socket& = delete; Socket(Socket&& other) noexcept : context_{ other.context_ }, fd_{ std::exchange(other.fd_, -1) } {} auto operator=(Socket&& other) noexcept -> Socket& { if (this != &other) { close(); context_ = other.context_; fd_ = std::exchange(other.fd_, -1); } return *this; } ~Socket() { close(); } template<socket_option Option> void option(const Option& value) { auto res = ::setsockopt(fd_, Option::level, Option::name, value.data(), value.size()); if (res == -1) throw_system_error("Failed to set socket option[{}]", Option::name); } template<socket_option Option> auto option() const -> Option { Option option{}; auto size = static_cast<socklen_t>(option.size()); auto res = ::getsockopt(fd_, Option::level, Option::name, option.data(), &size); if (res == -1) throw_system_error("Failed to get socket option[{}]", Option::name); if (size != option.size()) throw std::runtime_error{ "Unexpected socket option size" }; return option; } template<flag_option Option> void option(const Option& value) { auto current_flags = ::fcntl(fd_, Option::get_cmd); if (current_flags == -1) throw_system_error("Failed to get socket flags"); auto new_flags = value ? (current_flags | Option::bit) : (current_flags & ~Option::bit); if (::fcntl(fd_, Option::set_cmd, new_flags) == -1) throw_system_error("Failed to set socket flags"); } template<flag_option Option> auto option() const -> Option { auto current_flags = ::fcntl(fd_, Option::get_cmd); if (current_flags == -1) throw_system_error("Failed to get socket flags"); if (current_flags & Option::bit) return Option{ true }; return Option{ false }; } void close() { if (fd_ != -1) { ::close(fd_); fd_ = -1; } } constexpr auto native_handle() const -> int { return fd_; } void bind(const sockaddr* addr, socklen_t addrlen) { if (::bind(fd_, addr, addrlen) == -1) throw_system_error("Failed to bind socket"); } void listen(int backlog) { if (::listen(fd_, backlog) == -1) throw_system_error("Failed to listen on socket"); } // 前置声明的协程 I/O 操作接口 auto async_accept() -> AcceptAwaiter<Socket<Context>>; auto async_readsome(std::span<std::byte> buffer) -> ReadSomeAwaiter<Context>; auto async_writesome(std::span<const std::byte> buffer) -> WriteSomeAwaiter<Context>; private: Context& context_; int fd_; };2. 核心 Awaiter 剖析
在实现底层 Awaiter 时,我们需要遵循上一节确立的
single_shot_only_operation概念约束,确保它们能够与TimeoutAwaiter无缝协作。2.1 ReadSomeAwaiter
在传统的 C 语言接口中,
read通常使用void* buffer配合size_t len。而在现代 C++ 中,我们应当使用视图类型std::span<std::byte>。这不仅明确了字节流的意图,更在编译期消除了指针越界的风险。更重要的是,由于协程挂起期间,Awaiter 对象驻留在独立的协程帧中,外部传入的
buffer视图会在整个异步操作期间保持有效。这从语言特性层面保证了内存安全,使得我们无需引入额外的引用计数机制。#ifndef BLOG_READSOME_AWAITER_H #define BLOG_READSOME_AWAITER_H #include <coroutine> #include <cstddef> #include <expected> #include <span> #include <system_error> #include <utility> #include <liburing.h> #include "exceptions.h" #include "operation.h" template<typename Context> class ReadSomeAwaiter : public Operation { public: using resume_type = std::size_t; ReadSomeAwaiter(Context& context, int fd, std::span<std::byte> buffer) : context_{ context }, fd_{ fd }, buffer_{ buffer } {} constexpr auto await_ready() const noexcept -> bool { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); prepare(sqe); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() noexcept -> std::expected<resume_type, std::error_code> { if (error_code_ != 0) return unexpected_system_error(error_code_); return byte_read_; } void prepare(::io_uring_sqe* sqe) noexcept { ::io_uring_prep_recv(sqe, fd_, buffer_.data(), buffer_.size(), 0); } void set_result(int result, std::uint32_t flags) noexcept { if (result >= 0) byte_read_ = static_cast<std::size_t>(result); else error_code_ = -result; } void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } auto context() noexcept -> Context& { return context_; } private: Context& context_; int fd_; std::span<std::byte> buffer_; std::coroutine_handle<> handle_{ nullptr }; std::size_t byte_read_{ 0 }; int error_code_{ 0 }; }; #endif // BLOG_READSOME_AWAITER_H在
Socket中,我们只需将其作为返回值工厂抛出:auto async_readsome(std::span<std::byte> buffer) -> ReadSomeAwaiter<Context> { return ReadSomeAwaiter<Context>{ context_, fd_, buffer }; }2.2 WriteSomeAwaiter
写操作
WriteSomeAwaiter与读操作高度对称。唯一的区别是它接受只读视图std::span<const std::byte>,并将其底层操作码映射至io_uring_prep_send。#ifndef BLOG_WRITESOME_AWAITER_H #define BLOG_WRITESOME_AWAITER_H #include <coroutine> #include <cstddef> #include <expected> #include <span> #include <utility> #include <liburing.h> #include "exceptions.h" #include "operation.h" template<typename Context> class WriteSomeAwaiter : public Operation { public: using resume_type = std::size_t; WriteSomeAwaiter(Context& context, int fd, std::span<const std::byte> buffer) : context_{ context }, fd_{ fd }, buffer_{ buffer } {} constexpr auto await_ready() const noexcept -> bool { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); prepare(sqe); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() noexcept -> std::expected<resume_type, std::error_code> { if (error_code_ != 0) return unexpected_system_error(error_code_); return byte_written_; } void prepare(::io_uring_sqe* sqe) noexcept { ::io_uring_prep_send(sqe, fd_, buffer_.data(), buffer_.size(), 0); } void set_result(int result, std::uint32_t flags) noexcept { if (result >= 0) byte_written_ = static_cast<std::size_t>(result); else error_code_ = -result; } void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } auto context() noexcept -> Context& { return context_; } private: std::coroutine_handle<> handle_{ nullptr }; Context& context_; int fd_; std::span<const std::byte> buffer_; std::size_t byte_written_{ 0 }; int error_code_{ 0 }; }; #endif // BLOG_WRITESOME_AWAITER_HSocket中的接口实现:auto async_writesome(std::span<const std::byte> buffer) -> WriteSomeAwaiter<Context> { return WriteSomeAwaiter<Context>{ context_, fd_, buffer }; }2.3 AcceptAwaiter
AcceptAwaiter负责处理被动连接接入。在原生 C 接口中,accept可以通过传入sockaddr*来获取对端地址信息。由于当前版本的实现并未对地址(Endpoint)进行体系化封装,该版本的AcceptAwaiter暂不支持提取连接地址,而是直接移交内核产生的新fd,由其自动推导包装为新的Socket实例返回。#ifndef BLOG_ACCEPT_AWAITER_H #define BLOG_ACCEPT_AWAITER_H #include <coroutine> #include <expected> #include <utility> #include <liburing.h> #include "exceptions.h" #include "operation.h" template<typename Socket> class AcceptAwaiter : public Operation { public: using context_type = typename Socket::context_type; using socket_type = Socket; using resume_type = socket_type; AcceptAwaiter(context_type& context, int fd) : context_{ context }, fd_{ fd } {} constexpr auto await_ready() const noexcept -> bool { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); prepare(sqe); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() noexcept -> std::expected<resume_type, std::error_code> { if (error_code_ != 0) return unexpected_system_error(error_code_); return resume_type{ context_, result_fd_ }; } void prepare(::io_uring_sqe* sqe) noexcept { ::io_uring_prep_accept(sqe, fd_, nullptr, nullptr, 0); } void set_result(int result, std::uint32_t flags) noexcept { if (result >= 0) result_fd_ = result; else error_code_ = -result; } void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } auto context() noexcept -> context_type& { return context_; } private: context_type& context_; int fd_; std::coroutine_handle<> handle_{ nullptr }; int result_fd_{ -1 }; int error_code_{ 0 }; }; #endif // BLOG_ACCEPT_AWAITER_H在
Socket中,挂载实现如下:auto async_accept() -> AcceptAwaiter<Socket<Context>> { return AcceptAwaiter<Socket<Context>>{ context_, fd_ }; }3. Echo Server 实战
有了这三大基础 I/O Awaiter,我们已经可以构建一个极简但具备并发特征的 Echo Server了。
在这个实现中,没有任何的回调嵌套,也没有跨线程的数据同步操作,控制流如同传统阻塞代码一般自上而下铺陈。
auto session(Socket<IOContext> client) -> Task<void> { std::array<std::byte, 1024> buffer{}; while (true) { auto read_result = co_await client.async_readsome(buffer); if (!read_result) { spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message()); co_return; } auto bytes_read = *read_result; if (bytes_read == 0) { spdlog::info("Client {} disconnected", client.native_handle()); co_return; } spdlog::info("Read {} bytes from client {}", bytes_read, client.native_handle()); std::string_view data{ reinterpret_cast<const char*>(buffer.data()), bytes_read }; spdlog::warn("Data from client {}: {}", client.native_handle(), data); auto write_buffer = std::span{ buffer }.first(bytes_read); auto write_result = co_await client.async_writesome(write_buffer); if (!write_result) { spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message()); co_return; } if (*write_result != bytes_read) spdlog::warn("Partial write on client {}: {} / {} bytes", client.native_handle(), *write_result, bytes_read); } } auto server(IOContext& context) -> Task<void> { Socket acceptor{ context, AF_INET, SOCK_STREAM, 0 }; acceptor.option(Socket<IOContext>::reuse_address{ true }); sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_addr.s_addr = INADDR_ANY; addr.sin_port = ::htons(12345); acceptor.bind(reinterpret_cast<sockaddr*>(&addr), sizeof(addr)); acceptor.listen(1024); while (true) { auto client = co_await acceptor.async_accept(); if (!client) { spdlog::warn("Failed to accept connection: {}", client.error().message()); continue; } co_spawn(context, session(std::move(*client))); } } auto shutdown_monitor(IOContext& context) -> Task<void> { using namespace std::chrono_literals; SignalSet sets{ context, signals::interrupt, signals::terminate }; co_await sets.async_wait(); spdlog::info("Received shutdown signal, stopping IOContext..."); context.stop(); } int main(int argc, char* argv[]) { IOContext context{}; co_spawn(context, server(context)); co_spawn(context, shutdown_monitor(context)); context.run(); spdlog::info("IOContext stopped, exiting..."); return EXIT_SUCCESS; }编译执行后,利用多个客户端建立连接,输出日志如下:
blog.socket_v1 [2026-04-20 21:15:51.631] [info] Read 6 bytes from client 7 [2026-04-20 21:15:51.631] [warning] Data from client 7: dsfsa [2026-04-20 21:15:57.312] [info] Read 23 bytes from client 8 [2026-04-20 21:15:57.312] [warning] Data from client 8: fdasfas的撒大法师 [2026-04-20 21:16:01.863] [info] Client 8 disconnected [2026-04-20 21:16:03.021] [info] Client 7 disconnected [2026-04-20 21:16:09.837] [info] Read 36 bytes from client 7 [2026-04-20 21:16:09.837] [warning] Data from client 7: asfasdfsasa打撒四方达dsa去玩 [2026-04-20 21:16:11.133] [info] Client 7 disconnected ^C[2026-04-20 21:16:14.162] [info] Received shutdown signal, stopping IOContext... [2026-04-20 21:16:14.162] [info] IOContext stopped, exiting...更重要的是,得益于对实现中对
single_shot_only_operation的支持,我们可以无需修改底层ReadSomeAwaiter的任何一行代码,直接将上一篇博客中构建的TimeoutAwaiter无缝挂载。针对读写超时的正交组合可以写出如下形式:
// 控制 read 超时 auto read_result = co_await timeout(client.async_readsome(buffer), 5s); if (!read_result) { if (read_result.error() == std::errc::timed_out) { spdlog::debug("Read timed out on client {}", client.native_handle()); continue; } spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message()); co_return; } // ... 处理接收逻辑 ... // 控制 write 超时 auto write_result = co_await timeout(client.async_writesome(write_buffer), 5s); if (!write_result) { if (write_result.error() == std::errc::timed_out) { spdlog::debug("Write timed out on client {}", client.native_handle()); continue; } spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message()); co_return; }基础骨架至此已完全跑通并形成闭环。在未来的优化中,我们将在这些底层基石之上,引入更完备的端点(Endpoint)表示与高级协议解析组件,让网络库从“可用”走向“现代化”。
- 初始化与配置阶段(如
-
【 基于 io_uring 的 C++20 协程网络库】03 基于链式请求的零开销超时机制这一步要实现什么,我也很纠结。最好的当然是直奔socket的封装,然后实现async accept,async read,async write这些典型的协程调用。
但是上文中留下一些坑又需要尽快填一下,否则后续再改会导致更大的重构。我们需要在这一篇中解释一下为什么要在
PollAwaiter中开放set_result,prepare,context这些看起来毫无卵用的接口。最主要的目的就是能实现一个timeout接口。
在传统的 Reactor 模型(如 epoll)中,实现超时通常需要在用户态维护一个独立的数据结构(如最小堆或时间轮)来管理定时器,这往往伴随着额外的动态内存分配(分配定时器节点)以及后台线程的唤醒开销。
io_uring提供了更底层的解法:链式请求(Linked Requests)。通过将 I/O 操作与定时器在内核态进行绑定,我们可以将状态同步的复杂度完全下沉至内核,从而实现真正的零动态分配超时机制。
1. 接口约束:single_shot_only_operation
在实现通用的超时包装器之前,我们需要界定“什么类型的操作允许被包装”。
我们定义
single_shot_only_operation,要求目标类型不仅继承自Operation基类,还必须提供用于获取上下文、组装 SQE 以及处理结果的特定接口:#include <concepts> #include <coroutine> #include <expected> #include <system_error> template<typename T> concept single_shot_only_operation = requires (T& op, ::io_uring_sqe* sqe) { typename T::resume_type; requires std::is_lvalue_reference_v<decltype(op.context())>; op.context(); op.prepare(sqe); op.set_result(0, 0); { op.await_resume() } -> std::same_as<std::expected<typename T::resume_type, std::error_code>>; } && std::derived_from<T, Operation>;基于此约束,无论底层的协程等待体是
Socket::async_read还是SignalSet::async_wait,只要满足条件,编译器即可保证其能被安全地赋予超时语义。2. 核心机制:IOSQE_IO_LINK 与内核竞速
io_uring的 SQE 链式调用是我们实现零开销超时的核心。当我们在一个 SQE 的标志位中设置IOSQE_IO_LINK时,内核会将其与紧随其后提交的下一个 SQE 绑定为一个原子链。配合专用的操作码
IORING_OP_LINK_TIMEOUT,内核会执行以下竞速逻辑:- 内核并行处理业务 I/O 请求,并同时启动定时器。
- 如果业务请求先完成,内核自动取消挂载的定时器。
- 如果定时器先到期,内核自动强行取消业务请求,并使其返回
-ECANCELED。
这种将同步状态机交由内核仲裁的设计,使得用户态代码无需介入复杂的取消流程。
3. TimeoutAwaiter 的内存安全陷阱与实现
构建
TimeoutAwaiter时,面临的最大工程挑战是协程帧的生命周期管理。由于我们向内核一次性提交了两个 SQE(业务 I/O 与 Timeout),内核在执行完毕后,必然会返回两个对应的 CQE。如果采用“先到先得”的简单逻辑,在第一个 CQE 到达时立即调用
handle.resume()恢复协程,会导致一个隐蔽且致命的 Use-After-Free (UAF) 漏洞:当协程被恢复后,包含在该协程帧内的
TimeoutAwaiter对象可能会随着当前作用域的结束而立即析构。此时,内核中仍有一个被取消的 CQE 正在返回途中。当IOContext收割这个滞后的 CQE 并尝试调用complete时,其user_data指针已指向被释放的内存,导致进程崩溃。因此,必须在
complete中引入一个无锁的计数器屏障,确保两个 CQE 全部落地后,再将控制权交还给协程。完整的
TimeoutAwaiter实现如下:template<single_shot_only_operation InnerOperation> class TimeoutAwaiter : public Operation { public: using resume_type = typename InnerOperation::resume_type; template<typename Duration> TimeoutAwaiter(InnerOperation&& operation, Duration timeout) noexcept : inner_operation_{ std::forward<InnerOperation>(operation) } { using namespace std::chrono; timeout_.tv_sec = duration_cast<seconds>(timeout).count(); timeout_.tv_nsec = duration_cast<nanoseconds>(timeout % 1s).count(); } constexpr auto await_ready() const noexcept -> bool { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* io_sqe = context().sqe(); auto* timeout_sqe = context().sqe(); // 组装业务 I/O 并设置链式标志 inner_operation_.prepare(io_sqe); io_sqe->flags |= IOSQE_IO_LINK; ::io_uring_sqe_set_data(io_sqe, this); // 紧跟超时探测请求 ::io_uring_prep_link_timeout(timeout_sqe, &timeout_, 0); ::io_uring_sqe_set_data(timeout_sqe, this); } auto await_resume() noexcept -> std::expected<resume_type, std::error_code> { if (is_timed_out_) return unexpected_system_error(std::errc::timed_out); inner_operation_.set_result(result_, 0); return inner_operation_.await_resume(); } void set_result(int result, std::uint32_t flags) noexcept { if (result == -ETIME) is_timed_out_ = true; else if (result != -ECANCELED) result_ = result; } void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); if (--pending_cqes_ == 0) { auto handle = std::exchange(handle_, {}); handle.resume(); } } auto context() noexcept -> decltype(std::declval<InnerOperation&>().context()) { return inner_operation_.context(); } private: InnerOperation inner_operation_; struct __kernel_timespec timeout_{}; std::coroutine_handle<> handle_{ nullptr }; int pending_cqes_{ 2 }; // 提交了 2 个 SQE,必然返回 2 个 CQE bool is_timed_out_{ false }; int result_{ -ECANCELED }; }; template<single_shot_only_operation Operation, typename Duration> auto timeout(Operation&& awaitable, Duration t) noexcept -> TimeoutAwaiter<std::decay_t<Operation>> { return TimeoutAwaiter<std::decay_t<Operation>>{ std::forward<Operation>(awaitable), t }; }架构收益:Core-Per-Thread 带来的无锁抽象
细心的读者可能会发现,在处理跨越不同异步回调的生命周期同步时,我们仅仅使用了一个普通的内建整型变量
int pending_cqes_{ 2 };,而没有求助于std::atomic<int>或任何形式的互斥锁。这正是我们选择 Core-Per-Thread(单线程独立上下文) 架构的直接收益。在该模型下,底层
io_uring队列的投递、事件的收割(IOContext::run)、complete回调的触发,以及协程的恢复,全都被严格限制在单一线程的顺序执行流中。这种确定的串行化特征从根本上消除了数据竞争。因此,我们可以毫无顾忌地使用裸整型进行状态流转,彻底免除了原子操作带来的缓存行同步与内存屏障开销,将“零开销抽象”贯彻到了每一个微小的细节中。4. 正交设计:避免 API 表面积爆炸
在上述实现中,
std::expected发挥了重要作用,我们将内核传回的-ETIME翻译为了标准的std::errc::timed_out。但比类型安全更值得关注的,是这种基于泛型与组合语义带来的高层架构美学。在传统的网络库设计中,超时逻辑往往被直接硬编码进具体的 I/O 操作中。这意味着设计者不得不提供诸如
async_read_with_timeout、async_write_with_timeout、async_connect_with_timeout等一系列冗余接口。假设系统存在 $N$ 种基础操作,未来又需要引入 $M$ 种类似于超时的修饰语义,API 的数量就会呈现 $N \times M$ 的指数级膨胀,最终导致表面积爆炸(API Surface Area Explosion)。而我们设计的
TimeoutAwaiter与任何具体的业务操作是严格正交的。通过 C++20 的 Concept 约束,它充当了一个纯粹的通用修饰器,能够无缝叠加在任何满足规范的操作之上,将库的 API 复杂度完美控制在了 $N + M$。结合泛型的工厂函数,上层业务代码可以以极低侵入性的自然语序组合它们:
auto network_read_task(IOContext& context, Socket& socket) -> Task<void> { using namespace std::chrono_literals; // 组合语义:以正交的方式为 async_read 附加 5 秒的超时约束 auto result = co_await timeout(socket.async_read(buffer), 5s); if (!result) { if (result.error() == std::errc::timed_out) spdlog::warn("Read operation timed out."); else spdlog::error("Read failed: {}", result.error().message()); co_return; } spdlog::info("Successfully read {} bytes.", result.value()); }演示
由于现有实现比较简陋,我们只能复用下02章中async_wait来测试下timeout的语义是否正确。实际的代码里,是不太可能组合async_wait和timeout的。
auto shutdown_monitor(IOContext& context) -> Task<void> { using namespace std::chrono_literals; SignalSet sets{ context, signals::interrupt, signals::terminate }; // 唯一改动点,测试一下timeout的行为是否正确 co_await timeout(sets.async_wait(), 5s); spdlog::info("Received shutdown signal, stopping IOContext..."); context.stop(); } auto demo(IOContext& context) -> Task<void> { using namespace std::chrono_literals; spdlog::info("demo started"); // 模拟一些持续的异步工作,直到接收到退出信号 while (true) co_await sleep_for(context, 1s); spdlog::info("demo completed"); } int main(int argc, char* argv[]) { IOContext context{}; co_spawn(context, demo(context)); co_spawn(context, shutdown_monitor(context)); context.run(); spdlog::info("IOContext stopped, exiting..."); return EXIT_SUCCESS; }执行结果
[2026-04-20 02:28:23.356] [info] demo started [2026-04-20 02:28:28.792] [info] Received shutdown signal, stopping IOContext... [2026-04-20 02:28:28.792] [info] IOContext stopped, exiting...可以看到,5s之后,async_wait结束了等待,context.stop()触发,程序结束了。
也可以按下Ctrl+C来触发SIGINT信号
/home/doom/blog/build/demo/blog.timeout_v1 [2026-04-20 10:25:32.885] [info] demo started ^C[2026-04-20 10:25:33.891] [info] Received shutdown signal, stopping IOContext... [2026-04-20 10:25:33.891] [info] IOContext stopped, exiting... -
【 基于 io_uring 的 C++20 协程网络库】02:模块解耦与完备的退出机制在上文中,我们构建了
IOContext的核心事件循环骨架。然而,随着组件的增加,我们需要解决两个实际的工程问题:一是如何对底层上下文进行合理的抽象与解耦;
二是如何优雅、无阻塞地处理外部中断信号并终止事件循环。
1. 模块解耦与泛型化设计
考虑到未来我们可能会迭代出多个版本的
IOContext,为了最大化代码复用,将具体的协程 Awaiter(如SleepAwaiter)与底层的IOContext实现解耦是必要的。首先,我们将
Operation接口提取到独立的头文件中。其次,对于
SleepAwaiter,由于它依赖Context::sqe(),若在头文件中硬编码IOContext,必须要立刻知道IOContext的定义。因此,我们采用模板化设计,将Context泛型化,在调用点推导出确切的类型。template<typename Context> class SleepAwaiter: public Operation { public: template<chrono_duration Duration> SleepAwaiter(Context& context, Duration d) : context_{ context } { using namespace std::chrono; // 转换 std::chrono 时间为内核认识的 timespec timeout_.tv_sec = duration_cast<seconds>(d).count(); timeout_.tv_nsec = duration_cast<nanoseconds>(d % seconds(1)).count(); } [[nodiscard]] constexpr auto await_ready() const noexcept -> bool { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); // 提交纯超时指令,count 设为 0 表示只受时间触发 ::io_uring_prep_timeout(sqe, &timeout_, 0, 0); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() noexcept -> std::expected<void, std::error_code> { // io_uring 中,超时正常结束会返回 ETIME if (error_code_ == ETIME || error_code_ == 0) return {}; // 其他错误(如 ECANCELED 被提前强杀) return unexpected_system_error(error_code_); } void complete(int res, std::uint32_t flags) noexcept override { error_code_ = -res; if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } private: Context& context_; struct __kernel_timespec timeout_{}; std::coroutine_handle<> handle_{ nullptr }; int error_code_{ 0 }; }; template<typename Context, typename Duration> auto sleep_for(Context& context, Duration duration) noexcept -> SleepAwaiter<Context> { return SleepAwaiter<Context>{ context, duration }; }设计注记:
通常情况下,过度泛型化(滥用模板)会劣化编译时长,并不值得推崇。但在基础设施库的设计中,静态多态(基于模板的 Duck Typing)能够做到零运行时开销(Zero-overhead),且调用方代码无需任何修改即可适配不同版本的IOContext,这种妥协是极具工程价值的。2. 完善事件循环的终止机制 (eventfd)
上个版本中,我们使用
std::atomic<bool> should_stop_标志来控制循环退出。但这存在一个死锁隐患:如果IOContext::run()正阻塞在io_uring_submit_and_wait系统调用上,单纯修改布尔变量是无法唤醒内核态线程的。我们需要一种跨越内核与用户态的唤醒机制。在传统的 Reactor 模式中,通常采用管道(pipe)或
eventfd,在io_uring体系下,eventfd依然是开销极小且最适用的方案。核心状态重构:区分系统事件与业务逻辑
在引入
eventfd后,完成队列(CQ)中不仅会包含业务逻辑的事件(如网络 I/O、定时器),还会混入我们内部触发的 wakeup 事件。
这就要求我们必须在状态追踪上做出严格区分:-
count:追踪当前批次取出的所有 CQE 数量,用于向前推进内核的共享环形缓冲区(io_uring_cq_advance)。 -
workdone:追踪实际完成的业务任务数量,仅针对这些任务去扣减outstanding_works_。如果不对二者加以区分,wakeup 信号会导致业务计数器异常递减,引发程序提前退出或触发断言失败。
同时,我们利用 liburing 原生的内联辅助函数
io_uring_cqe_get_data64、io_uring_cqe_get_data以及io_uring_sqe_set_data64来取代底层的直接字段访问,这消除了reinterpret_cast的滥用,确保了类型安全的边界。完整的
IOContext实现如下:#include <sys/eventfd.h> #include <unistd.h> #include <limits> #include <cassert> class IOContext { public: explicit IOContext(unsigned entries = 1024) { if (auto res = ::io_uring_queue_init(entries, &ring_, 0); res < 0) throw_system_error(-res, "io_uring_queue_init"); wakeup_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (wakeup_fd_ == -1) throw_system_error("Failed to create eventfd for stopping IOContext"); arm_wakeup(); } IOContext(const IOContext&) = delete; auto operator=(const IOContext&) -> IOContext& = delete; // 为了简化实现,我们不支持移动 IOContext(IOContext&& other) noexcept = delete; auto operator=(IOContext&&) -> IOContext& = delete; ~IOContext() { ::io_uring_queue_exit(&ring_); ::close(wakeup_fd_); } void run() { ::io_uring_cqe* cqe{ nullptr }; while (!should_stop_.load(std::memory_order_relaxed) && outstanding_works_ > 0) { auto res = ::io_uring_submit_and_wait(&ring_, 1); if (res < 0) { if (res == -EINTR) continue; throw_system_error("io_uring_submit_and_wait"); } unsigned head; unsigned count{ 0 }; unsigned workdone{ 0 }; io_uring_for_each_cqe(&ring_, head, cqe) { ++count; // 探测到内部唤醒信号 if (io_uring_cqe_get_data64(cqe) == WAKEUP_MARKER) { resume_wakeup(); arm_wakeup(); continue; } // 正常的业务逻辑完成事件 if (io_uring_cqe_get_data64(cqe) != 0) { auto* op = static_cast<Operation*>(io_uring_cqe_get_data(cqe)); op->complete(cqe->res, cqe->flags); ++workdone; } } // 推进环形缓冲区必须使用总事件数 count if (count > 0) ::io_uring_cq_advance(&ring_, count); // 扣减未决任务必须使用实际完成的业务数 workdone if (workdone > 0) outstanding_works_ -= workdone; } } [[nodiscard]] auto sqe() -> ::io_uring_sqe* { auto* sqe = ::io_uring_get_sqe(&ring_); if (!sqe) throw_system_error("io_uring_get_sqe"); add_work(); return sqe; } void stop() { should_stop_.store(true, std::memory_order_relaxed); wakeup(); } auto ring() noexcept -> ::io_uring* { return &ring_; } auto ring() const noexcept -> const ::io_uring* { return &ring_; } void add_work() noexcept { ++outstanding_works_; } void drop_work() noexcept { assert(outstanding_works_ > 0); --outstanding_works_; } private: static constexpr auto WAKEUP_MARKER = std::numeric_limits<std::uintptr_t>::max(); ::io_uring ring_{}; int wakeup_fd_{ -1 }; // 只用来追踪 io_context 之外的操作,并不需要用户主动来使用相关的接口 std::size_t outstanding_works_{ 0 }; // stop 会被跨线程调用,所以需要使用原子变量来保证线程安全 std::atomic<bool> should_stop_{ false }; void arm_wakeup() noexcept { auto* sqe = ::io_uring_get_sqe(&ring_); if (!sqe) throw_system_error("io_uring_get_sqe failed when re-arming wakeup"); ::io_uring_prep_poll_add(sqe, wakeup_fd_, POLLIN); // 采用 64 位专有 setter,避免指针转换警告 ::io_uring_sqe_set_data64(sqe, WAKEUP_MARKER); } void wakeup() { std::uint64_t val = 1; ::write(wakeup_fd_, &val, sizeof(val)); } void resume_wakeup() { uint64_t val; ::read(wakeup_fd_, &val, sizeof(val)); } };关于未处理完成的 CQE 的处置:
触发stop()退出循环后,队列中如果还有积压的 CQE 怎么办?这正是 RAII 管理机制的优势所在。
IOContext的生命周期与系统资源严格绑定,当程序退出,IOContext析构时,io_uring_queue_exit会协同内核彻底销毁共享的环形缓冲区。由于事件循环已经终止,不会再有新的逻辑被触发,因此忽略未决的 CQE 是安全且合理的策略。如果用户确实有在停止后清理特定状态的需求,可以通过暴露的
ring()接口自行干预。3. 基于 signalfd 的统一中断处理
既然已经实现了安全的唤醒与停止语义,顺理成章地,我们应将操作系统的信号(如
SIGINT,SIGTERM)也纳入异步框架。在 Linux 平台上,signalfd提供了一种将异步中断转化为文件描述符可读事件的机制,它能被完美地集成进io_uring轮询模型中。3.1 强类型 Signal 封装
避免裸露的魔术整数:
class Signal { public: explicit constexpr Signal(int signal) noexcept : signal_{ signal } {} auto operator==(const Signal&) const noexcept -> bool = default; // 这里的隐式转换是否提供看个人,我觉得不提供更好 constexpr operator int() const noexcept { return signal_; } [[nodiscard]] constexpr auto value() const noexcept { return signal_; } private: int signal_; }; struct signals { signals() = delete; static constexpr auto interrupt = Signal{ SIGINT }; static constexpr auto terminate = Signal{ SIGTERM }; static constexpr auto quit = Signal{ SIGQUIT }; static constexpr auto hangup = Signal{ SIGHUP }; };3.2 信号集 SignalSet 管理
使用折叠表达式优雅地处理变参掩码。另外,为保障跨线程时的健壮性,此处采用了标准的
pthread_sigmask。#include <sys/signalfd.h> #include <signal.h> template<typename Context> class SignalSet { public: template<typename... Signals> requires (std::same_as<Signals, Signal> && ...) SignalSet(Context& io_context, Signals... signals) : io_context_{ io_context } { ::sigemptyset(&mask_); (::sigaddset(&mask_, signals.value()), ...); // 屏蔽这些信号的默认异步行为,交由 signalfd 同步读取 if (::pthread_sigmask(SIG_BLOCK, &mask_, nullptr) == -1) throw_system_error("Failed to block signals"); fd_ = ::signalfd(-1, &mask_, SFD_NONBLOCK | SFD_CLOEXEC); if (fd_ == -1) throw_system_error("Failed to create signalfd"); } SignalSet(const SignalSet&) = delete; auto operator=(const SignalSet&) -> SignalSet& = delete; SignalSet(SignalSet&& other) noexcept : io_context_{ other.io_context_ }, fd_{ std::exchange(other.fd_, -1) }, mask_{ other.mask_ } {} auto operator=(SignalSet&& other) noexcept -> SignalSet& = delete; ~SignalSet() { if (fd_ != -1) ::close(fd_); } private: Context& io_context_; int fd_{ -1 }; sigset_t mask_; };3.3 构建 PollAwaiter 与 async_wait
既然是协程库,那我们理所应当的应该将监听行为设计成协程。为监听可读事件提供通用的
PollAwaiter:template<typename Context> class PollAwaiter : public Operation { public: using resume_type = void; PollAwaiter(Context& context, int fd, short events) noexcept : context_{ context }, fd_{ fd }, events_{ events } {} [[nodiscard]] constexpr auto await_ready() const noexcept -> bool { return false; } auto await_suspend(std::coroutine_handle<> handle) noexcept -> void { handle_ = handle; auto* sqe = context_.sqe(); ::io_uring_prep_poll_add(sqe, fd_, events_); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() const noexcept -> std::expected<void, std::error_code> { if (error_code_ != 0) return unexpected_system_error(error_code_); return {}; } void complete(int res, [[maybe_unused]] std::uint32_t flags) noexcept override { error_code_ = res < 0 ? -res : 0; if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } private: Context& context_; int fd_; short events_; std::coroutine_handle<> handle_{ nullptr }; int error_code_{ 0 }; }; // 在 SignalSet 外部实现 template<typename Context> auto SignalSet<Context>::async_wait() noexcept -> PollAwaiter<Context> { return PollAwaiter<Context>{ io_context_, fd_, POLLIN }; }4. 系统集成演示
至此,基础组件均已就位。我们可以轻松写出一个支持非阻塞延时、并通过
Ctrl+C信号安全、优雅退出的并发模型。auto shutdown_monitor(IOContext& context) -> Task<void> { SignalSet sets{ context, signals::interrupt, signals::terminate }; // 挂起协程,等待操作系统向底层派发 SIGINT 或 SIGTERM co_await sets.async_wait(); spdlog::info("Received shutdown signal, stopping IOContext..."); context.stop(); } auto demo(IOContext& context) -> Task<void> { using namespace std::chrono_literals; spdlog::info("before sleep..."); // 模拟长耗时异步任务 co_await sleep_for(context, 10min); spdlog::info("after sleep..."); } int main(int argc, char* argv[]) { IOContext context{}; // 并发派发两个独立协程:一个执行业务,一个负责监听中断 co_spawn(context, demo(context)); co_spawn(context, shutdown_monitor(context)); context.run(); spdlog::info("IOContext stopped, exiting..."); return EXIT_SUCCESS; }运行结果:
在长达 10 分钟的sleep任务途中,我们通过Ctrl+C触发键盘中断,signalfd捕获到信号,唤醒了沉睡的shutdown_monitor协程,随后成功中断事件循环,程序安全退出。blog.io_context_v2 [2026-04-19 22:33:21.313] [info] before sleep... ^C[2026-04-19 22:33:22.954] [info] Received shutdown signal, stopping IOContext... [2026-04-19 22:33:22.954] [info] IOContext stopped, exiting... -
-
【 基于 io_uring 的 C++20 协程网络库】01:基础骨架与 Awaiter 机制目标与设计边界
本文旨在实现一个基于
io_uring封装的 C++ 协程网络库。在着手编码前,我们先确立一个严格的设计边界:不考虑跨平台,不考虑兼容 epoll 等传统多路复用机制。
为什么舍弃跨平台等通用性?
一旦引入跨平台封装,不仅维护成本陡增,更关键的是性能势必要做出妥协。不同操作系统的异步 API 在机制上存在根本分歧,强行封装通常只能取它们的公共子集,或者在用户态引入额外的抽象层来模拟缺失的语义。无论哪种方式,都会对最终性能造成不可预期的损耗。在基础设施级别的系统库中,性能是无法在项目后期通过“手法”来弥补的,必须在架构初期就定下基调。因此,我们选择不给自己埋雷,直接将底层与
io_uring强绑定。为什么选择 io_uring?
相比于 epoll,
io_uring的心智模型更加契合协程。
epoll 暴露的是 Reactor 模型接口(就绪通知),本质上依然是接近线程回调的处理方式。而io_uring是标准的 Proactor 模型(完成通知)。C++20 的协程天然就是一个异步操作状态机,也是标准的 Proactor 范式。两者的结合能最大程度地降低封装阻抗,减少无谓的状态转换代码。IOContext 是什么?
对于初接触异步网络库的读者,可以简单将
IOContext理解为事件收割机与协程调度中枢。在代码中,你提交的各种异步操作(即
io_uring的 SQE 事件),最终都需要一个统一的执行流去收割它们的完成结果(CQE)。成熟的模式是借鉴 Boost.Asio 的io_context抽象:通过阻塞调用IOContext::run()来消耗掉所有已就绪事件,唤醒对应的协程,然后继续等待下一轮事件就绪。构建基础框架
基于 C++ 的 RAII 原则,
IOContext的首要任务是管理io_uring实例的生命周期。1. 实例初始化
int io_uring_queue_init(unsigned entries, struct io_uring* ring, unsigned flags);entries: 提交队列(SQ)的深度。必须是 2 的幂(如 128, 256)。内核会基于此分配共享内存环。ring: 指向待初始化的实例。成功后,内存映射地址、队列掩码等状态将被写入该结构。flags: 控制行为的标志位(如启用SQPOLL消除系统调用)。我们这里默认置 0 即可。
失败时直接返回负值的系统错误码,不依赖全局
errno。2. 实例销毁
void io_uring_queue_exit(struct io_uring* ring);该函数负责解除内存映射 (
munmap),并关闭io_uring在内核中对应的匿名文件描述符,防止虚拟内存与文件句柄泄漏。IOContext 资源管理骨架
基于上述 API,我们搭建出
IOContext的核心轮廓。由于该上下文作为核心中枢运转,移动语义会引发悬垂指针等复杂问题,因此我们在设计上严格禁用拷贝与移动。#include <liburing.h> #include <atomic> class IOContext { public: explicit IOContext(unsigned entries) { if (auto res = ::io_uring_queue_init(entries, &ring_, 0); res < 0) throw_system_error(-res, "io_uring_queue_init"); } IOContext(const IOContext&) = delete; auto operator=(const IOContext&) -> IOContext& = delete; // 为了简化实现,我们不支持移动 IOContext(IOContext&&) = delete; auto operator=(IOContext&&) -> IOContext& = delete; ~IOContext() { ::io_uring_queue_exit(&ring_); } [[nodiscard]] auto ring() noexcept -> ::io_uring* { return &ring_; } [[nodiscard]] auto ring() const noexcept -> const ::io_uring* { return &ring_; } private: ::io_uring ring_; };收割已就绪事件 (CQE)
接下来实现核心引擎
IOContext::run()。这涉及三个底层操作流:-
等待事件就绪:
io_uring_submit_and_wait(struct io_uring* ring, unsigned wait_nr);将提交操作和阻塞等待融合成一次系统调用。
wait_nr指明线程必须阻塞到至少出现多少个完成事件才唤醒返回。返回值陷阱:成功时返回的是提交的 SQE 数量,而非完成的 CQE 数量。
-
遍历完成队列 (CQ):
io_uring_for_each_cqe是一个纯用户态宏。它通过带有 Acquire 语义的内存屏障读取 CQ 尾指针,无锁且零拷贝地遍历就绪事件。状态剥离陷阱:该宏只是只读遍历,不修改内核视角的头部指针。如果仅仅遍历而不推进状态,队列最终会溢出导致
-EBUSY。 -
确认事件消费:
io_uring_cq_advance(struct io_uring* ring, unsigned nr);修改用户空间的 Head 指针,并通过 Store-Release 语义发布给内核,正式确认这些事件已被收割。
user_data 与类型擦除
io_uring_cqe结构中包含一个__u64 user_data字段。当我们在 SQE 中设置它时,内核会原封不动地将其带入 CQE 返回。这使得我们能够将该标识强制转换回 C++ 对象的指针。为此,我们提供一个
Operation基类,所有协程 Awaiter 都必须继承此接口:struct Operation { virtual ~Operation() = default; virtual void complete(int res, unsigned flags) = 0; };优雅的退出:should_stop_ 的无锁设计
为了安全退出事件循环,我们引入
should_stop_变量。即便网络库采用 Core Per Thread 模型,不涉及跨业务线程的同步,但stop()操作往往是由操作系统的信号处理器(Signal Handler,如处理 Ctrl+C)触发的。信号中断具有强抢占性,因此必须使用std::atomic。值得注意的是,这里我们不使用 CAS(Compare-And-Swap)。由于停止是一个幂等且无条件的覆盖动作,我们完全不关心过去的运行状态。直接使用
store配合最松散的std::memory_order_relaxed即可。这提供了硬件级别的防数据撕裂保证,同时将同步开销降到了绝对的最低点。WARN: 只有这个变量显然是不足以完整实现 stop 功能的,还需要考虑如何取消已经提交但尚未完成的 I/O 请求,以及如何通知正在等待的 run() 方法尽快返回。我们将在未来的版本中逐步完善这个功能。
完整的 run() 实现
结合外部任务追踪机制,事件循环的最终代码如下:
class IOContext { // ... 构造与析构保持不变 ... void run() { ::io_uring_cqe* cqe{ nullptr }; while (!should_stop_.load(std::memory_order_relaxed) && outstanding_works_ > 0) { auto res = ::io_uring_submit_and_wait(&ring_, 1); if (res < 0) throw_system_error("io_uring_submit_and_wait"); unsigned head; unsigned count{ 0 }; io_uring_for_each_cqe(&ring_, head, cqe) { ++count; if (cqe->user_data != 0) { auto* op = reinterpret_cast<Operation*>(cqe->user_data); op->complete(cqe->res, cqe->flags); } } if (count > 0) { outstanding_works_ -= count; ::io_uring_cq_advance(&ring_, count); } } } [[nodiscard]] auto sqe() -> ::io_uring_sqe* { auto* sqe = ::io_uring_get_sqe(&ring_); if (!sqe) xin::throw_system_error("io_uring_get_sqe"); add_work(); return sqe; } void stop() noexcept { should_stop_.store(true, std::memory_order_relaxed); } // 为了搭配co_spawn,需要暴露add_work和drop_work方法 void add_work() noexcept { ++outstanding_works_; } void drop_work() noexcept { assert(outstanding_works_ > 0); --outstanding_works_; } private: ::io_uring ring_; std::size_t outstanding_works_{ 0 }; std::atomic<bool> should_stop_{ false }; };深入 Awaiter 机制:SleepAwaiter 实践
单有一个
IOContext是跑不起来的,我们需要验证它与 C++20 协程的交互机制。在此,我们实现一个SleepAwaiter,封装io_uring的IORING_OP_TIMEOUT定时器。#include <chrono> #include <coroutine> #include <expected> #include <system_error> #include <utility> class SleepAwaiter : public Operation { public: template<typename Duration> SleepAwaiter(IOContext& context, Duration d) noexcept : context_{ context } { using namespace std::chrono; ts_.tv_sec = duration_cast<seconds>(d).count(); ts_.tv_nsec = duration_cast<nanoseconds>(d % seconds(1)).count(); } [[nodiscard]] constexpr auto await_ready() const noexcept -> bool { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); // 提交纯超时指令,count 设为 0 表示只受时间触发 ::io_uring_prep_timeout(sqe, &ts_, 0, 0); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() const noexcept -> std::expected<void, std::error_code> { // io_uring 中,超时正常结束会返回 ETIME if (error_code_ == ETIME || error_code_ == 0) return {}; // 其他错误(如 -ECANCELED 被提前强杀) return std::unexpected{ std::error_code{ error_code_, std::generic_category() } }; } void complete(int res, [[maybe_unused]] std::uint32_t flags) noexcept override { error_code_ = -res; if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } private: IOContext& context_; struct __kernel_timespec ts_{}; std::coroutine_handle<> handle_{ nullptr }; int error_code_{ 0 }; }; template<typename Duration> auto sleep_for(IOContext& context, Duration duration) noexcept -> SleepAwaiter { return SleepAwaiter{ context, duration }; }零开销生命周期管理
留意
await_suspend中的::io_uring_prep_timeout(sqe, &ts_, 0, 0);。我们将局部对象ts_的地址交给了内核。在传统的异步回调编程中,这是一个极易触发悬垂指针的致命错误,通常需要用std::shared_ptr在堆上分配来强行续命。但在这里,它是绝对安全的。因为
SleepAwaiter本身的生命周期被牢牢绑定在了协程帧内部。直到complete回调中触发handle.resume()彻底唤醒协程后,该 Awaiter 才会被销毁。协程从语言底层提供了天然的内存安全保障,这也是 C++ 追求零开销抽象的绝佳体现。测试示例
最后,我们用一段简单的代码来验证整个基建流转:
auto demo(IOContext& context) -> Task<void> { using namespace std::chrono_literals; spdlog::info("before sleep"); co_await sleep_for(context, 5s); spdlog::info("after sleep"); } int main(int argc, char* argv[]) { IOContext context{}; co_spawn(context, demo(context)); context.run(); return EXIT_SUCCESS; }输出如下,可以看到,5s后再次输出内容,这证明从请求提交、内核响应、上下文分发到协程唤醒的全链路已完全贯通:
blog.io_context_v1 [2026-04-19 16:18:17.642] [info] before sleep... [2026-04-19 16:18:22.642] [info] after sleep... -
启动并分离 - co_spawn在基于 C++20 协程构建的异步框架里,
Task<T>通常有着严格的结构化并发语义:调用者必须去co_await它,子任务的生命周期被死死地绑在父任务上。但现实世界没这么理想,系统架构中必然存在同步世界与异步世界的交汇点。最典型的例子就是 TCP 服务器的事件循环:
// 这是一个底层的同步事件循环 void server_accept_loop(io_context& ctx) { while (true) { stream_socket client = accept_connection(ctx); // 业务协程:处理单个客户端连接 // 函数签名:Task<void> handle_client(stream_socket client); // 问题:怎么在这里启动 handle_client,然后不管它,直接去接下一个客? // 方案 1: 直接调用?没用,返回值 Task 被丢弃,协程根本不会跑(惰性求值)。 // handle_client(std::move(client)); // 方案 2: 使用 co_await?编译直接报错!因为当前函数是个普通函数,不是协程。 // co_await handle_client(std::move(client)); // 方案 3: 把server_accept_loop的返回值改成Task<void>。这里使用co_await // 可以编译,但是一次只能处理一个client } }为了解决这个问题,我们必须提供一个“启动并分离(Fire and Forget)”的方法,类似std::thread的detach模式。这没法用常规的
Task<T>表达,我们需要自己捏一个底层原语:co_spawn。下面我们将从零开始,一步步打磨出一个内存安全、支持优雅停机的
co_spawn。
1. 裸分离:让编译器帮我们擦屁股
要把一个任务扔到后台,第一个要面对的灵魂拷问就是:这个协程在堆上分配的内存帧(Coroutine Frame),最后谁来删?
既然分离出去了,外部就不存在任何变量持有它的句柄。在 C++20 里,最优雅的解法是:配置好参数,让编译器自己管理。
按照协程规范,只要
promise_type::final_suspend()返回std::suspend_never,协程走到生命周期尽头时,运行时就会自动delete掉那块堆内存。据此,我们可以写出一个极简的“裸分离”壳子:// 纯粹的空壳,仅用于触发编译器的自动清理 struct DetachedTask { struct promise_type { auto get_return_object() noexcept { return detached{}; } // 饥饿启动:一创建就立马执行 auto initial_suspend() noexcept { return std::suspend_never{}; } // 核心:结束时不挂起,触发自动销毁 auto final_suspend() noexcept { return std::suspend_never{}; } void return_void() noexcept {} void unhandled_exception() noexcept { std::terminate(); } }; }; // 极简版原语 template<typename Awaitable> auto co_spawn(Awaitable awaitable) -> DetachedTask { co_await std::move(awaitable); }然后,我们在同步回调里调用
co_spawn(handle_client(std::move(client)));时,协程帧会立即投入运行。遇到 I/O 挂起时,控制流会return回主循环(不会阻塞线程!)。等任务彻底跑完,走向final_suspend,内存安全摧毁,干干净净。2. 状态追踪:用 RAII 告别“幽灵任务”
上面这套“裸分离”虽然在语言机制上跑得通,但在工程上其实是个定时炸弹。因为它没法做状态追踪,也就没法支持服务器的优雅停机(Graceful Shutdown)。
试想一下,如果你发个
SIGTERM准备关进程,底层的事件分发器怎么知道还有多少个co_spawn出去的任务在挂起等 I/O?如果直接把底层上下文销毁了,等这些“幽灵任务”被唤醒时,面对的就是一片废墟,当场 Core Dump。所以,分离出的协程必须和底层的上下文绑定生命周期:诞生时登记,死亡时注销。
这里我们假定一个context应该支持add_work()和drop_work()来管理分离出去的任务。// 假定上下文支持增减引用计数 template<typename T> concept tracking_context = requires(T& ctx) { ctx.add_work(); ctx.drop_work(); }; template<tracking_context Context> struct DetachedTask { struct promise_type { Context* context = nullptr; // 拦截参数:拿到上下文引用,生命周期开始时登记 template<typename Awaitable> promise_type(Context& ctx, Awaitable&&) : context{ &ctx } { context->add_work(); } // 绑定析构:随协程帧被编译器销毁时,自动注销 ~promise_type() { if (context) context->drop_work(); } auto get_return_object() noexcept { return DetachedTask{}; } auto initial_suspend() noexcept { return std::suspend_never{}; } auto final_suspend() noexcept { return std::suspend_never{}; } void return_void() noexcept {} void unhandled_exception() noexcept { std::terminate(); } }; };3. 最终的
co_spawn有了上面这个支持状态追踪的
detached_task,我们就可以给出co_spawn的最终接口了。// 注意这里的 Awaitable awaitable 是按值传递! template<tracking_context Context, awaitable Awaitable> requires std::movable<std::remove_cvref_t<Awaitable>> auto co_spawn(Context& ctx, Awaitable awaitable) -> DetachedTask<Context> { // 移动进协程帧里,生命周期交给DetachedTask co_await std::move(awaitable); }为什么要Awaitble必须按值传?
在协程里,如果参数是引用,堆上的协程帧就只会存个指针。像handle_client(std::move(client))这种调用,产生的是个临时对象(右值)。如果co_spawn接的是个引用,等它内部第一次co_await挂起、把控制权还给外层时,这个临时对象早就析构了!这会导致极其隐蔽的悬垂引用Bug。通过强制按值传递,我们用移动语义,把临时的业务任务移动到了协程帧内部,只要协程不死,它的状态就绝对安全。
4. 补充concept:到底什么是
awaitable?细心的朋友肯定注意到了,在最终的
co_spawn签名里,我用了一个awaitable的概念。在 C++20 里,一个东西能被
await,无非三种情况:- 它自己就是个
awaiter,即有3个await函数 - 它重载了成员方法
operator co_await() - 有对应的全局重载。
我们就把这个concept用代码翻译出来:
template<typename T> concept awaiter = requires(T& t, std::coroutine_handle<> handle) { { t.await_ready() } -> std::convertible_to<bool>; t.await_suspend(handle); t.await_resume(); }; template<typename T> concept has_operator_co_await = requires(T&& t) { { std::forward<T>(t).operator co_await() } -> awaiter; }; template<typename T> concept has_global_operator_co_await = requires(T&& t) { { operator co_await(std::forward<T>(t)) } -> awaiter; }; template<typename T> concept awaitable = awaiter<T> || has_operator_co_await<T> || has_global_operator_co_await<T>;
实战演示
最后,给一段伪代码示例,看看它是怎么在业务里落地的:
import std; // 1. 实现一个满足 tracking_context 契约的上下文 struct MyIOContext { int active_tasks = 0; void add_work() { ++active_tasks; std::cout << "[Context] 任务+1,当前活跃数: " << active_tasks << "\n"; } void drop_work() { --active_tasks; std::cout << "[Context] 任务结束,当前活跃数: " << active_tasks << "\n"; } void run_loop() { // 真实场景里,这里是 epoll_wait 或 io_uring_enter 阻塞等事件 std::cout << "[Context] 开启事件循环,等待 I/O...\n"; } }; // 2. 模拟一个能被 co_await 的异步操作 (满足 awaitable 契约) struct DummyAsyncRead { bool await_ready() { return false; } void await_suspend(std::coroutine_handle<>) { std::cout << " -> 协程挂起,把 fd 注册到 epoll...\n"; } void await_resume() { std::cout << " -> 协程恢复,拿到数据!\n"; } }; // 业务逻辑协程 DummyAsyncRead handle_client(int client_fd) { std::cout << "开始处理客户端: " << client_fd << "\n"; // 遇到 IO 挂起 co_await DummyAsyncRead{}; } // 3. 跑起来 int main() { MyIOContext ctx; std::cout << "--- 服务器启动 ---\n"; // 启动并分离,立刻返回 co_spawn(ctx, handle_client(1001)); co_spawn(ctx, handle_client(1002)); std::cout << "--- 同步的 main 函数丝毫不受阻塞 ---\n"; ctx.run_loop(); return 0; } - 它自己就是个
-
从零构建基于 C++20 的 TaskC++20 引入了无栈协程(Stackless Coroutines)的核心语言机制,但与之相配套的标准库高级抽象(如
std::task)并未同步提供。在构建基于io_uring或epoll的高性能并发框架时,我们不可避免地需要自行设计一个用于封装异步操作的返回类型:Task<T>。设计这样一个任务类型,不仅仅是对新关键字的语法包装,其本质是在解决两个系统级编程的核心问题:
- 控制流的无缝路由
- 堆分配状态帧的确定性释放。
本文将探讨如何从零构建一个可用的Task<T>。
1. 异步组合的困境与懒启动(Lazy Evaluation)
在传统的同步流中,函数的调用即意味着执行的开始。但在异步架构中,任务的“构造”与“执行”往往需要被严格分离。
为了建立直观的理解,我们可以先参考 Python 中的协程行为。在 Python 中,调用一个
async def函数并不会立即执行其内部代码,而是仅仅返回一个协程对象:import asyncio async def fetch_data(): print("开始发起网络请求...") # ... # 此时并不会打印任何内容,仅仅是构造了一个任务对象 task = fetch_data() # 只有显式地等待或交给事件循环,代码才会真正运转 # await task这种机制被称为懒启动(Lazy Evaluation)。如果我们允许 C++ 的协程在被调用时立即开始执行(即所谓的 Eager Evaluation),它可能会在尚未正确挂载到事件循环(Event Loop)之前,就过早地触发了底层的 I/O 投递操作。这不仅破坏了状态的封装,还极易引发复杂的竞态条件。
因此,一个健壮的 C++
Task<T>必须是懒启动的。这在 C++20 中是通过定制promise_type的初始化行为来实现的:class promise_type { public: // 协程帧创建后立即挂起,不主动执行协程体代码 auto initial_suspend() noexcept -> std::suspend_always { return {}; } // ... };通过返回
std::suspend_always,协程在完成内部状态帧的堆分配后会立刻交出控制权。这种设计使得异步任务可以像普通的数据结构一样被安全地传递、存储和组合,直到调用者显式地通过co_await来驱动它。2. 协程间的控制流移交
异步操作很少是孤立存在的。当父协程执行
co_await child_task;时,当前的执行流必须被挂起,并将 CPU 的控制权移交给子协程。同时,子协程必须知晓在自身执行完毕后,应该唤醒哪一个调用者。为了建立这种调用链,我们利用了
co_await运算符所触发的编译器协议。在 C++20 中,
co_await并非一个简单的挂起指令,而是一个可定制的控制流拦截点。当编译器遇到co_await <expr>时,它会要求<expr>产出一个符合特定接口的Awaiter对象,并依次调用其三个核心方法:await_ready():探测状态。询问异步操作是否已经完成。如果返回true,编译器将走“快速通道”,直接跳过挂起阶段;如果返回false,则准备挂起当前协程。await_suspend(std::coroutine_handle<>):核心拦截点。在当前协程的物理状态(寄存器、局部变量)被安全保存到堆上的协程帧后,编译器会调用此方法,并将当前(父)协程的句柄作为参数传入。await_resume():结果提取点。当协程被再次唤醒时,此方法的返回值将作为整个co_await表达式的结果。
基于这一协议,我们在
Task内部定义了专门的Awaiter,以此来接管并路由控制流:class Awaiter { public: explicit Awaiter(handle_type handle) : handle_{ handle } {} // 1. 探测状态:如果子协程尚未执行完毕,则强制父协程挂起 bool await_ready() const noexcept { return !handle_ || handle_.done(); } // 2. 挂起时的控制流路由 auto await_suspend(std::coroutine_handle<> next) -> std::coroutine_handle<> { // 将父协程的句柄 (next) 记录在子协程的 promise 状态中 handle_.promise().next = next; // 返回子协程的句柄,指示 C++ 运行时将执行流切换至子协程 return handle_; } // 3. 唤醒后的结果提取 auto await_resume() const -> T { if (!handle_) throw std::logic_error{ "Invalid handle" }; return handle_.promise().result(); } private: handle_type handle_; // 子协程的句柄 };通过这一套状态机转换,C++ 将底层的调度权完全下放给了库作者。
在
await_suspend执行的瞬间,父协程已被安全冻结。我们将其句柄保存在子协程的
promise_type::next字段里,从而在内存中建立了一个单向的调用链表(父 -> 子)。紧接着返回子协程的
handle_,运行时会直接跳转执行子协程代码,实现了零开销的上下文切换。3. 栈溢出风险与对称传输(Symmetric Transfer)
子协程执行到末尾(或遇到
co_return)时,需要唤醒之前等待它的父协程。这往往是自定义协程实现中最容易出错的环节。直觉上的做法是,在子协程的收尾阶段直接调用
next.resume()。然而,这种非对称传输(Asymmetric Transfer)存在致命缺陷:resume()本质上是一个常规的同步函数调用。在网络服务这类存在深层嵌套或无限循环挂起的场景中(例如
while(true) { co_await read(); }),每一次resume()都会在操作系统的线程栈上压入一个新的栈帧。调用链越长,栈越深,最终必然导致 Stack Overflow(栈溢出)。为了提供工业级的稳定性,
Task在收尾时必须采用对称传输(Symmetric Transfer):class FinalAwaiter { public: bool await_ready() const noexcept { return false; } template<typename Promise> auto await_suspend(std::coroutine_handle<Promise> handle) const noexcept -> std::coroutine_handle<> { auto next = handle.promise().next; // 关键点:直接返回父协程的句柄,而非调用 next.resume() return next ? next : std::noop_coroutine(); } void await_resume() const noexcept {} }; // 在 promise_type 中指定收尾行为: auto final_suspend() noexcept -> FinalAwaiter { return {}; }通过让
final_suspend返回一个包含父协程句柄的Awaiter,编译器会采用类似尾调用优化(Tail Call)的机制:它会首先将当前子协程的物理栈帧安全剥离,然后再以平级跳转的方式进入父协程。
在这种机制的保障下,无论业务逻辑中
co_await嵌套了多少层,底层的线程调用栈深度始终保持恒定 (O(1))。4. 返回值的提取与异常路由
异步任务不仅涉及控制流的跳转,还必须安全地跨越挂起边界传递数据或异常,并且表现得如同普通的 C++ 函数调用一样。
在子协程内部,产生的值或未捕获的异常被分别存储在
promise_type的std::optional<T>和std::exception_ptr中。当父协程通过对称传输被唤醒,并执行await_resume()时,需要提取这些结果:auto result() -> T { if (exception_) std::rethrow_exception(exception_); return std::move(value_).value(); }这里包含两个重要的设计约束:
- 异常透明性:
std::rethrow_exception确保了子协程中发生的异常能够被无缝抛出,并被父协程的try-catch块捕获,维持了 C++ 异常处理语义的连贯性。 - 资源所有权转移:通过
std::move提取值,保证了诸如std::unique_ptr或封装了系统资源(如文件描述符)的不可拷贝对象(Move-Only Types)能够被正确返回。
5. 协程帧的生命周期管理与单次消费语义
无栈协程的局部变量和
promise_type被编译器分配在堆上的协程帧(Coroutine Frame)中。由于 C++ 没有垃圾回收机制,资源泄漏是协程编程中的主要风险之一。依据 C++ 核心的 RAII(资源获取即初始化)原则,
Task对象作为协程句柄的唯一持有者,理应负责这块内存的清理:template<typename T> class Task { public: ~Task() { if (handle_) handle_.destroy(); } // 限制为右值调用,且不转移 handle_ 的所有权 auto operator co_await() && noexcept { return Awaiter{ handle_ }; } };这里有两个深思熟虑的设计权衡:
第一:为什么限制
operator co_await为右值版本(&&)?
协程代表一个异步计算过程,其内部结果(特别是前文提到的 Move-Only 类型)在await_resume中是被破坏性提取的(std::move)。这意味着一个Task在逻辑上只能被消费一次。如果允许对左值的Task进行co_await,调用者可能会意外地多次等待同一个任务:Task<int> t = do_work(); auto res1 = co_await t; auto res2 = co_await t; // 错误:底层协程已经结束,状态帧已被销毁通过添加
&&限定符,我们利用 C++ 的类型系统在编译期强制执行了“单次消费(Single-Shot)”语义。调用者必须直接等待临时对象(如co_await do_work();),或者显式地转移所有权(co_await std::move(t);)。这在接口层面明确了状态机的生命周期契约。第二:为什么在右值版本中,依然不剥夺 Task 的所有权?
通常在处理右值时,我们会使用std::exchange来转移底层资源。但在这里,我们仅向Awaiter传递了句柄的值。
当执行auto res = co_await do_work();时,do_work()产生的Task临时对象的生命周期会被编译器自动延续,直到整个co_await表达式求值完毕(即await_resume()返回之后)。此时,临时Task对象被析构,从而触发handle_.destroy()。
如果我们在此处剥夺了Task的所有权,清理责任就会落空。这种保留所有权的设计,确保了无论是正常执行完毕还是因异常提前中断,底层堆内存都能依托Task临时对象的析构函数被可靠地回收,实现了严格的内存安全。补充说明
在 C++ 中,临时对象的生命周期会持续到包含它的完整表达式(Full-expression)结束(通常是遇到分号 ;)。当我们写下如下代码时:
auto res = co_await do_something();编译器实际上会做如下展开(伪代码):
{ // 1. 调用函数,产生临时的 Task 右值对象 auto&& __tmp_task = do_something(); // 2. 调用 operator co_await,产生临时的 Awaiter 对象 auto&& __awaiter = __tmp_task.operator co_await(); if (!__awaiter.await_ready()) { // 3. 挂起当前协程,并调用 await_suspend __awaiter.await_suspend(current_coro_handle); // <--- 协程在这里彻底挂起,CPU 离开 ---> // <--- 时空流转,无论过了多久,终于被唤醒 ---> } // 4. 唤醒后,调用 await_resume 提取结果 auto res = __awaiter.await_resume(); } // 5. 完整表达式结束!按照构造的相反顺序销毁临时对象:先销毁 __awaiter,再销毁 __tmp_task关键点在于: 协程在挂起时,编译器非常清楚 __tmp_task 和 __awaiter 的生命周期需要跨越挂起点。因此,编译器不会把它们分配在容易被销毁的线程栈(Thread Stack)上,而是直接将它们作为局部变量,打包存储在“当前(父)协程的堆分配状态帧(Coroutine Frame)”中。
这意味着:
Task 对象在整个挂起期间一直安然无恙地活在堆内存里。
唤醒时,Awaiter 也并没有在栈上重建,你访问的依然是挂起前保存在堆里的那个确切的 Awaiter 实例。
Task 必定比 Awaiter 活得更久(先构造的后销毁)。
因此,Awaiter 内部仅持有 handle_ 的一个浅拷贝是绝对安全的,Task 完全不需要把所有权 exchange 给 Awaiter。
结语
设计一个现代 C++ 的
Task类,并非对关键字的简单拼接,而是对执行流跳转和资源生命周期的精密编排。通过懒启动隔离控制流、利用对称传输突破调用栈限制、借助 RAII 保障内存释放,我们最终构建出了一个符合 C++ 哲学体系的高性能并发原语。完整代码
export module xin.task; import std; namespace xin { class FinalAwaiter { public: [[nodiscard]] constexpr auto await_ready() const noexcept -> bool { return false; } template<typename Promise> auto await_suspend(std::coroutine_handle<Promise> handle) const noexcept -> std::coroutine_handle<> { auto next = handle.promise().next; return next ? next : std::noop_coroutine(); } void await_resume() const noexcept {} }; export template<typename T = void> class Task; export template<typename T> class Task { public: class promise_type; using handle_type = std::coroutine_handle<promise_type>; class promise_type { public: auto get_return_object() noexcept -> Task { return Task{ handle_type::from_promise(*this) }; } auto initial_suspend() noexcept -> std::suspend_always { return {}; } auto final_suspend() noexcept -> FinalAwaiter { return {}; } void unhandled_exception() noexcept { exception_ = std::current_exception(); } template<typename U> requires std::convertible_to<U&&, T> void return_value(U&& value) noexcept(std::is_nothrow_constructible_v<T, U&&>) { value_.emplace(std::forward<U>(value)); } [[nodiscard]] auto result() -> T { if (exception_) std::rethrow_exception(exception_); if (!value_) throw std::logic_error{ "No value returned from coroutine" }; auto out = std::move(*value_); value_.reset(); return out; } std::coroutine_handle<> next{ nullptr }; private: std::exception_ptr exception_; std::optional<T> value_; }; Task() = default; Task(handle_type handle) : handle_{ handle } {} Task(const Task&) = delete; auto operator=(const Task&) -> Task& = delete; Task(Task&& other) noexcept : handle_{ std::exchange(other.handle_, {}) } {} auto operator=(Task&& other) noexcept -> Task& { if (this == &other) return *this; if (handle_) handle_.destroy(); handle_ = std::exchange(other.handle_, nullptr); return *this; } ~Task() { if (handle_) handle_.destroy(); } [[nodiscard]] auto done() const noexcept -> bool { return !handle_ || handle_.done(); } [[nodiscard]] auto handle() const noexcept -> handle_type { return handle_; } class Awaiter { public: explicit Awaiter(handle_type handle) : handle_{ handle } {} [[nodiscard]] auto await_ready() const noexcept -> bool { return !handle_ || handle_.done(); } auto await_suspend(std::coroutine_handle<> next) -> std::coroutine_handle<> { handle_.promise().next = next; return handle_; } auto await_resume() const -> T { if (!handle_) throw std::logic_error{ "Invalid coroutine handle" }; return handle_.promise().result(); } private: handle_type handle_; }; auto operator co_await() && noexcept { return Awaiter{ handle_ }; } private: handle_type handle_{ nullptr }; }; export template<> class Task<void> { public: class promise_type; using handle_type = std::coroutine_handle<promise_type>; class promise_type { public: std::coroutine_handle<> next{ nullptr }; auto get_return_object() noexcept -> Task { return Task{ handle_type::from_promise(*this) }; } auto initial_suspend() noexcept -> std::suspend_always { return {}; } auto final_suspend() noexcept -> FinalAwaiter { return {}; } void unhandled_exception() noexcept { exception_ = std::current_exception(); } void return_void() noexcept {} void result() { if (exception_) std::rethrow_exception(exception_); } private: std::exception_ptr exception_; }; Task() = default; Task(handle_type handle) : handle_{ handle } {} Task(const Task&) = delete; auto operator=(const Task&) -> Task& = delete; Task(Task&& other) noexcept : handle_{ std::exchange(other.handle_, nullptr) } {} auto operator=(Task&& other) noexcept -> Task& { if (this == &other) return *this; if (handle_) handle_.destroy(); handle_ = std::exchange(other.handle_, {}); return *this; } ~Task() { if (handle_) handle_.destroy(); } [[nodiscard]] auto done() const noexcept -> bool { return !handle_ || handle_.done(); } [[nodiscard]] auto handle() const noexcept -> handle_type { return handle_; } class Awaiter { public: explicit Awaiter(handle_type handle) : handle_{ handle } { } [[nodiscard]] auto await_ready() const noexcept -> bool { return !handle_ || handle_.done(); } auto await_suspend(std::coroutine_handle<> next) -> std::coroutine_handle<> { handle_.promise().next = next; return handle_; } void await_resume() const { if (!handle_) throw std::logic_error{ "Invalid coroutine handle" }; handle_.promise().result(); } private: handle_type handle_; }; auto operator co_await() && noexcept { return Awaiter{ handle_ }; } private: handle_type handle_{ nullptr }; }; } // namespace xin -
复刻asio版本的set_option本文将探讨如何在现代 C++ (C++20) 中实现 Socket 选项的强类型封装。
在原生 POSIX Socket API 中,配置选项依赖
setsockopt和getsockopt。这两个接口使用void*和socklen_t传递数据载荷,完全绕过了编译器的类型检查系统。这种设计极易引发类型不匹配与内存越界,严重违背了现代 C++ 强调的类型安全原则。我们将从强类型设计与传统基本类型的对比出发,探讨 Boost.Asio 优秀的静态多态设计,并最终利用 C++20 Concepts 实现一套零成本抽象、支持非侵入式扩展的强类型 Socket 接口。
1. 强类型抽象 vs 基本类型传参
在对底层 C API 进行 C++ 封装时,一种常见的简单做法是直接使用
int或bool等基本类型(Primitive Types)来传递配置值。例如提供形如set_option(int level, int name, int val)的通用函数,或者堆砌大量的成员函数如set_reuse_address(bool)、set_receive_buffer_size(int)。这种做法存在两个显著缺陷:
- 语义丢失与传参错位:裸露的
int失去了业务语义。在调用侧,很难立刻分辨传入的1究竟代表一个布尔开关,还是一个以字节为单位的大小。如果误将缓冲区大小传给了布尔选项,编译器通常无法拦截。 - 接口表面积膨胀:随着支持的选项不断增加,Socket 类会被海量的 Getter/Setter 淹没,维护成本极高。
强类型(Strong Typing)方案的核心思想是:将数据的业务语义(Level、Name)与底层存储类型在编译期绑定为一个不可分割的实体。通过为每一个选项定义独立的类型(如
receive_buffer_size),我们可以将 Socket 类的配置接口收敛为一个单一的泛型option(...)函数。编译器会根据传入的强类型对象自动进行重载决议与合法性校验,从根本上杜绝类型混用。2. 动态多态的妥协与 Boost.Asio 的静态多态
在明确了需要将选项抽象为独立类型后,习惯了面向对象编程(OOP)的开发者往往会利用接口继承来实现多态:
class ISocketOption { public: virtual ~ISocketOption() = default; virtual int level() const = 0; virtual int name() const = 0; virtual const void* data() const = 0; virtual std::size_t size() const = 0; };这种动态多态方案在接口层消灭了
void*,但代价十分高昂。SOL_SOCKET和SO_REUSEADDR等常数在编译期即可确定,为了适配接口却引入了虚函数表(vtable)的运行时开销,阻碍了编译器的内联优化。此外,传递选项往往需要伴随堆内存分配,这对底层网络库而言是不可接受的性能损耗。在性能极度敏感的基础设施中,Boost.Asio 给出了更优的解答:静态多态。
Asio 抛弃了传统的虚函数继承树。Socket 的
set_option是一个函数模板,它并不要求选项继承自某个基类,只要求选项类型在编译期提供特定的接口签名(以boost::asio::basic_socket的底层实现为例):template <typename SettableSocketOption> void set_option(const SettableSocketOption& option, boost::system::error_code& ec) { detail::socket_ops::setsockopt(impl_.socket_, option.level(impl_.protocol_), option.name(impl_.protocol_), option.data(impl_.protocol_), option.size(impl_.protocol_), ec); }编译器在实例化模板时,会将具体的选项类型直接展开。虚函数的开销被彻底抹平,常量参数在编译期被直接内联,实现了真正的“零成本抽象”。
3. 固化契约:引入 C++20 Concepts 与静态分发
Asio 的静态多态虽然强大,但也存在泛型编程的经典痛点:如果调用端传入了不符合规范的类型,编译器会深入模板内部引发冗长且难以阅读的实例化错误。
我们需要将这种隐式的约定转化为显式的“契约”。将对 Option 类型的要求提取出来,定义为严格的 C++20 Concept:
template<typename T> concept socket_option = requires(const T& opt) { { T::level } -> std::convertible_to<int>; { T::name } -> std::convertible_to<int>; { opt.data() } -> std::convertible_to<const void*>; { opt.size() } -> std::convertible_to<std::size_t>; };对于底层并非通过
setsockopt而是通过fcntl操作的标志位选项(如O_NONBLOCK),我们定义另一套契约:template<typename T> concept flag_option = requires(const T& opt) { { T::get_cmd } -> std::convertible_to<int>; { T::set_cmd } -> std::convertible_to<int>; { T::bit } -> std::convertible_to<int>; { bool(opt) } -> std::convertible_to<bool>; };引入 Concepts 不仅能在编译期提供精准的错误提示,更为函数重载提供了静态分发(Static Dispatch)的能力。 通过约束不同的 Concept,我们可以让
setsockopt和fcntl这两种底层截然不同的系统调用,在上层对外表现为完全统一的socket.option(...)API。4. 填平 C/C++ 鸿沟:细化选项类的设计
有了明确的契约,我们可以针对底层 C API 的不同需求,设计出高度复用的选项模板。
4.1 基础泛型标量 (ValueOption)
对于
SO_RCVBUF等直接接收整型参数的选项,可以提供基础的ValueOption模板:export template<int Level, int Name, std::integral T> class ValueOption { public: static constexpr int level = Level; static constexpr int name = Name; using value_type = T; ValueOption() = default; explicit ValueOption(T value) : value_{ value } {} [[nodiscard]] constexpr auto value() const noexcept -> T { return value_; } [[nodiscard]] auto data() const noexcept -> const void* { return &value_; } auto data() noexcept -> void* { return &value_; } [[nodiscard]] constexpr auto size() const noexcept -> std::size_t { return sizeof(value_); } private: T value_; };4.2 布尔类型适配与隐式转换的便利性
对于
SO_REUSEADDR等逻辑布尔选项,POSIX C API 通常要求传入一个 4 字节的int指针。直接使用bool会导致size()返回 1,在部分内核中引发EINVAL错误。我们通过BooleanOption抹平这一底层差异:export template<int Level, int Name> class BooleanOption { public: static constexpr int level = Level; static constexpr int name = Name; BooleanOption() = default; explicit BooleanOption(bool value) : value_{ value ? 1 : 0 } {} [[nodiscard]] constexpr auto value() const noexcept -> bool { return value_ != 0; } auto data() noexcept -> void* { return &value_; } [[nodiscard]] auto data() const noexcept -> const void* { return &value_; } [[nodiscard]] constexpr auto size() const noexcept -> std::size_t { return sizeof(value_); } constexpr operator bool() const noexcept { return value_ != 0; } private: int value_; };同理,设置非阻塞(
O_NONBLOCK)等属性需要通过fcntl函数修改文件描述符标志位,对应的FlagOption负责记录特定标志位的开关状态:export template<int GetCmd, int SetCMD, int Bit> class FlagOption { public: static constexpr int get_cmd = GetCmd; static constexpr int set_cmd = SetCMD; static constexpr int bit = Bit; FlagOption() = default; explicit FlagOption(bool enabled) : value_{ enabled ? bit : 0 } {} [[nodiscard]] constexpr auto value() const noexcept -> bool { return (value_ & bit) != 0; } constexpr operator bool() const noexcept { return (value_ & bit) != 0; } private: int value_; };关于
operator bool的工程考量:
提供constexpr operator bool() const noexcept能够显著降低调用端的认知阻力。当通过 Getter 读取一个标志位或布尔选项时,返回的是一个完整的强类型对象。借助于隐式转换为bool的能力,我们可以直接在分支语句中进行条件判断:// 借由 operator bool,直接在条件判断中使用,无需手动调用 .value() if (socket.option<TestSocket::non_blocking>()) { // 已经是异步模式,执行对应逻辑 }这一设计使得内部的位掩码运算与外部的条件控制逻辑实现了无缝衔接。
5. Socket 接口集成与静态分发
利用 C++20 的 Concept 约束,在
BaseSocket中为option方法提供精准的重载分发。无论是setsockopt还是fcntl,对外均呈现为统一的接口:export template<protocol Protocol> class BaseSocket { public: // 预定义常用选项别名 using reuse_address = BooleanOption<SOL_SOCKET, SO_REUSEADDR>; using receive_buffer_size = ValueOption<SOL_SOCKET, SO_RCVBUF, int>; using non_blocking = FlagOption<F_GETFL, F_SETFL, O_NONBLOCK>; // ... // 分发至 setsockopt 的重载 template<socket_option Option> void option(const Option& value) { if (::setsockopt(fd_, Option::level, Option::name, value.data(), value.size()) == -1) // ... } // 分发至 fcntl 的重载 template<flag_option Option> void option(const Option& value) { int current_flags = ::fcntl(fd_, Option::get_cmd); // ... int new_flags = value ? (current_flags | Option::bit) : (current_flags & ~Option::bit); if (::fcntl(fd_, Option::set_cmd, new_flags) == -1) // ... } // Getter 的实现同理,通过 Concept 进行重载分发... };业务代码的调用体验变得极其干净,彻底告别了底层指针、宏与位运算:
using TcpSocket = BaseSocket<xin::net::ip::v4::tcp>; TcpSocket socket; // 强类型配置,统一的 API 调用方式 socket.option(TcpSocket::reuse_address{ true }); socket.option(TcpSocket::receive_buffer_size{ 64 * 1024 }); socket.option(TcpSocket::non_blocking{ true });6. 非侵入式扩展:拥抱自定义选项
基于 Concepts 的静态多态带来了巨大的工程优势:非侵入式扩展。
如果用户需要设置一个非常见或系统特有的选项(例如 Linux 下的
TCP_CONGESTION拥塞控制算法),开发者完全不需要修改BaseSocket的源码,也不需要继承任何基类。只需在业务代码中定义一个满足socket_option契约的结构体,即可无缝融入这套方案:struct TcpCongestionOption { static constexpr int level = IPPROTO_TCP; static constexpr int name = TCP_CONGESTION; std::string algorithm; explicit TcpCongestionOption(std::string_view algo) : algorithm(algo) {} auto data() const noexcept -> const void* { return algorithm.data(); } auto size() const noexcept -> std::size_t { return algorithm.size(); } }; // 直接传入自定义选项,编译器自动验证契约并完成分发 socket.option(TcpCongestionOption{"bbr"});总结
从基本类型传参的语义丢失,到面向对象的多态舒适区,再到 Asio 的静态多态以及 C++20 Concepts 的契约化约束,这一演进过程展示了现代 C++ 在基础设施构建上的核心优势。通过强类型设计与 Concept 的静态分发能力,我们在屏蔽 C API 严苛内存要求的同时,提供了一套高度可扩展、接口一致且绝对类型安全的网络底层抽象。
附录:
本文相关完整 C++20 源码实现:点击此处查看完整实现代码 (xin::net::socket) - 语义丢失与传参错位:裸露的
-
std::format增强组件优雅与效率并存:基于 C++20 Concepts 构建非侵入式
std::format扩展1. 背景与痛点:
std::format很好,但还能更好在 C++20 引入
std::format之前,我一直使用fmt::format作为格式化输出的首选。两者的 API 几乎一致,但在将其作为标准库迁移并在大型工程落地时,几个显著的痛点让人如鲠在喉:- 满天飞的样板代码:
fmt提供了极为便利的format_as机制,而现阶段的std::format官方仅支持通过特化std::formatter<T>来实现自定义输出。这意味着每次接入一个自定义类型,都必须硬着头皮手写一遍冗长且高度重复的模板样板代码,心智负担极重。 - 手动调用的累赘感:为了逃避上述的样板代码,很多人会退而求其次,在类内提供一个
to_string()方法。但代价是每次打印都必须显式调用(如std::println("{}", obj.to_string())),不仅破坏了格式化字符串原有的简洁语义,写起来也极其繁琐累赘。 - 类型输出碎片化:如果没有统一约束,工程里的输出方式就会群魔乱舞:有的依赖遗留的
operator<<,有的每次现场手写std::format("...")拼接内部字段,代码风格极其割裂。 - 日志可读性劣化:尤其是枚举类型(
enum),默认直接输出底层整数值。在排查问题时面对满屏的“魔术数字”,必须反复去头文件反查定义,十分痛苦。 - 新类型接入成本高:由于缺乏统一、低成本的扩展范式,每次新增类型都要纠结“这次该怎么格式化”,稍有不慎还会与旧代码的重载产生隐式冲突。
2. 核心设计目标
为了彻底解决上述问题,我构建了一个轻量级的扩展组件,旨在实现以下目标:
- 复刻体验:提供类似
fmt::format_as极低成本的自定义接入点,告别特化样板代码。 - 鸭子类型:引入 Pythonic 的约定,支持自动探测并调用类内的
to_string()或to_repr()。 - 原生枚举:借助
magic_enum,实现枚举值的直接名称输出(告别魔术数字)。 - 无痛兼容:对已实现
operator<<的遗留类型提供平滑过渡。 - 非侵入式:不修改标准库,不污染业务代码,只需
import即可生效。
3. 优先级路由与核心用法速览
为了避免不同格式化方式之间的冲突,本组件在编译期规定了严格的优先级路由。以下是 5 种分类的详细用法:
优先级 接口约定 适用场景与说明 1 format_as(v)最佳实践。继承底层类型的格式规范。 2 v.to_string()常规业务输出。返回 std::string。3 v.to_repr()调试/诊断输出。返回结构化语义表示。 4 enum/enum class自动转换为只读的枚举名字符串。 5 operator<<(ostream)兜底方案。捕获传统流输出。 3.1 方式一:基于
format_as的无缝转发(
️ 推荐)特性:返回整数或其他基础类型时,格式规范(如
{:#x}/{:08d}等)将被完整透传。struct Flags { int bits; }; // 自由函数,通过 ADL 查找 auto format_as(const Flags& f) { return f.bits; } std::println("{:#010x}", Flags{255}); // 输出: 0x000000ff3.2 方式二:基于
to_string的常规输出特性:不再需要手动加
.to_string(),组件会自动探测并调用。适用于需要将对象状态转化为人类可读字符串的常规业务场景。struct Version { int major, minor; auto to_string() const -> std::string { return std::format("{}.{}", major, minor); } }; std::println("{}", Version{1, 2}); // 输出: 1.23.3 方式三:基于
to_repr的诊断输出特性:语义上专用于 Debug 打印,输出包含类型元数据的结构化信息。
struct Node { int id; auto to_repr() const -> std::string { return std::format("Node(id={})", id); } }; std::println("{}", Node{42}); // 输出: Node(id=42)3.4 方式四:枚举类型的自动反射
特性:彻底告别输出枚举整数值的痛苦,自动打印枚举项名称。
enum class ScopedState { idle, running }; std::println("{}", ScopedState::running); // 输出: running3.5 方式五:兼容遗留
operator<<特性:作为最后的兜底方案,让老旧代码无需任何改动即可接入
std::format体系。struct Legacy { int value; friend auto operator<<(std::ostream& os, const Legacy& v) -> std::ostream& { return os << "legacy:" << v.value; } }; std::println("{}", Legacy{7}); // 输出: legacy:7
4. 揭秘底层机制:编译期分派与“上下文陷阱”
本组件的核心魔法在于编译期 SFINAE 的现代化平替——C++20 Concepts。
首先,定义一组 Concept 来嗅探类型的能力:
import std; template<typename T> concept has_format_as = requires(const T& t) { format_as(t); }; template<typename T> concept has_to_string = requires(const T& t) { t.to_string(); }; template<typename T> concept has_to_repr = requires(const T& t) { t.to_repr(); }; template<typename T> concept has_ostream = requires (const T& t, std::ostream& os) { os << t; };接着,利用约束对
std::formatter<T>进行特化,实现按优先级的路由。
深水区踩坑预警:为什么 format的参数必须是auto& ctx?在早期的编译器中,我们习惯将参数写成
std::format_context& ctx。但在现代严苛的标准库实现(如 LLVM/libc++ 18)中,这会导致编译期报错:non-constexpr function cannot be used in a constant expression。根本原因:C++20 规定
std::format会在编译期对格式化字符串进行语法校验。libc++ 在编译期假跑校验时,传入的根本不是运行期的std::format_context,而是一个专门的虚拟类型std::__format::__compile_time_basic_format_context!如果硬编码std::format_context&,会导致编译期的虚拟上下文类型无法匹配,编译器就会认为你的类型“不可被格式化”。破局之道:利用 C++20 的简写模板语法,将参数泛化为
auto& ctx。这既满足了模板签名的强制要求,又保持了泛型接口的纯粹性与零开销。以
to_string为例,标准的特化写法如下:// 优先级 2:拦截具有 to_string 的类型 template <typename T> requires (!xin::has_format_as<T>) && xin::has_to_string<T> struct std::formatter<T>: std::formatter<std::string> { // ⚠️ 注意:必须使用 auto& ctx 适配编译期与运行期上下文 auto format(const T& value, auto& ctx) const { return std::formatter<std::string>::format(value.to_string(), ctx); } };
️ 细节预警:在处理 operator<<兜底时,为了避免与标准库自带特化的类型(如std::string)发生重定义冲突,必须增加一个用户自定义类型(User-Defined Types)的拦截器:template<typename T> concept user_defined_type = std::is_class_v<std::remove_cvref_t<T>> || std::is_union_v<std::remove_cvref_t<T>>; // 排除内置类型 // 优先级 5:兜底 operator<< template<typename T> requires (!xin::has_format_as<T>) && (!xin::has_to_string<T>) && (!xin::has_to_repr<T>) && xin::user_defined_type<T> // 核心拦截器 && xin::has_ostream<T> struct std::formatter<T>: std::formatter<std::string> { auto format(const T& value, auto& ctx) const { std::ostringstream os; os << value; return std::formatter<std::string>::format(os.str(), ctx); } };
5. 性能考量 (Zero-Cost Abstraction)
由于分派机制完全建立在 C++20 Concepts 上,这层抽象在运行期是零成本(Zero-Cost)的。最终的性能仅取决于你选择的实现路径:
- 极致性能:使用
format_as转发给基础类型,与原生std::format无异。 - 中等开销:使用
to_string/to_repr,存在std::string构造时的动态内存分配。 - 最高损耗:使用
operator<<,涉及std::ostringstream的构造与格式化,建议仅作为过渡方案。
6. 遗憾与未来展望
目前的一个小缺憾在于,受限于
std::format解析上下文的复杂性,我们无法像 Python 那样通过语法糖(如{user!r})动态强制走__repr__路径。目前to_string和to_repr仍是一种严格的回退关系。期待未来标准库开放更灵活的扩展能力。
完整资源指路:- 源码实现:
src/common/format.cppm - 详细文档:
docs/common/format.md
只需在项目中
import xin.format;,即可享受这一切。 - 满天飞的样板代码:
-
如果创建一个新的社区版块?@sunrisepeak
申请人用户名: Doomjustin
版块归属: Blogs | 博客
版块名: xin
版块简介: 个人博客,随想随写