跳转至内容
  • 版块
  • 最新
  • 标签
  • 热门
  • Online Tools
  • 用户
  • 群组
折叠
品牌标识

D2Learn Forums

  1. 主页
  2. Blogs | 博客
  3. xin
  4. 【 基于 io_uring 的 C++20 协程网络库】09 读路径优化:recv_multishot与ReceiveStream

【 基于 io_uring 的 C++20 协程网络库】09 读路径优化:recv_multishot与ReceiveStream

已定时 已固定 已锁定 已移动 xin
c++20协程
1 帖子 1 发布者 4 浏览
  • 从旧到新
  • 从新到旧
  • 最多赞同
登录后回复
此主题已被删除。只有拥有主题管理权限的用户可以查看。
  • DoomjustinD 在线
    DoomjustinD 在线
    Doomjustin
    编写于 最后由 编辑
    #1

    在上一篇里,我们解决了"怎么把数据发出去"。这一篇转到读路径:把 recv_multishot 接进来,减少高频读取时的重复提交。

    如果沿用 async_read_some,每次读取都要先准备一块可写 buffer,再提交一次 recv,等 CQE 回来后再决定要不要继续下一次。这个模型本身没错,但放到 Echo、代理、网关这类长连接里,会很快变成重复劳动:准备 buffer、提交 SQE、等待 CQE、再补上下一次读取。

    recv_multishot 能直接解决这件事:一次提交,对应后续多次完成。

    在开始之前,先确认一下当前库的状态。前几篇一路写下来,IOContext 一直是个比较扁平的类:持有一个 io_uring ring 句柄,提供 SQE 获取和事件循环驱动,加上 work 计数和 stop 信号:

    class IOContext {
        ::io_uring ring_;
        int event_fd_;
        std::size_t outstanding_works_{ 0 };
        std::atomic<bool> should_stop_{ false };
    
        void run();
        auto sqe() -> ::io_uring_sqe*;
        void wakeup();
        // CQE 分发 ...
    };
    

    socket 层的每个异步操作,把自己包装成 Operation 提交进 ring,CQE 回来时由 IOContext 分发回去。至于 buffer,一直是由调用方临时传入,操作完成后调用方自己处理生命周期。

    这个结构目前是干净的,但 recv_multishot 要求的东西比这多一层。


    1. 直接加进去会碰到什么

    recv_multishot 需要 buffer ring:内核不再接受调用方临时传入的单块 buffer,而是要求预先注册一组固定大小的槽位。每次有数据到达,内核从 ring 里借一个槽位写入,CQE 里带上这次借出的槽位编号,调用方读完数据后再把槽位还回去。

    最直接的想法是往 IOContext 里加几个字段:

    class IOContext {
        ::io_uring ring_;
        int event_fd_;
        std::size_t outstanding_works_{ 0 };
        std::atomic<bool> should_stop_{ false };
    
        // 新加的 buffer ring 管理
        std::unordered_map<unsigned, BufferRing> buffer_rings_;
        unsigned next_bgid_{ 0 };
        std::optional<unsigned> default_bgid_;
    };
    

    功能上能跑,但麻烦随之而来。

    第一,IOContext 现在要同时处理两类完全不同的事情:一类是"这一次 submit_and_wait 怎么跑、CQE 怎么分发",另一类是"有哪些 buffer 组被注册在内核里、槽位怎么借出和归还"。这两件事没有天然的耦合关系,混在一起只会让两侧的逻辑都难以单独修改。

    第二,更直接的问题出在 CQE 分发上。之前"一个 CQE 对应一次操作完成"的判断,在 multishot 里不再成立——事件循环必须识别 IORING_CQE_F_MORE,在这个 flag 还存在时不能把这笔 work 从计数里扣掉。这不是细节调整,而是分发逻辑本身语义的变化。如果这块逻辑和 buffer ring 管理搅在同一段代码里,两边会互相干扰。

    第三,socket 层如果要用 buffer ring,就必须拿到 bgid 和 bid,然后到处传。调用方写业务代码时不该感知这些细节,但没有合适的封装层,这些细节就只能往上漏。

    所以在写 multishot awaiter 之前,要先把 IOContext 的职责拆开:

    1. 和 ring 驱动、唤醒、CQE 分发有关的逻辑,收进独立的 Scheduler。
    2. buffer ring 的建立、槽位借出和归还,收进独立的 BufferRingGroup。
    3. IOContext 自己只保留对外协调接口和 work tracking。
    4. socket 层只暴露消费接口,不让业务层直接碰 buffer slot。

    这不是为了"代码好看",而是为了把复杂度放回正确位置。不做这一步,后面无论接收流、buffer 池复用还是取消清理,都会越来越难理顺。


    2. 重构后的 IOContext

    拆分后的 IOContext 把两类职责分别交给两个内部类:

    class IOContext {
    private:
        class Scheduler {
            // ring 初始化、SQE 获取、submit_and_wait、wakeup
        };
    
        class BufferRingGroup {
            // setup buffer ring、release buffer slot、default bgid
        };
    
        Scheduler scheduler_;
        BufferRingGroup buffers_;
        std::size_t outstanding_works_{ 0 };
        std::atomic<bool> should_stop_{ false };
    };
    

    Scheduler 和 BufferRingGroup 的职责本来就不是一回事。

    Scheduler 关注的是"这一次事件循环怎么跑":

    1. 初始化和销毁 io_uring ring。
    2. 提供 SQE。
    3. submit_and_wait 后遍历 CQE,分发给对应 Operation。
    4. 通过 eventfd 做跨线程 stop 唤醒。

    尤其是第 3 点,在引入 multishot 之后已经不能再按"一个 CQE 对应一个完整操作"来理解了。Scheduler::schedule() 现在必须识别 IORING_CQE_F_MORE,只有在 multishot 真正结束时才能把这笔 work 从计数里扣掉。这意味着 recv_multishot 不只是给 socket 层加了个新能力,它也顺手改写了事件循环对"完成"这件事的理解。

    而 BufferRingGroup 关心的是另一件事:"有哪些 buffer 组被长期注册在内核里,以及它们怎么被重复借出和归还"。这部分如果混进 Scheduler,后者就会一边跑 CQE 批调度,一边操心 buffer 池资源生命周期,边界很快就会糊掉。

    接口本身也能看出这种分层:

    auto setup_buffer_ring(unsigned entries, unsigned size) -> unsigned
    {
        return buffers_.setup(scheduler_.ring(), entries, size);
    }
    
    void release_buffer_ring(unsigned bgid, unsigned bid)
    {
        buffers_.release(bgid, bid);
    }
    

    IOContext 在这里做的事情很克制:buffer ring 的建立确实需要底层 ring 句柄,但建立完成之后,后续使用者不必再感知这些细节。

    有了这层结构,ReceiveStream 才真正有了落脚点。否则它一边要操心 multishot 请求,一边还得自己解决 buffer group 的分配和归还,那就不是 socket 接口该承担的事情。

    它在 StreamSocket 上的入口很轻:

    auto receive_stream() -> ReceiveStream<Context>
    {
        auto default_bgid = this->context().default_buffer();
        if (!default_bgid)
            throw std::runtime_error{ "No default buffer ring available for receive stream" };
    
        return ReceiveStream<Context>{ this->context(), this->native_handle(), *default_bgid };
    }
    

    ReceiveStream 不是独立工作的,它默认依赖 IOContext 里已经准备好的 buffer ring。在当前实现里,第一次 setup_buffer_ring() 创建出来的组会自动成为默认组,所以 demo 里只做一次初始化就够了。


    3. 重构落地:把 buffer ring 收进 IOContext

    buffer ring 进入 IOContext 之后,下一步才轮到它自己的实现。

    BufferRingGroup::setup() 做了三件事:分配一整块连续内存、向内核注册一个 buf ring、把每个 slot 预先填进 ring。

    auto IOContext::BufferRingGroup::setup(::io_uring* ring, unsigned entries, unsigned size) -> unsigned
    {
        auto bgid = next_bgid_++;
        auto& buffer_ring = group_[bgid];
    
        buffer_ring.size = size;
        buffer_ring.entries = entries;
        buffer_ring.mask = ::io_uring_buf_ring_mask(entries);
    
        const auto alloc_size = static_cast<std::size_t>(entries * size);
        buffer_ring.base_address = memory_resource_->allocate(alloc_size, ALIGNMENT);
    
        int res = 0;
        buffer_ring.buffer = ::io_uring_setup_buf_ring(ring, entries, bgid, 0, &res);
    
        auto* base = static_cast<std::byte*>(buffer_ring.base_address);
        for (unsigned i = 0; i < entries; ++i)
            ::io_uring_buf_ring_add(buffer_ring.buffer, base + i * size, size, i, buffer_ring.mask, i);
    
        ::io_uring_buf_ring_advance(buffer_ring.buffer, entries);
        buffer_ring.tail = entries;
    
        if (!default_buffer_bgid_)
            default_buffer_bgid_ = bgid;
    
        return bgid;
    }
    

    这部分有三个会直接影响行为的选择。

    第一,所有 buffer slot 放在一整块连续内存里,而不是单独 new 一堆小块。这么做不是图省事,而是因为 buf ring 的典型使用方式本来就是"固定大小、重复借用、按槽位编号回收"。连续布局让 bid -> 地址 的映射退化成简单的指针偏移,后面 CQE 回来时只要 base + bid * size 就能找回数据。

    第二,bgid 的管理被封装在 BufferRingGroup 内部。调用方只拿到一个逻辑编号,不直接接触底层注册细节;默认组也在这里顺手建立起来,方便 socket 层提供一个无参的 receive_stream()。

    第三,归还路径同样应该放在这里,而不是散在各处调用 io_uring_buf_ring_add。因为释放 slot 本质上是 buffer ring 自己的状态变更,和具体是哪条连接、哪次读操作用过这个 slot 没关系。

    归还逻辑如下:

    void IOContext::BufferRingGroup::release(unsigned bgid, unsigned bid)
    {
        auto& buffer_ring = group_[bgid];
        auto* base = static_cast<std::byte*>(buffer_ring.base_address);
        const int offset = buffer_ring.tail & buffer_ring.mask;
    
        ::io_uring_buf_ring_add(
            buffer_ring.buffer,
            base + bid * buffer_ring.size,
            buffer_ring.size,
            bid,
            buffer_ring.mask,
            offset
        );
    
        ::io_uring_buf_ring_advance(buffer_ring.buffer, 1);
        ++buffer_ring.tail;
    }
    

    一个 slot 被借走时,bid 会随 CQE 一起回来;一个 slot 被归还时,BufferRingGroup 只需要根据同样的 bid 把它重新挂回 ring。这样 buffer 的借出和回收就闭上了环,之后 PooledBuffer 才有可能用 RAII 去包装它。


    4. 地基理顺后,再把 multishot 接上

    ReceiveStream 真正的提交动作在 arm_operation() 里:

    void arm_operation()
    {
        if (!operation_)
            operation_ = new MutishotReceiveOperation{ this, context_, bgid_ };
    
        auto* sqe = context_->sqe();
        ::io_uring_prep_recv_multishot(sqe, fd_, nullptr, 0, 0);
        sqe->flags |= IOSQE_BUFFER_SELECT;
        sqe->buf_group = bgid_;
        ::io_uring_sqe_set_data(sqe, static_cast<Operation*>(operation_));
        operation_armed_ = true;
    }
    

    这个提交动作有两个关键信息。

    第一,io_uring_prep_recv_multishot 只提交一次,但不是只收一次。只要内核认为这个 multishot 请求还有效,后面每次有数据到达,它都可以继续产出新的 CQE。

    第二,IOSQE_BUFFER_SELECT 告诉内核:接收缓冲区不由这次 SQE 显式给出,而是从 buf_group 对应的 buffer ring 里挑一个空闲槽位来写。用户态这次不传 void* buf,而是预先把一组固定大小的 buffer 注册给内核,后面由内核按需借用。

    CQE 回来时,res 是本次收到的字节数,flags 里则可能带上两个额外信息:

    1. IORING_CQE_F_BUFFER:说明这次结果对应某个 buffer ring 里的槽位。
    2. IORING_CQE_F_MORE:说明这个 multishot 请求还活着,后面还会继续收。

    这两个 flag 基本就是 ReceiveStream 最关心的信息:一个告诉它"数据落在哪块 buffer 上",一个告诉它"这条流还要不要继续等下去"。


    5. 最后才是用户侧 API:为什么返回 PooledBuffer

    如果只是为了把数据交给调用方,返回 std::span<std::byte> 看起来已经够了。但对 ReceiveStream 来说,这远远不够,因为 span 只是一段视图,不携带任何归还语义。

    内核从 buffer ring 里借出的槽位,最终必须还回去,否则 ring 只会越用越少,直到耗尽。这个归还动作不能指望业务层记得手工调用,所以这里必须做成 RAII。

    PooledBuffer 的职责正是这个:

    template<typename Context>
    class PooledBuffer {
    public:
        PooledBuffer(Context& context, unsigned bgid, unsigned bid, std::span<std::byte> buffer)
          : context_{ &context }, bgid_{ bgid }, bid_{ bid }, buffer_{ buffer }
        {}
    
        ~PooledBuffer()
        {
            release();
        }
    
        auto data() const noexcept -> std::span<std::byte>
        {
            return buffer_;
        }
    
    private:
        void release()
        {
            if (valid())
                context_->release_buffer_ring(bgid_, bid_);
        }
    };
    

    调用方拿到的是一块"带归还能力的 buffer 句柄"。它可以读这段数据,也可以把这段数据直接交给后续写操作;一旦这个对象离开作用域,对应槽位就自动回到 buffer ring,可供下一次接收复用。

    这也是 ReceiveStream 最重要的体验差异:上层拿到的是一块已经装好、生命周期清晰、离开作用域后能自动回收的结果,而不是一段裸内存视图。


    6. next() 为什么需要 ready queue

    ReceiveStream 对外只暴露一个动作:

    auto next() -> NextAwaiter
    {
        return NextAwaiter{ *this };
    }
    

    next() 能不能用得顺,关键在 NextAwaiter 和 ready_results_ 的配合。

    class NextAwaiter {
    public:
        auto await_ready() const noexcept -> bool
        {
            return !stream_.ready_results_.empty();
        }
    
        void await_suspend(std::coroutine_handle<> handle) noexcept
        {
            stream_.handle_ = handle;
    
            if (!stream_.operation_armed_)
                stream_.arm_operation();
        }
    
        auto await_resume() -> std::expected<PooledBuffer<Context>, std::error_code>
        {
            if (stream_.ready_results_.empty())
                return unexpected_system_error(std::errc::operation_canceled);
    
            auto result = std::move(stream_.ready_results_.front());
            stream_.ready_results_.pop_front();
            return result;
        }
    };
    

    这里的 ready_results_ 不是装饰品,而是必须存在的一层缓冲。

    recv_multishot 有一个天然特征:协程还没来得及再次 co_await next(),它就可能已经连续产出了多个 CQE。如果没有队列,后到的结果只能覆盖先到的结果,或者逼着 handle_cqe() 当场恢复协程并同步消化所有数据,这两种都不对。

    把这段行为画成时序会直观很多:

    [Kernel multishot]          [ReceiveStream]                 [User Coroutine]
        |                           |                                |
        |-- CQE #1 ---------------->| push ready_results_            |
        |-- CQE #2 ---------------->| push ready_results_            |
        |-- CQE #3 ---------------->| push ready_results_            |
        |                           |                                |
        |                           |<----------- co_await next() ---|
        |                           | pop #1 and resume              |
        |                           |<----------- co_await next() ---|
        |                           | pop #2 and resume              |
    

    也就是说,ready_results_ 不是优化项,而是 recv_multishot 能以“生产者-消费者节奏解耦”方式暴露给上层 API 的必要条件。

    用 std::deque<result_type> 把结果先存起来,问题就顺了:

    1. CQE 到达时,先转成 PooledBuffer 或 error,推进队列。
    2. 如果此刻真的有协程挂在 next() 上,就恢复它。
    3. 如果协程暂时还没来取,结果就老实待在队列里,等下一次 await_ready() 直接命中。

    这样一来,ReceiveStream 就有了一点异步生成器的感觉:底层持续产出,上层按自己的节奏消费,中间靠一个轻量队列解耦。


    7. CQE 到达后如何变成可消费结果

    handle_cqe() 做的事情,说到底就是把内核语义翻译成库语义。

    void handle_cqe(int result, std::uint32_t flags) noexcept
    {
        std::optional<PooledBuffer<Context>> pooled_buffer;
    
        if (flags & IORING_CQE_F_BUFFER) {
            const auto bid = static_cast<std::uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT);
            auto& buffer_ring = context_->buffer_ring(bgid_);
    
            auto* base = static_cast<std::byte*>(buffer_ring.base_address);
            std::span<std::byte> data{ base + bid * buffer_ring.size, static_cast<std::size_t>(result) };
            pooled_buffer.emplace(*context_, bgid_, bid, data);
        }
    
        if (result == -ECANCELED) {
            ready_results_.emplace_back(unexpected_system_error(std::errc::operation_canceled));
        }
        else if (result >= 0) {
            if (pooled_buffer)
                ready_results_.emplace_back(std::move(*pooled_buffer));
            else
                ready_results_.emplace_back(PooledBuffer<Context>{});
        }
        else {
            ready_results_.emplace_back(unexpected_system_error(-result));
        }
    
        if (handle_) {
            auto handle = std::exchange(handle_, nullptr);
            handle.resume();
        }
    }
    

    这一步分三层:

    第一层,从 flags 里提取 bid,再结合 bgid_ 找回 buffer ring,算出这次数据实际落在了哪一段内存上。

    第二层,把这段内存包装成 PooledBuffer。从这一刻开始,buffer 的归还责任被显式绑定到了对象生命周期上。

    第三层,把内核返回码整理成 std::expected:正常数据走 value,取消和错误走 error。这样上层的消费方式就统一了。

    还有一个细节:result >= 0 但没有 IORING_CQE_F_BUFFER 时,代码会塞一个默认构造的 PooledBuffer。这给上层留出了处理空结果的空间,比如把空 buffer 视为连接关闭。这个约定在 demo 里也直接用到了。


    8. 真正难的是收尾

    ReceiveStream 最容易写错的,不是提交 multishot,而是收尾。

    问题在于:当 ReceiveStream 析构时,内核里那笔 multishot 请求不一定已经结束。你当然可以提交一个 cancel SQE,但 cancel 也是异步的,在 cancel 的 CQE 真正回来之前,原来的 multishot CQE 仍然可能再到一次。这个窗口期一旦处理不好,要么 use-after-free,要么 buffer 泄漏。

    解决办法是:析构时不直接删 operation,而是先 detach()。

    void destroy() noexcept
    {
        if (operation_) {
            operation_->detach();
    
            auto* sqe = context_->sqe(false);
            ::io_uring_prep_cancel(sqe, static_cast<Operation*>(operation_), 0);
            ::io_uring_sqe_set_data(sqe, nullptr);
    
            operation_ = nullptr;
        }
    
        if (context_)
            context_ = nullptr;
    }
    

    detach() 的效果是把 operation 和 ReceiveStream 本体断开。之后如果晚到的 CQE 还落到这笔 operation 上,complete() 会走另一条路径:不再访问已经析构的 stream,而是仅仅把 buffer 槽位补回 buffer ring。

    void complete(int res, unsigned flags) override
    {
        const bool has_more = (flags & IORING_CQE_F_MORE) != 0;
    
        if (stream_) {
            if (!has_more) {
                stream_->operation_armed_ = false;
                stream_->operation_ = nullptr;
            }
    
            stream_->handle_cqe(res, flags);
        }
        else if (flags & IORING_CQE_F_BUFFER) {
            // stream 已不存在,只负责把槽位补回 buffer ring
            // ...
        }
    
        if (!has_more)
            delete this;
    }
    

    这样一来,ReceiveStream 的析构和 multishot 请求的最终死亡就不再需要严格同步,晚到的 CQE 也不会污染用户态状态,只会做最必要的资源回收。

    另外,move 构造和 move 赋值里也有一个对应的修正动作:如果 operation 还活着,就把 operation_->stream_ 改指向新的对象。这保证了 ReceiveStream 被移动后,后续 CQE 仍然能回到正确实例上。


    9. 实际用法

    在 demo 里,Echo Server 的 session 代码已经很干净了:

    auto session(ip::tcp::socket<IOContext> client) -> Task<>
    {
        auto stream = client.receive_stream();
    
        while (true) {
            auto read_result = co_await stream.next();
            if (!read_result) {
                spdlog::warn("Failed to read from client {}: {}",
                             client.native_handle(),
                             read_result.error().message());
                co_return;
            }
    
            auto received = read_result->data();
            if (received.empty()) {
                spdlog::info("Client {} disconnected", client.native_handle());
                co_return;
            }
    
            auto write_result = co_await timeout(async_write(client, received), 1ms);
            if (!write_result)
                co_return;
        }
    }
    

    这段代码最直观的变化是:读路径里已经看不到"准备 buffer -> 提交 recv -> 处理 buffer 生命周期"这些样板动作了。业务协程眼里只剩一件事:下一块数据什么时候到。

    这正是 ReceiveStream 的价值。它不是单纯给 recv_multishot 套了个 awaiter,而是顺手把 buffer ring、生命周期、结果排队和析构期收尾这些麻烦一起包掉了。上层得到的是一个可以连续 next() 的接收流,底层保留的则是 io_uring 多次投递的吞吐优势。

    1 条回复 最后回复
    1

    • 登录

    • 没有帐号? 注册

    • 登录或注册以进行搜索。
    d2learn forums Powered by NodeBB
    • 第一个帖子
      最后一个帖子
    0
    • 版块
    • 最新
    • 标签
    • 热门
    • Online Tools
    • 用户
    • 群组