【 基于 io_uring 的 C++20 协程网络库】09 读路径优化:recv_multishot与ReceiveStream
-
在上一篇里,我们解决了"怎么把数据发出去"。这一篇转到读路径:把
recv_multishot接进来,减少高频读取时的重复提交。如果沿用
async_read_some,每次读取都要先准备一块可写 buffer,再提交一次recv,等 CQE 回来后再决定要不要继续下一次。这个模型本身没错,但放到 Echo、代理、网关这类长连接里,会很快变成重复劳动:准备 buffer、提交 SQE、等待 CQE、再补上下一次读取。recv_multishot能直接解决这件事:一次提交,对应后续多次完成。在开始之前,先确认一下当前库的状态。前几篇一路写下来,
IOContext一直是个比较扁平的类:持有一个io_uringring 句柄,提供 SQE 获取和事件循环驱动,加上 work 计数和 stop 信号:class IOContext { ::io_uring ring_; int event_fd_; std::size_t outstanding_works_{ 0 }; std::atomic<bool> should_stop_{ false }; void run(); auto sqe() -> ::io_uring_sqe*; void wakeup(); // CQE 分发 ... };socket 层的每个异步操作,把自己包装成
Operation提交进 ring,CQE 回来时由IOContext分发回去。至于 buffer,一直是由调用方临时传入,操作完成后调用方自己处理生命周期。这个结构目前是干净的,但
recv_multishot要求的东西比这多一层。
1. 直接加进去会碰到什么
recv_multishot需要 buffer ring:内核不再接受调用方临时传入的单块 buffer,而是要求预先注册一组固定大小的槽位。每次有数据到达,内核从 ring 里借一个槽位写入,CQE 里带上这次借出的槽位编号,调用方读完数据后再把槽位还回去。最直接的想法是往
IOContext里加几个字段:class IOContext { ::io_uring ring_; int event_fd_; std::size_t outstanding_works_{ 0 }; std::atomic<bool> should_stop_{ false }; // 新加的 buffer ring 管理 std::unordered_map<unsigned, BufferRing> buffer_rings_; unsigned next_bgid_{ 0 }; std::optional<unsigned> default_bgid_; };功能上能跑,但麻烦随之而来。
第一,
IOContext现在要同时处理两类完全不同的事情:一类是"这一次 submit_and_wait 怎么跑、CQE 怎么分发",另一类是"有哪些 buffer 组被注册在内核里、槽位怎么借出和归还"。这两件事没有天然的耦合关系,混在一起只会让两侧的逻辑都难以单独修改。第二,更直接的问题出在 CQE 分发上。之前"一个 CQE 对应一次操作完成"的判断,在 multishot 里不再成立——事件循环必须识别
IORING_CQE_F_MORE,在这个 flag 还存在时不能把这笔 work 从计数里扣掉。这不是细节调整,而是分发逻辑本身语义的变化。如果这块逻辑和 buffer ring 管理搅在同一段代码里,两边会互相干扰。第三,socket 层如果要用 buffer ring,就必须拿到
bgid和bid,然后到处传。调用方写业务代码时不该感知这些细节,但没有合适的封装层,这些细节就只能往上漏。所以在写 multishot awaiter 之前,要先把
IOContext的职责拆开:- 和 ring 驱动、唤醒、CQE 分发有关的逻辑,收进独立的
Scheduler。 - buffer ring 的建立、槽位借出和归还,收进独立的
BufferRingGroup。 IOContext自己只保留对外协调接口和 work tracking。- socket 层只暴露消费接口,不让业务层直接碰 buffer slot。
这不是为了"代码好看",而是为了把复杂度放回正确位置。不做这一步,后面无论接收流、buffer 池复用还是取消清理,都会越来越难理顺。
2. 重构后的
IOContext拆分后的
IOContext把两类职责分别交给两个内部类:class IOContext { private: class Scheduler { // ring 初始化、SQE 获取、submit_and_wait、wakeup }; class BufferRingGroup { // setup buffer ring、release buffer slot、default bgid }; Scheduler scheduler_; BufferRingGroup buffers_; std::size_t outstanding_works_{ 0 }; std::atomic<bool> should_stop_{ false }; };Scheduler和BufferRingGroup的职责本来就不是一回事。Scheduler关注的是"这一次事件循环怎么跑":- 初始化和销毁
io_uringring。 - 提供 SQE。
submit_and_wait后遍历 CQE,分发给对应Operation。- 通过
eventfd做跨线程 stop 唤醒。
尤其是第 3 点,在引入 multishot 之后已经不能再按"一个 CQE 对应一个完整操作"来理解了。
Scheduler::schedule()现在必须识别IORING_CQE_F_MORE,只有在 multishot 真正结束时才能把这笔 work 从计数里扣掉。这意味着recv_multishot不只是给 socket 层加了个新能力,它也顺手改写了事件循环对"完成"这件事的理解。而
BufferRingGroup关心的是另一件事:"有哪些 buffer 组被长期注册在内核里,以及它们怎么被重复借出和归还"。这部分如果混进Scheduler,后者就会一边跑 CQE 批调度,一边操心 buffer 池资源生命周期,边界很快就会糊掉。接口本身也能看出这种分层:
auto setup_buffer_ring(unsigned entries, unsigned size) -> unsigned { return buffers_.setup(scheduler_.ring(), entries, size); } void release_buffer_ring(unsigned bgid, unsigned bid) { buffers_.release(bgid, bid); }IOContext在这里做的事情很克制:buffer ring 的建立确实需要底层 ring 句柄,但建立完成之后,后续使用者不必再感知这些细节。有了这层结构,
ReceiveStream才真正有了落脚点。否则它一边要操心 multishot 请求,一边还得自己解决 buffer group 的分配和归还,那就不是 socket 接口该承担的事情。它在
StreamSocket上的入口很轻:auto receive_stream() -> ReceiveStream<Context> { auto default_bgid = this->context().default_buffer(); if (!default_bgid) throw std::runtime_error{ "No default buffer ring available for receive stream" }; return ReceiveStream<Context>{ this->context(), this->native_handle(), *default_bgid }; }ReceiveStream不是独立工作的,它默认依赖IOContext里已经准备好的 buffer ring。在当前实现里,第一次setup_buffer_ring()创建出来的组会自动成为默认组,所以 demo 里只做一次初始化就够了。
3. 重构落地:把 buffer ring 收进
IOContextbuffer ring 进入
IOContext之后,下一步才轮到它自己的实现。BufferRingGroup::setup()做了三件事:分配一整块连续内存、向内核注册一个 buf ring、把每个 slot 预先填进 ring。auto IOContext::BufferRingGroup::setup(::io_uring* ring, unsigned entries, unsigned size) -> unsigned { auto bgid = next_bgid_++; auto& buffer_ring = group_[bgid]; buffer_ring.size = size; buffer_ring.entries = entries; buffer_ring.mask = ::io_uring_buf_ring_mask(entries); const auto alloc_size = static_cast<std::size_t>(entries * size); buffer_ring.base_address = memory_resource_->allocate(alloc_size, ALIGNMENT); int res = 0; buffer_ring.buffer = ::io_uring_setup_buf_ring(ring, entries, bgid, 0, &res); auto* base = static_cast<std::byte*>(buffer_ring.base_address); for (unsigned i = 0; i < entries; ++i) ::io_uring_buf_ring_add(buffer_ring.buffer, base + i * size, size, i, buffer_ring.mask, i); ::io_uring_buf_ring_advance(buffer_ring.buffer, entries); buffer_ring.tail = entries; if (!default_buffer_bgid_) default_buffer_bgid_ = bgid; return bgid; }这部分有三个会直接影响行为的选择。
第一,所有 buffer slot 放在一整块连续内存里,而不是单独
new一堆小块。这么做不是图省事,而是因为 buf ring 的典型使用方式本来就是"固定大小、重复借用、按槽位编号回收"。连续布局让bid -> 地址的映射退化成简单的指针偏移,后面 CQE 回来时只要base + bid * size就能找回数据。第二,
bgid的管理被封装在BufferRingGroup内部。调用方只拿到一个逻辑编号,不直接接触底层注册细节;默认组也在这里顺手建立起来,方便 socket 层提供一个无参的receive_stream()。第三,归还路径同样应该放在这里,而不是散在各处调用
io_uring_buf_ring_add。因为释放 slot 本质上是 buffer ring 自己的状态变更,和具体是哪条连接、哪次读操作用过这个 slot 没关系。归还逻辑如下:
void IOContext::BufferRingGroup::release(unsigned bgid, unsigned bid) { auto& buffer_ring = group_[bgid]; auto* base = static_cast<std::byte*>(buffer_ring.base_address); const int offset = buffer_ring.tail & buffer_ring.mask; ::io_uring_buf_ring_add( buffer_ring.buffer, base + bid * buffer_ring.size, buffer_ring.size, bid, buffer_ring.mask, offset ); ::io_uring_buf_ring_advance(buffer_ring.buffer, 1); ++buffer_ring.tail; }一个 slot 被借走时,
bid会随 CQE 一起回来;一个 slot 被归还时,BufferRingGroup只需要根据同样的bid把它重新挂回 ring。这样 buffer 的借出和回收就闭上了环,之后PooledBuffer才有可能用 RAII 去包装它。
4. 地基理顺后,再把 multishot 接上
ReceiveStream真正的提交动作在arm_operation()里:void arm_operation() { if (!operation_) operation_ = new MutishotReceiveOperation{ this, context_, bgid_ }; auto* sqe = context_->sqe(); ::io_uring_prep_recv_multishot(sqe, fd_, nullptr, 0, 0); sqe->flags |= IOSQE_BUFFER_SELECT; sqe->buf_group = bgid_; ::io_uring_sqe_set_data(sqe, static_cast<Operation*>(operation_)); operation_armed_ = true; }这个提交动作有两个关键信息。
第一,
io_uring_prep_recv_multishot只提交一次,但不是只收一次。只要内核认为这个 multishot 请求还有效,后面每次有数据到达,它都可以继续产出新的 CQE。第二,
IOSQE_BUFFER_SELECT告诉内核:接收缓冲区不由这次 SQE 显式给出,而是从buf_group对应的 buffer ring 里挑一个空闲槽位来写。用户态这次不传void* buf,而是预先把一组固定大小的 buffer 注册给内核,后面由内核按需借用。CQE 回来时,
res是本次收到的字节数,flags里则可能带上两个额外信息:IORING_CQE_F_BUFFER:说明这次结果对应某个 buffer ring 里的槽位。IORING_CQE_F_MORE:说明这个 multishot 请求还活着,后面还会继续收。
这两个 flag 基本就是
ReceiveStream最关心的信息:一个告诉它"数据落在哪块 buffer 上",一个告诉它"这条流还要不要继续等下去"。
5. 最后才是用户侧 API:为什么返回
PooledBuffer如果只是为了把数据交给调用方,返回
std::span<std::byte>看起来已经够了。但对ReceiveStream来说,这远远不够,因为 span 只是一段视图,不携带任何归还语义。内核从 buffer ring 里借出的槽位,最终必须还回去,否则 ring 只会越用越少,直到耗尽。这个归还动作不能指望业务层记得手工调用,所以这里必须做成 RAII。
PooledBuffer的职责正是这个:template<typename Context> class PooledBuffer { public: PooledBuffer(Context& context, unsigned bgid, unsigned bid, std::span<std::byte> buffer) : context_{ &context }, bgid_{ bgid }, bid_{ bid }, buffer_{ buffer } {} ~PooledBuffer() { release(); } auto data() const noexcept -> std::span<std::byte> { return buffer_; } private: void release() { if (valid()) context_->release_buffer_ring(bgid_, bid_); } };调用方拿到的是一块"带归还能力的 buffer 句柄"。它可以读这段数据,也可以把这段数据直接交给后续写操作;一旦这个对象离开作用域,对应槽位就自动回到 buffer ring,可供下一次接收复用。
这也是
ReceiveStream最重要的体验差异:上层拿到的是一块已经装好、生命周期清晰、离开作用域后能自动回收的结果,而不是一段裸内存视图。
6.
next()为什么需要 ready queueReceiveStream对外只暴露一个动作:auto next() -> NextAwaiter { return NextAwaiter{ *this }; }next()能不能用得顺,关键在NextAwaiter和ready_results_的配合。class NextAwaiter { public: auto await_ready() const noexcept -> bool { return !stream_.ready_results_.empty(); } void await_suspend(std::coroutine_handle<> handle) noexcept { stream_.handle_ = handle; if (!stream_.operation_armed_) stream_.arm_operation(); } auto await_resume() -> std::expected<PooledBuffer<Context>, std::error_code> { if (stream_.ready_results_.empty()) return unexpected_system_error(std::errc::operation_canceled); auto result = std::move(stream_.ready_results_.front()); stream_.ready_results_.pop_front(); return result; } };这里的
ready_results_不是装饰品,而是必须存在的一层缓冲。recv_multishot有一个天然特征:协程还没来得及再次co_await next(),它就可能已经连续产出了多个 CQE。如果没有队列,后到的结果只能覆盖先到的结果,或者逼着handle_cqe()当场恢复协程并同步消化所有数据,这两种都不对。把这段行为画成时序会直观很多:
[Kernel multishot] [ReceiveStream] [User Coroutine] | | | |-- CQE #1 ---------------->| push ready_results_ | |-- CQE #2 ---------------->| push ready_results_ | |-- CQE #3 ---------------->| push ready_results_ | | | | | |<----------- co_await next() ---| | | pop #1 and resume | | |<----------- co_await next() ---| | | pop #2 and resume |也就是说,
ready_results_不是优化项,而是recv_multishot能以“生产者-消费者节奏解耦”方式暴露给上层 API 的必要条件。用
std::deque<result_type>把结果先存起来,问题就顺了:- CQE 到达时,先转成
PooledBuffer或 error,推进队列。 - 如果此刻真的有协程挂在
next()上,就恢复它。 - 如果协程暂时还没来取,结果就老实待在队列里,等下一次
await_ready()直接命中。
这样一来,
ReceiveStream就有了一点异步生成器的感觉:底层持续产出,上层按自己的节奏消费,中间靠一个轻量队列解耦。
7. CQE 到达后如何变成可消费结果
handle_cqe()做的事情,说到底就是把内核语义翻译成库语义。void handle_cqe(int result, std::uint32_t flags) noexcept { std::optional<PooledBuffer<Context>> pooled_buffer; if (flags & IORING_CQE_F_BUFFER) { const auto bid = static_cast<std::uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT); auto& buffer_ring = context_->buffer_ring(bgid_); auto* base = static_cast<std::byte*>(buffer_ring.base_address); std::span<std::byte> data{ base + bid * buffer_ring.size, static_cast<std::size_t>(result) }; pooled_buffer.emplace(*context_, bgid_, bid, data); } if (result == -ECANCELED) { ready_results_.emplace_back(unexpected_system_error(std::errc::operation_canceled)); } else if (result >= 0) { if (pooled_buffer) ready_results_.emplace_back(std::move(*pooled_buffer)); else ready_results_.emplace_back(PooledBuffer<Context>{}); } else { ready_results_.emplace_back(unexpected_system_error(-result)); } if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } }这一步分三层:
第一层,从
flags里提取bid,再结合bgid_找回 buffer ring,算出这次数据实际落在了哪一段内存上。第二层,把这段内存包装成
PooledBuffer。从这一刻开始,buffer 的归还责任被显式绑定到了对象生命周期上。第三层,把内核返回码整理成
std::expected:正常数据走 value,取消和错误走 error。这样上层的消费方式就统一了。还有一个细节:
result >= 0但没有IORING_CQE_F_BUFFER时,代码会塞一个默认构造的PooledBuffer。这给上层留出了处理空结果的空间,比如把空 buffer 视为连接关闭。这个约定在 demo 里也直接用到了。
8. 真正难的是收尾
ReceiveStream最容易写错的,不是提交 multishot,而是收尾。问题在于:当
ReceiveStream析构时,内核里那笔 multishot 请求不一定已经结束。你当然可以提交一个 cancel SQE,但 cancel 也是异步的,在 cancel 的 CQE 真正回来之前,原来的 multishot CQE 仍然可能再到一次。这个窗口期一旦处理不好,要么 use-after-free,要么 buffer 泄漏。解决办法是:析构时不直接删 operation,而是先
detach()。void destroy() noexcept { if (operation_) { operation_->detach(); auto* sqe = context_->sqe(false); ::io_uring_prep_cancel(sqe, static_cast<Operation*>(operation_), 0); ::io_uring_sqe_set_data(sqe, nullptr); operation_ = nullptr; } if (context_) context_ = nullptr; }detach()的效果是把operation和ReceiveStream本体断开。之后如果晚到的 CQE 还落到这笔 operation 上,complete()会走另一条路径:不再访问已经析构的 stream,而是仅仅把 buffer 槽位补回 buffer ring。void complete(int res, unsigned flags) override { const bool has_more = (flags & IORING_CQE_F_MORE) != 0; if (stream_) { if (!has_more) { stream_->operation_armed_ = false; stream_->operation_ = nullptr; } stream_->handle_cqe(res, flags); } else if (flags & IORING_CQE_F_BUFFER) { // stream 已不存在,只负责把槽位补回 buffer ring // ... } if (!has_more) delete this; }这样一来,
ReceiveStream的析构和 multishot 请求的最终死亡就不再需要严格同步,晚到的 CQE 也不会污染用户态状态,只会做最必要的资源回收。另外,move 构造和 move 赋值里也有一个对应的修正动作:如果 operation 还活着,就把
operation_->stream_改指向新的对象。这保证了ReceiveStream被移动后,后续 CQE 仍然能回到正确实例上。
9. 实际用法
在 demo 里,Echo Server 的 session 代码已经很干净了:
auto session(ip::tcp::socket<IOContext> client) -> Task<> { auto stream = client.receive_stream(); while (true) { auto read_result = co_await stream.next(); if (!read_result) { spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message()); co_return; } auto received = read_result->data(); if (received.empty()) { spdlog::info("Client {} disconnected", client.native_handle()); co_return; } auto write_result = co_await timeout(async_write(client, received), 1ms); if (!write_result) co_return; } }这段代码最直观的变化是:读路径里已经看不到"准备 buffer -> 提交 recv -> 处理 buffer 生命周期"这些样板动作了。业务协程眼里只剩一件事:下一块数据什么时候到。
这正是
ReceiveStream的价值。它不是单纯给recv_multishot套了个 awaiter,而是顺手把 buffer ring、生命周期、结果排队和析构期收尾这些麻烦一起包掉了。上层得到的是一个可以连续next()的接收流,底层保留的则是 io_uring 多次投递的吞吐优势。 - 和 ring 驱动、唤醒、CQE 分发有关的逻辑,收进独立的