跳转至内容
  • 版块
  • 最新
  • 标签
  • 热门
  • Online Tools
  • 用户
  • 群组
折叠
品牌标识

D2Learn Forums

DoomjustinD

Doomjustin

@Doomjustin
关于
帖子
17
主题
15
群组
0
粉丝
0
关注
0

帖子

最新 最佳 有争议的

  • std::format增强组件
    DoomjustinD Doomjustin

    优雅与效率并存:基于 C++20 Concepts 构建非侵入式 std::format 扩展

    1. 背景与痛点:std::format 很好,但还能更好

    在 C++20 引入 std::format 之前,我一直使用 fmt::format 作为格式化输出的首选。两者的 API 几乎一致,但在将其作为标准库迁移并在大型工程落地时,几个显著的痛点让人如鲠在喉:

    • 满天飞的样板代码:fmt 提供了极为便利的 format_as 机制,而现阶段的 std::format 官方仅支持通过特化 std::formatter<T> 来实现自定义输出。这意味着每次接入一个自定义类型,都必须硬着头皮手写一遍冗长且高度重复的模板样板代码,心智负担极重。
    • 手动调用的累赘感:为了逃避上述的样板代码,很多人会退而求其次,在类内提供一个 to_string() 方法。但代价是每次打印都必须显式调用(如 std::println("{}", obj.to_string())),不仅破坏了格式化字符串原有的简洁语义,写起来也极其繁琐累赘。
    • 类型输出碎片化:如果没有统一约束,工程里的输出方式就会群魔乱舞:有的依赖遗留的 operator<<,有的每次现场手写 std::format("...") 拼接内部字段,代码风格极其割裂。
    • 日志可读性劣化:尤其是枚举类型(enum),默认直接输出底层整数值。在排查问题时面对满屏的“魔术数字”,必须反复去头文件反查定义,十分痛苦。
    • 新类型接入成本高:由于缺乏统一、低成本的扩展范式,每次新增类型都要纠结“这次该怎么格式化”,稍有不慎还会与旧代码的重载产生隐式冲突。

    2. 核心设计目标

    为了彻底解决上述问题,我构建了一个轻量级的扩展组件,旨在实现以下目标:

    1. 复刻体验:提供类似 fmt::format_as 极低成本的自定义接入点,告别特化样板代码。
    2. 鸭子类型:引入 Pythonic 的约定,支持自动探测并调用类内的 to_string() 或 to_repr()。
    3. 原生枚举:借助 magic_enum,实现枚举值的直接名称输出(告别魔术数字)。
    4. 无痛兼容:对已实现 operator<< 的遗留类型提供平滑过渡。
    5. 非侵入式:不修改标准库,不污染业务代码,只需 import 即可生效。

    3. 优先级路由与核心用法速览

    为了避免不同格式化方式之间的冲突,本组件在编译期规定了严格的优先级路由。以下是 5 种分类的详细用法:

    优先级 接口约定 适用场景与说明
    1 format_as(v) 最佳实践。继承底层类型的格式规范。
    2 v.to_string() 常规业务输出。返回 std::string。
    3 v.to_repr() 调试/诊断输出。返回结构化语义表示。
    4 enum / enum class 自动转换为只读的枚举名字符串。
    5 operator<<(ostream) 兜底方案。捕获传统流输出。

    3.1 方式一:基于 format_as 的无缝转发(⭐️ 推荐)

    特性:返回整数或其他基础类型时,格式规范(如 {:#x}/{:08d} 等)将被完整透传。

    struct Flags { int bits; };
    
    // 自由函数,通过 ADL 查找
    auto format_as(const Flags& f) { return f.bits; }
    
    std::println("{:#010x}", Flags{255});  // 输出: 0x000000ff
    

    3.2 方式二:基于 to_string 的常规输出

    特性:不再需要手动加 .to_string(),组件会自动探测并调用。适用于需要将对象状态转化为人类可读字符串的常规业务场景。

    struct Version {
        int major, minor;
        auto to_string() const -> std::string {
            return std::format("{}.{}", major, minor);
        }
    };
    
    std::println("{}", Version{1, 2});  // 输出: 1.2
    

    3.3 方式三:基于 to_repr 的诊断输出

    特性:语义上专用于 Debug 打印,输出包含类型元数据的结构化信息。

    struct Node {
        int id;
        auto to_repr() const -> std::string {
            return std::format("Node(id={})", id);
        }
    };
    
    std::println("{}", Node{42});  // 输出: Node(id=42)
    

    3.4 方式四:枚举类型的自动反射

    特性:彻底告别输出枚举整数值的痛苦,自动打印枚举项名称。

    enum class ScopedState { idle, running };
    
    std::println("{}", ScopedState::running);  // 输出: running
    

    3.5 方式五:兼容遗留 operator<<

    特性:作为最后的兜底方案,让老旧代码无需任何改动即可接入 std::format 体系。

    struct Legacy {
        int value;
        friend auto operator<<(std::ostream& os, const Legacy& v) -> std::ostream& {
            return os << "legacy:" << v.value;
        }
    };
    
    std::println("{}", Legacy{7});  // 输出: legacy:7
    

    4. 揭秘底层机制:编译期分派与“上下文陷阱”

    本组件的核心魔法在于编译期 SFINAE 的现代化平替——C++20 Concepts。

    首先,定义一组 Concept 来嗅探类型的能力:

    import std;
    
    template<typename T>
    concept has_format_as = requires(const T& t) { format_as(t); };
    
    template<typename T>
    concept has_to_string = requires(const T& t) { t.to_string(); };
    
    template<typename T>
    concept has_to_repr = requires(const T& t) { t.to_repr(); };
    
    template<typename T>
    concept has_ostream = requires (const T& t, std::ostream& os) { os << t; };
    

    接着,利用约束对 std::formatter<T> 进行特化,实现按优先级的路由。

    💡 深水区踩坑预警:为什么 format 的参数必须是 auto& ctx?

    在早期的编译器中,我们习惯将参数写成 std::format_context& ctx。但在现代严苛的标准库实现(如 LLVM/libc++ 18)中,这会导致编译期报错:non-constexpr function cannot be used in a constant expression。

    根本原因:C++20 规定 std::format 会在编译期对格式化字符串进行语法校验。libc++ 在编译期假跑校验时,传入的根本不是运行期的 std::format_context,而是一个专门的虚拟类型 std::__format::__compile_time_basic_format_context!如果硬编码 std::format_context&,会导致编译期的虚拟上下文类型无法匹配,编译器就会认为你的类型“不可被格式化”。

    破局之道:利用 C++20 的简写模板语法,将参数泛化为 auto& ctx。这既满足了模板签名的强制要求,又保持了泛型接口的纯粹性与零开销。

    以 to_string 为例,标准的特化写法如下:

    // 优先级 2:拦截具有 to_string 的类型
    template <typename T>
        requires (!xin::has_format_as<T>)
              && xin::has_to_string<T>
    struct std::formatter<T>: std::formatter<std::string> {
        // ⚠️ 注意:必须使用 auto& ctx 适配编译期与运行期上下文
        auto format(const T& value, auto& ctx) const {
            return std::formatter<std::string>::format(value.to_string(), ctx);
        }
    };
    

    ⚠️ 细节预警:在处理 operator<< 兜底时,为了避免与标准库自带特化的类型(如 std::string)发生重定义冲突,必须增加一个用户自定义类型(User-Defined Types)的拦截器:

    template<typename T>
    concept user_defined_type = std::is_class_v<std::remove_cvref_t<T>>
                             || std::is_union_v<std::remove_cvref_t<T>>; // 排除内置类型
    
    // 优先级 5:兜底 operator<<
    template<typename T>
        requires (!xin::has_format_as<T>) 
              && (!xin::has_to_string<T>) 
              && (!xin::has_to_repr<T>) 
              && xin::user_defined_type<T> // 核心拦截器
              && xin::has_ostream<T>
    struct std::formatter<T>: std::formatter<std::string> {
        auto format(const T& value, auto& ctx) const {        
            std::ostringstream os;
            os << value;
            return std::formatter<std::string>::format(os.str(), ctx);
        }
    };
    

    5. 性能考量 (Zero-Cost Abstraction)

    由于分派机制完全建立在 C++20 Concepts 上,这层抽象在运行期是零成本(Zero-Cost)的。最终的性能仅取决于你选择的实现路径:

    1. 极致性能:使用 format_as 转发给基础类型,与原生 std::format 无异。
    2. 中等开销:使用 to_string / to_repr,存在 std::string 构造时的动态内存分配。
    3. 最高损耗:使用 operator<<,涉及 std::ostringstream 的构造与格式化,建议仅作为过渡方案。

    6. 遗憾与未来展望

    目前的一个小缺憾在于,受限于 std::format 解析上下文的复杂性,我们无法像 Python 那样通过语法糖(如 {user!r})动态强制走 __repr__ 路径。目前 to_string 和 to_repr 仍是一种严格的回退关系。期待未来标准库开放更灵活的扩展能力。

    🔗 完整资源指路:

    • 源码实现:src/common/format.cppm
    • 详细文档:docs/common/format.md

    只需在项目中 import xin.format;,即可享受这一切。


  • 从零构建基于 C++20 的 Task
    DoomjustinD Doomjustin

    C++20 引入了无栈协程(Stackless Coroutines)的核心语言机制,但与之相配套的标准库高级抽象(如 std::task)并未同步提供。在构建基于 io_uring 或 epoll 的高性能并发框架时,我们不可避免地需要自行设计一个用于封装异步操作的返回类型:Task<T>。

    设计这样一个任务类型,不仅仅是对新关键字的语法包装,其本质是在解决两个系统级编程的核心问题:

    1. 控制流的无缝路由
    2. 堆分配状态帧的确定性释放。

    本文将探讨如何从零构建一个可用的Task<T>。

    1. 异步组合的困境与懒启动(Lazy Evaluation)

    在传统的同步流中,函数的调用即意味着执行的开始。但在异步架构中,任务的“构造”与“执行”往往需要被严格分离。

    为了建立直观的理解,我们可以先参考 Python 中的协程行为。在 Python 中,调用一个 async def 函数并不会立即执行其内部代码,而是仅仅返回一个协程对象:

    import asyncio
    
    async def fetch_data():
        print("开始发起网络请求...")
        # ...
    
    # 此时并不会打印任何内容,仅仅是构造了一个任务对象
    task = fetch_data() 
    
    # 只有显式地等待或交给事件循环,代码才会真正运转
    # await task 
    

    这种机制被称为懒启动(Lazy Evaluation)。如果我们允许 C++ 的协程在被调用时立即开始执行(即所谓的 Eager Evaluation),它可能会在尚未正确挂载到事件循环(Event Loop)之前,就过早地触发了底层的 I/O 投递操作。这不仅破坏了状态的封装,还极易引发复杂的竞态条件。

    因此,一个健壮的 C++ Task<T> 必须是懒启动的。这在 C++20 中是通过定制 promise_type 的初始化行为来实现的:

    class promise_type {
    public:
        // 协程帧创建后立即挂起,不主动执行协程体代码
        auto initial_suspend() noexcept -> std::suspend_always { return {}; }
        // ...
    };
    

    通过返回 std::suspend_always,协程在完成内部状态帧的堆分配后会立刻交出控制权。这种设计使得异步任务可以像普通的数据结构一样被安全地传递、存储和组合,直到调用者显式地通过 co_await 来驱动它。

    2. 协程间的控制流移交

    异步操作很少是孤立存在的。当父协程执行 co_await child_task; 时,当前的执行流必须被挂起,并将 CPU 的控制权移交给子协程。同时,子协程必须知晓在自身执行完毕后,应该唤醒哪一个调用者。

    为了建立这种调用链,我们利用了 co_await 运算符所触发的编译器协议。

    在 C++20 中,co_await 并非一个简单的挂起指令,而是一个可定制的控制流拦截点。当编译器遇到 co_await <expr> 时,它会要求 <expr> 产出一个符合特定接口的 Awaiter 对象,并依次调用其三个核心方法:

    1. await_ready():探测状态。询问异步操作是否已经完成。如果返回 true,编译器将走“快速通道”,直接跳过挂起阶段;如果返回 false,则准备挂起当前协程。
    2. await_suspend(std::coroutine_handle<>):核心拦截点。在当前协程的物理状态(寄存器、局部变量)被安全保存到堆上的协程帧后,编译器会调用此方法,并将当前(父)协程的句柄作为参数传入。
    3. await_resume():结果提取点。当协程被再次唤醒时,此方法的返回值将作为整个 co_await 表达式的结果。

    基于这一协议,我们在 Task 内部定义了专门的 Awaiter,以此来接管并路由控制流:

    class Awaiter {
    public:
        explicit Awaiter(handle_type handle) : handle_{ handle } {}
    
        // 1. 探测状态:如果子协程尚未执行完毕,则强制父协程挂起
        bool await_ready() const noexcept 
        { 
            return !handle_ || handle_.done(); 
        }
    
        // 2. 挂起时的控制流路由
        auto await_suspend(std::coroutine_handle<> next) -> std::coroutine_handle<> 
        {
            // 将父协程的句柄 (next) 记录在子协程的 promise 状态中
            handle_.promise().next = next;
            // 返回子协程的句柄,指示 C++ 运行时将执行流切换至子协程
            return handle_;
        }
    
        // 3. 唤醒后的结果提取
        auto await_resume() const -> T 
        {
            if (!handle_) throw std::logic_error{ "Invalid handle" };
            return handle_.promise().result();
        }
    
    private:
        handle_type handle_; // 子协程的句柄
    };
    

    通过这一套状态机转换,C++ 将底层的调度权完全下放给了库作者。

    在 await_suspend 执行的瞬间,父协程已被安全冻结。

    我们将其句柄保存在子协程的 promise_type::next 字段里,从而在内存中建立了一个单向的调用链表(父 -> 子)。

    紧接着返回子协程的 handle_,运行时会直接跳转执行子协程代码,实现了零开销的上下文切换。

    3. 栈溢出风险与对称传输(Symmetric Transfer)

    子协程执行到末尾(或遇到 co_return)时,需要唤醒之前等待它的父协程。这往往是自定义协程实现中最容易出错的环节。

    直觉上的做法是,在子协程的收尾阶段直接调用 next.resume()。然而,这种非对称传输(Asymmetric Transfer)存在致命缺陷:

    resume() 本质上是一个常规的同步函数调用。

    在网络服务这类存在深层嵌套或无限循环挂起的场景中(例如 while(true) { co_await read(); }),每一次 resume() 都会在操作系统的线程栈上压入一个新的栈帧。调用链越长,栈越深,最终必然导致 Stack Overflow(栈溢出)。

    为了提供工业级的稳定性,Task 在收尾时必须采用对称传输(Symmetric Transfer):

    class FinalAwaiter {
    public:
        bool await_ready() const noexcept { return false; }
    
        template<typename Promise>
        auto await_suspend(std::coroutine_handle<Promise> handle) const noexcept -> std::coroutine_handle<> 
        {
            auto next = handle.promise().next;
            // 关键点:直接返回父协程的句柄,而非调用 next.resume()
            return next ? next : std::noop_coroutine();
        }
    
        void await_resume() const noexcept {}
    };
    
    // 在 promise_type 中指定收尾行为:
    auto final_suspend() noexcept -> FinalAwaiter { return {}; }
    

    通过让 final_suspend 返回一个包含父协程句柄的 Awaiter,编译器会采用类似尾调用优化(Tail Call)的机制:

    它会首先将当前子协程的物理栈帧安全剥离,然后再以平级跳转的方式进入父协程。

    在这种机制的保障下,无论业务逻辑中 co_await 嵌套了多少层,底层的线程调用栈深度始终保持恒定 (O(1))。

    4. 返回值的提取与异常路由

    异步任务不仅涉及控制流的跳转,还必须安全地跨越挂起边界传递数据或异常,并且表现得如同普通的 C++ 函数调用一样。

    在子协程内部,产生的值或未捕获的异常被分别存储在 promise_type 的 std::optional<T> 和 std::exception_ptr 中。当父协程通过对称传输被唤醒,并执行 await_resume() 时,需要提取这些结果:

    auto result() -> T 
    {
        if (exception_) 
            std::rethrow_exception(exception_);
        return std::move(value_).value();
    }
    

    这里包含两个重要的设计约束:

    1. 异常透明性:std::rethrow_exception 确保了子协程中发生的异常能够被无缝抛出,并被父协程的 try-catch 块捕获,维持了 C++ 异常处理语义的连贯性。
    2. 资源所有权转移:通过 std::move 提取值,保证了诸如 std::unique_ptr 或封装了系统资源(如文件描述符)的不可拷贝对象(Move-Only Types)能够被正确返回。

    5. 协程帧的生命周期管理与单次消费语义

    无栈协程的局部变量和 promise_type 被编译器分配在堆上的协程帧(Coroutine Frame)中。由于 C++ 没有垃圾回收机制,资源泄漏是协程编程中的主要风险之一。

    依据 C++ 核心的 RAII(资源获取即初始化)原则,Task 对象作为协程句柄的唯一持有者,理应负责这块内存的清理:

    template<typename T>
    class Task {
    public:
        ~Task() 
        { 
            if (handle_) handle_.destroy(); 
        }
    
        // 限制为右值调用,且不转移 handle_ 的所有权
        auto operator co_await() && noexcept { return Awaiter{ handle_ }; }
    };
    

    这里有两个深思熟虑的设计权衡:

    第一:为什么限制 operator co_await 为右值版本(&&)?
    协程代表一个异步计算过程,其内部结果(特别是前文提到的 Move-Only 类型)在 await_resume 中是被破坏性提取的(std::move)。这意味着一个 Task 在逻辑上只能被消费一次。如果允许对左值的 Task 进行 co_await,调用者可能会意外地多次等待同一个任务:

    Task<int> t = do_work();
    auto res1 = co_await t;
    auto res2 = co_await t; // 错误:底层协程已经结束,状态帧已被销毁
    

    通过添加 && 限定符,我们利用 C++ 的类型系统在编译期强制执行了“单次消费(Single-Shot)”语义。调用者必须直接等待临时对象(如 co_await do_work();),或者显式地转移所有权(co_await std::move(t);)。这在接口层面明确了状态机的生命周期契约。

    第二:为什么在右值版本中,依然不剥夺 Task 的所有权?
    通常在处理右值时,我们会使用 std::exchange 来转移底层资源。但在这里,我们仅向 Awaiter 传递了句柄的值。
    当执行 auto res = co_await do_work(); 时,do_work() 产生的 Task 临时对象的生命周期会被编译器自动延续,直到整个 co_await 表达式求值完毕(即 await_resume() 返回之后)。此时,临时 Task 对象被析构,从而触发 handle_.destroy()。
    如果我们在此处剥夺了 Task 的所有权,清理责任就会落空。这种保留所有权的设计,确保了无论是正常执行完毕还是因异常提前中断,底层堆内存都能依托 Task 临时对象的析构函数被可靠地回收,实现了严格的内存安全。

    补充说明
    在 C++ 中,临时对象的生命周期会持续到包含它的完整表达式(Full-expression)结束(通常是遇到分号 ;)。

    当我们写下如下代码时:

    auto res = co_await do_something();
    

    编译器实际上会做如下展开(伪代码):

    {
        // 1. 调用函数,产生临时的 Task 右值对象
        auto&& __tmp_task = do_something(); 
        
        // 2. 调用 operator co_await,产生临时的 Awaiter 对象
        auto&& __awaiter = __tmp_task.operator co_await();
        
        if (!__awaiter.await_ready()) {
            // 3. 挂起当前协程,并调用 await_suspend
            __awaiter.await_suspend(current_coro_handle);
            // <--- 协程在这里彻底挂起,CPU 离开 --->
            // <--- 时空流转,无论过了多久,终于被唤醒 --->
        }
        
        // 4. 唤醒后,调用 await_resume 提取结果
        auto res = __awaiter.await_resume();
        
    } // 5. 完整表达式结束!按照构造的相反顺序销毁临时对象:先销毁 __awaiter,再销毁 __tmp_task
    

    关键点在于: 协程在挂起时,编译器非常清楚 __tmp_task 和 __awaiter 的生命周期需要跨越挂起点。因此,编译器不会把它们分配在容易被销毁的线程栈(Thread Stack)上,而是直接将它们作为局部变量,打包存储在“当前(父)协程的堆分配状态帧(Coroutine Frame)”中。

    这意味着:

    Task 对象在整个挂起期间一直安然无恙地活在堆内存里。

    唤醒时,Awaiter 也并没有在栈上重建,你访问的依然是挂起前保存在堆里的那个确切的 Awaiter 实例。

    Task 必定比 Awaiter 活得更久(先构造的后销毁)。

    因此,Awaiter 内部仅持有 handle_ 的一个浅拷贝是绝对安全的,Task 完全不需要把所有权 exchange 给 Awaiter。

    结语

    设计一个现代 C++ 的 Task 类,并非对关键字的简单拼接,而是对执行流跳转和资源生命周期的精密编排。通过懒启动隔离控制流、利用对称传输突破调用栈限制、借助 RAII 保障内存释放,我们最终构建出了一个符合 C++ 哲学体系的高性能并发原语。

    完整代码

    export module xin.task;
    
    import std;
    
    namespace xin {
    
    class FinalAwaiter {
    public:
        [[nodiscard]]
        constexpr auto await_ready() const noexcept -> bool
        {
            return false;
        }
    
        template<typename Promise>
        auto await_suspend(std::coroutine_handle<Promise> handle) const noexcept -> std::coroutine_handle<>
        {
            auto next = handle.promise().next;
            return next ? next : std::noop_coroutine();
        }
    
        void await_resume() const noexcept {}
    };
    
    
    export template<typename T = void>
    class Task;
    
    export template<typename T>
    class Task {
    public:
        class promise_type;
        using handle_type = std::coroutine_handle<promise_type>;
    
        class promise_type {
        public:
            auto get_return_object() noexcept -> Task { return Task{ handle_type::from_promise(*this) }; }
    
            auto initial_suspend() noexcept -> std::suspend_always { return {}; }
    
            auto final_suspend() noexcept -> FinalAwaiter { return {}; }
    
            void unhandled_exception() noexcept { exception_ = std::current_exception(); }
    
            template<typename U>
                requires std::convertible_to<U&&, T>
            void return_value(U&& value) noexcept(std::is_nothrow_constructible_v<T, U&&>)
            {
                value_.emplace(std::forward<U>(value));
            }
    
            [[nodiscard]]
            auto result() -> T
            {
                if (exception_)
                    std::rethrow_exception(exception_);
    
                if (!value_)
                    throw std::logic_error{ "No value returned from coroutine" };
    
                auto out = std::move(*value_);
                value_.reset();
                return out;
            }
    
            std::coroutine_handle<> next{ nullptr };
    
        private:
            std::exception_ptr exception_;
            std::optional<T> value_;
        };
    
        Task() = default;
    
        Task(handle_type handle)
          : handle_{ handle }
        {}
    
        Task(const Task&) = delete;
        auto operator=(const Task&) -> Task& = delete;
    
        Task(Task&& other) noexcept
          : handle_{ std::exchange(other.handle_, {}) }
        {}
    
        auto operator=(Task&& other) noexcept -> Task&
        {
            if (this == &other)
                return *this;
    
            if (handle_)
                handle_.destroy();
    
            handle_ = std::exchange(other.handle_, nullptr);
            return *this;
        }
    
        ~Task()
        {
            if (handle_)
                handle_.destroy();
        }
    
        [[nodiscard]]
        auto done() const noexcept -> bool
        {
            return !handle_ || handle_.done();
        }
    
        [[nodiscard]]
        auto handle() const noexcept -> handle_type
        {
            return handle_;
        }
    
        class Awaiter {
        public:
            explicit Awaiter(handle_type handle)
              : handle_{ handle }
            {}
    
            [[nodiscard]]
            auto await_ready() const noexcept -> bool
            {
                return !handle_ || handle_.done();
            }
    
            auto await_suspend(std::coroutine_handle<> next) -> std::coroutine_handle<>
            {
                handle_.promise().next = next;
                return handle_;
            }
    
            auto await_resume() const -> T
            {
                if (!handle_)
                    throw std::logic_error{ "Invalid coroutine handle" };
    
                return handle_.promise().result();
            }
    
        private:
            handle_type handle_;
        };
    
        auto operator co_await() && noexcept { return Awaiter{ handle_ }; }
    
    private:
        handle_type handle_{ nullptr };
    };
    
    
    export template<>
    class Task<void> {
    public:
        class promise_type;
        using handle_type = std::coroutine_handle<promise_type>;
    
        class promise_type {
        public:
            std::coroutine_handle<> next{ nullptr };
    
            auto get_return_object() noexcept -> Task { return Task{ handle_type::from_promise(*this) }; }
    
            auto initial_suspend() noexcept -> std::suspend_always { return {}; }
    
            auto final_suspend() noexcept -> FinalAwaiter { return {}; }
    
            void unhandled_exception() noexcept { exception_ = std::current_exception(); }
    
            void return_void() noexcept {}
    
            void result()
            {
                if (exception_)
                    std::rethrow_exception(exception_);
            }
    
        private:
            std::exception_ptr exception_;
        };
    
        Task() = default;
    
        Task(handle_type handle)
          : handle_{ handle }
        {}
    
        Task(const Task&) = delete;
        auto operator=(const Task&) -> Task& = delete;
    
        Task(Task&& other) noexcept
          : handle_{ std::exchange(other.handle_, nullptr) }
        {}
    
        auto operator=(Task&& other) noexcept -> Task&
        {
            if (this == &other)
                return *this;
    
            if (handle_)
                handle_.destroy();
    
            handle_ = std::exchange(other.handle_, {});
            return *this;
        }
    
        ~Task()
        {
            if (handle_)
                handle_.destroy();
        }
    
        [[nodiscard]]
        auto done() const noexcept -> bool
        {
            return !handle_ || handle_.done();
        }
    
        [[nodiscard]]
        auto handle() const noexcept -> handle_type
        {
            return handle_;
        }
    
        class Awaiter {
        public:
            explicit Awaiter(handle_type handle)
              : handle_{ handle }
            {
            }
    
            [[nodiscard]]
            auto await_ready() const noexcept -> bool
            {
                return !handle_ || handle_.done();
            }
    
            auto await_suspend(std::coroutine_handle<> next) -> std::coroutine_handle<>
            {
                handle_.promise().next = next;
                return handle_;
            }
    
            void await_resume() const
            {
                if (!handle_)
                    throw std::logic_error{ "Invalid coroutine handle" };
    
                handle_.promise().result();
            }
    
        private:
            handle_type handle_;
        };
    
        auto operator co_await() && noexcept { return Awaiter{ handle_ }; }
    
    private:
        handle_type handle_{ nullptr };
    };
    
    } // namespace xin
    

  • 【 基于 io_uring 的 C++20 协程网络库】09 读路径优化:recv_multishot与ReceiveStream
    DoomjustinD Doomjustin

    在上一篇里,我们解决了"怎么把数据发出去"。这一篇转到读路径:把 recv_multishot 接进来,减少高频读取时的重复提交。

    如果沿用 async_read_some,每次读取都要先准备一块可写 buffer,再提交一次 recv,等 CQE 回来后再决定要不要继续下一次。这个模型本身没错,但放到 Echo、代理、网关这类长连接里,会很快变成重复劳动:准备 buffer、提交 SQE、等待 CQE、再补上下一次读取。

    recv_multishot 能直接解决这件事:一次提交,对应后续多次完成。

    在开始之前,先确认一下当前库的状态。前几篇一路写下来,IOContext 一直是个比较扁平的类:持有一个 io_uring ring 句柄,提供 SQE 获取和事件循环驱动,加上 work 计数和 stop 信号:

    class IOContext {
        ::io_uring ring_;
        int event_fd_;
        std::size_t outstanding_works_{ 0 };
        std::atomic<bool> should_stop_{ false };
    
        void run();
        auto sqe() -> ::io_uring_sqe*;
        void wakeup();
        // CQE 分发 ...
    };
    

    socket 层的每个异步操作,把自己包装成 Operation 提交进 ring,CQE 回来时由 IOContext 分发回去。至于 buffer,一直是由调用方临时传入,操作完成后调用方自己处理生命周期。

    这个结构目前是干净的,但 recv_multishot 要求的东西比这多一层。


    1. 直接加进去会碰到什么

    recv_multishot 需要 buffer ring:内核不再接受调用方临时传入的单块 buffer,而是要求预先注册一组固定大小的槽位。每次有数据到达,内核从 ring 里借一个槽位写入,CQE 里带上这次借出的槽位编号,调用方读完数据后再把槽位还回去。

    最直接的想法是往 IOContext 里加几个字段:

    class IOContext {
        ::io_uring ring_;
        int event_fd_;
        std::size_t outstanding_works_{ 0 };
        std::atomic<bool> should_stop_{ false };
    
        // 新加的 buffer ring 管理
        std::unordered_map<unsigned, BufferRing> buffer_rings_;
        unsigned next_bgid_{ 0 };
        std::optional<unsigned> default_bgid_;
    };
    

    功能上能跑,但麻烦随之而来。

    第一,IOContext 现在要同时处理两类完全不同的事情:一类是"这一次 submit_and_wait 怎么跑、CQE 怎么分发",另一类是"有哪些 buffer 组被注册在内核里、槽位怎么借出和归还"。这两件事没有天然的耦合关系,混在一起只会让两侧的逻辑都难以单独修改。

    第二,更直接的问题出在 CQE 分发上。之前"一个 CQE 对应一次操作完成"的判断,在 multishot 里不再成立——事件循环必须识别 IORING_CQE_F_MORE,在这个 flag 还存在时不能把这笔 work 从计数里扣掉。这不是细节调整,而是分发逻辑本身语义的变化。如果这块逻辑和 buffer ring 管理搅在同一段代码里,两边会互相干扰。

    第三,socket 层如果要用 buffer ring,就必须拿到 bgid 和 bid,然后到处传。调用方写业务代码时不该感知这些细节,但没有合适的封装层,这些细节就只能往上漏。

    所以在写 multishot awaiter 之前,要先把 IOContext 的职责拆开:

    1. 和 ring 驱动、唤醒、CQE 分发有关的逻辑,收进独立的 Scheduler。
    2. buffer ring 的建立、槽位借出和归还,收进独立的 BufferRingGroup。
    3. IOContext 自己只保留对外协调接口和 work tracking。
    4. socket 层只暴露消费接口,不让业务层直接碰 buffer slot。

    这不是为了"代码好看",而是为了把复杂度放回正确位置。不做这一步,后面无论接收流、buffer 池复用还是取消清理,都会越来越难理顺。


    2. 重构后的 IOContext

    拆分后的 IOContext 把两类职责分别交给两个内部类:

    class IOContext {
    private:
        class Scheduler {
            // ring 初始化、SQE 获取、submit_and_wait、wakeup
        };
    
        class BufferRingGroup {
            // setup buffer ring、release buffer slot、default bgid
        };
    
        Scheduler scheduler_;
        BufferRingGroup buffers_;
        std::size_t outstanding_works_{ 0 };
        std::atomic<bool> should_stop_{ false };
    };
    

    Scheduler 和 BufferRingGroup 的职责本来就不是一回事。

    Scheduler 关注的是"这一次事件循环怎么跑":

    1. 初始化和销毁 io_uring ring。
    2. 提供 SQE。
    3. submit_and_wait 后遍历 CQE,分发给对应 Operation。
    4. 通过 eventfd 做跨线程 stop 唤醒。

    尤其是第 3 点,在引入 multishot 之后已经不能再按"一个 CQE 对应一个完整操作"来理解了。Scheduler::schedule() 现在必须识别 IORING_CQE_F_MORE,只有在 multishot 真正结束时才能把这笔 work 从计数里扣掉。这意味着 recv_multishot 不只是给 socket 层加了个新能力,它也顺手改写了事件循环对"完成"这件事的理解。

    而 BufferRingGroup 关心的是另一件事:"有哪些 buffer 组被长期注册在内核里,以及它们怎么被重复借出和归还"。这部分如果混进 Scheduler,后者就会一边跑 CQE 批调度,一边操心 buffer 池资源生命周期,边界很快就会糊掉。

    接口本身也能看出这种分层:

    auto setup_buffer_ring(unsigned entries, unsigned size) -> unsigned
    {
        return buffers_.setup(scheduler_.ring(), entries, size);
    }
    
    void release_buffer_ring(unsigned bgid, unsigned bid)
    {
        buffers_.release(bgid, bid);
    }
    

    IOContext 在这里做的事情很克制:buffer ring 的建立确实需要底层 ring 句柄,但建立完成之后,后续使用者不必再感知这些细节。

    有了这层结构,ReceiveStream 才真正有了落脚点。否则它一边要操心 multishot 请求,一边还得自己解决 buffer group 的分配和归还,那就不是 socket 接口该承担的事情。

    它在 StreamSocket 上的入口很轻:

    auto receive_stream() -> ReceiveStream<Context>
    {
        auto default_bgid = this->context().default_buffer();
        if (!default_bgid)
            throw std::runtime_error{ "No default buffer ring available for receive stream" };
    
        return ReceiveStream<Context>{ this->context(), this->native_handle(), *default_bgid };
    }
    

    ReceiveStream 不是独立工作的,它默认依赖 IOContext 里已经准备好的 buffer ring。在当前实现里,第一次 setup_buffer_ring() 创建出来的组会自动成为默认组,所以 demo 里只做一次初始化就够了。


    3. 重构落地:把 buffer ring 收进 IOContext

    buffer ring 进入 IOContext 之后,下一步才轮到它自己的实现。

    BufferRingGroup::setup() 做了三件事:分配一整块连续内存、向内核注册一个 buf ring、把每个 slot 预先填进 ring。

    auto IOContext::BufferRingGroup::setup(::io_uring* ring, unsigned entries, unsigned size) -> unsigned
    {
        auto bgid = next_bgid_++;
        auto& buffer_ring = group_[bgid];
    
        buffer_ring.size = size;
        buffer_ring.entries = entries;
        buffer_ring.mask = ::io_uring_buf_ring_mask(entries);
    
        const auto alloc_size = static_cast<std::size_t>(entries * size);
        buffer_ring.base_address = memory_resource_->allocate(alloc_size, ALIGNMENT);
    
        int res = 0;
        buffer_ring.buffer = ::io_uring_setup_buf_ring(ring, entries, bgid, 0, &res);
    
        auto* base = static_cast<std::byte*>(buffer_ring.base_address);
        for (unsigned i = 0; i < entries; ++i)
            ::io_uring_buf_ring_add(buffer_ring.buffer, base + i * size, size, i, buffer_ring.mask, i);
    
        ::io_uring_buf_ring_advance(buffer_ring.buffer, entries);
        buffer_ring.tail = entries;
    
        if (!default_buffer_bgid_)
            default_buffer_bgid_ = bgid;
    
        return bgid;
    }
    

    这部分有三个会直接影响行为的选择。

    第一,所有 buffer slot 放在一整块连续内存里,而不是单独 new 一堆小块。这么做不是图省事,而是因为 buf ring 的典型使用方式本来就是"固定大小、重复借用、按槽位编号回收"。连续布局让 bid -> 地址 的映射退化成简单的指针偏移,后面 CQE 回来时只要 base + bid * size 就能找回数据。

    第二,bgid 的管理被封装在 BufferRingGroup 内部。调用方只拿到一个逻辑编号,不直接接触底层注册细节;默认组也在这里顺手建立起来,方便 socket 层提供一个无参的 receive_stream()。

    第三,归还路径同样应该放在这里,而不是散在各处调用 io_uring_buf_ring_add。因为释放 slot 本质上是 buffer ring 自己的状态变更,和具体是哪条连接、哪次读操作用过这个 slot 没关系。

    归还逻辑如下:

    void IOContext::BufferRingGroup::release(unsigned bgid, unsigned bid)
    {
        auto& buffer_ring = group_[bgid];
        auto* base = static_cast<std::byte*>(buffer_ring.base_address);
        const int offset = buffer_ring.tail & buffer_ring.mask;
    
        ::io_uring_buf_ring_add(
            buffer_ring.buffer,
            base + bid * buffer_ring.size,
            buffer_ring.size,
            bid,
            buffer_ring.mask,
            offset
        );
    
        ::io_uring_buf_ring_advance(buffer_ring.buffer, 1);
        ++buffer_ring.tail;
    }
    

    一个 slot 被借走时,bid 会随 CQE 一起回来;一个 slot 被归还时,BufferRingGroup 只需要根据同样的 bid 把它重新挂回 ring。这样 buffer 的借出和回收就闭上了环,之后 PooledBuffer 才有可能用 RAII 去包装它。


    4. 地基理顺后,再把 multishot 接上

    ReceiveStream 真正的提交动作在 arm_operation() 里:

    void arm_operation()
    {
        if (!operation_)
            operation_ = new MutishotReceiveOperation{ this, context_, bgid_ };
    
        auto* sqe = context_->sqe();
        ::io_uring_prep_recv_multishot(sqe, fd_, nullptr, 0, 0);
        sqe->flags |= IOSQE_BUFFER_SELECT;
        sqe->buf_group = bgid_;
        ::io_uring_sqe_set_data(sqe, static_cast<Operation*>(operation_));
        operation_armed_ = true;
    }
    

    这个提交动作有两个关键信息。

    第一,io_uring_prep_recv_multishot 只提交一次,但不是只收一次。只要内核认为这个 multishot 请求还有效,后面每次有数据到达,它都可以继续产出新的 CQE。

    第二,IOSQE_BUFFER_SELECT 告诉内核:接收缓冲区不由这次 SQE 显式给出,而是从 buf_group 对应的 buffer ring 里挑一个空闲槽位来写。用户态这次不传 void* buf,而是预先把一组固定大小的 buffer 注册给内核,后面由内核按需借用。

    CQE 回来时,res 是本次收到的字节数,flags 里则可能带上两个额外信息:

    1. IORING_CQE_F_BUFFER:说明这次结果对应某个 buffer ring 里的槽位。
    2. IORING_CQE_F_MORE:说明这个 multishot 请求还活着,后面还会继续收。

    这两个 flag 基本就是 ReceiveStream 最关心的信息:一个告诉它"数据落在哪块 buffer 上",一个告诉它"这条流还要不要继续等下去"。


    5. 最后才是用户侧 API:为什么返回 PooledBuffer

    如果只是为了把数据交给调用方,返回 std::span<std::byte> 看起来已经够了。但对 ReceiveStream 来说,这远远不够,因为 span 只是一段视图,不携带任何归还语义。

    内核从 buffer ring 里借出的槽位,最终必须还回去,否则 ring 只会越用越少,直到耗尽。这个归还动作不能指望业务层记得手工调用,所以这里必须做成 RAII。

    PooledBuffer 的职责正是这个:

    template<typename Context>
    class PooledBuffer {
    public:
        PooledBuffer(Context& context, unsigned bgid, unsigned bid, std::span<std::byte> buffer)
          : context_{ &context }, bgid_{ bgid }, bid_{ bid }, buffer_{ buffer }
        {}
    
        ~PooledBuffer()
        {
            release();
        }
    
        auto data() const noexcept -> std::span<std::byte>
        {
            return buffer_;
        }
    
    private:
        void release()
        {
            if (valid())
                context_->release_buffer_ring(bgid_, bid_);
        }
    };
    

    调用方拿到的是一块"带归还能力的 buffer 句柄"。它可以读这段数据,也可以把这段数据直接交给后续写操作;一旦这个对象离开作用域,对应槽位就自动回到 buffer ring,可供下一次接收复用。

    这也是 ReceiveStream 最重要的体验差异:上层拿到的是一块已经装好、生命周期清晰、离开作用域后能自动回收的结果,而不是一段裸内存视图。


    6. next() 为什么需要 ready queue

    ReceiveStream 对外只暴露一个动作:

    auto next() -> NextAwaiter
    {
        return NextAwaiter{ *this };
    }
    

    next() 能不能用得顺,关键在 NextAwaiter 和 ready_results_ 的配合。

    class NextAwaiter {
    public:
        auto await_ready() const noexcept -> bool
        {
            return !stream_.ready_results_.empty();
        }
    
        void await_suspend(std::coroutine_handle<> handle) noexcept
        {
            stream_.handle_ = handle;
    
            if (!stream_.operation_armed_)
                stream_.arm_operation();
        }
    
        auto await_resume() -> std::expected<PooledBuffer<Context>, std::error_code>
        {
            if (stream_.ready_results_.empty())
                return unexpected_system_error(std::errc::operation_canceled);
    
            auto result = std::move(stream_.ready_results_.front());
            stream_.ready_results_.pop_front();
            return result;
        }
    };
    

    这里的 ready_results_ 不是装饰品,而是必须存在的一层缓冲。

    recv_multishot 有一个天然特征:协程还没来得及再次 co_await next(),它就可能已经连续产出了多个 CQE。如果没有队列,后到的结果只能覆盖先到的结果,或者逼着 handle_cqe() 当场恢复协程并同步消化所有数据,这两种都不对。

    把这段行为画成时序会直观很多:

    [Kernel multishot]          [ReceiveStream]                 [User Coroutine]
        |                           |                                |
        |-- CQE #1 ---------------->| push ready_results_            |
        |-- CQE #2 ---------------->| push ready_results_            |
        |-- CQE #3 ---------------->| push ready_results_            |
        |                           |                                |
        |                           |<----------- co_await next() ---|
        |                           | pop #1 and resume              |
        |                           |<----------- co_await next() ---|
        |                           | pop #2 and resume              |
    

    也就是说,ready_results_ 不是优化项,而是 recv_multishot 能以“生产者-消费者节奏解耦”方式暴露给上层 API 的必要条件。

    用 std::deque<result_type> 把结果先存起来,问题就顺了:

    1. CQE 到达时,先转成 PooledBuffer 或 error,推进队列。
    2. 如果此刻真的有协程挂在 next() 上,就恢复它。
    3. 如果协程暂时还没来取,结果就老实待在队列里,等下一次 await_ready() 直接命中。

    这样一来,ReceiveStream 就有了一点异步生成器的感觉:底层持续产出,上层按自己的节奏消费,中间靠一个轻量队列解耦。


    7. CQE 到达后如何变成可消费结果

    handle_cqe() 做的事情,说到底就是把内核语义翻译成库语义。

    void handle_cqe(int result, std::uint32_t flags) noexcept
    {
        std::optional<PooledBuffer<Context>> pooled_buffer;
    
        if (flags & IORING_CQE_F_BUFFER) {
            const auto bid = static_cast<std::uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT);
            auto& buffer_ring = context_->buffer_ring(bgid_);
    
            auto* base = static_cast<std::byte*>(buffer_ring.base_address);
            std::span<std::byte> data{ base + bid * buffer_ring.size, static_cast<std::size_t>(result) };
            pooled_buffer.emplace(*context_, bgid_, bid, data);
        }
    
        if (result == -ECANCELED) {
            ready_results_.emplace_back(unexpected_system_error(std::errc::operation_canceled));
        }
        else if (result >= 0) {
            if (pooled_buffer)
                ready_results_.emplace_back(std::move(*pooled_buffer));
            else
                ready_results_.emplace_back(PooledBuffer<Context>{});
        }
        else {
            ready_results_.emplace_back(unexpected_system_error(-result));
        }
    
        if (handle_) {
            auto handle = std::exchange(handle_, nullptr);
            handle.resume();
        }
    }
    

    这一步分三层:

    第一层,从 flags 里提取 bid,再结合 bgid_ 找回 buffer ring,算出这次数据实际落在了哪一段内存上。

    第二层,把这段内存包装成 PooledBuffer。从这一刻开始,buffer 的归还责任被显式绑定到了对象生命周期上。

    第三层,把内核返回码整理成 std::expected:正常数据走 value,取消和错误走 error。这样上层的消费方式就统一了。

    还有一个细节:result >= 0 但没有 IORING_CQE_F_BUFFER 时,代码会塞一个默认构造的 PooledBuffer。这给上层留出了处理空结果的空间,比如把空 buffer 视为连接关闭。这个约定在 demo 里也直接用到了。


    8. 真正难的是收尾

    ReceiveStream 最容易写错的,不是提交 multishot,而是收尾。

    问题在于:当 ReceiveStream 析构时,内核里那笔 multishot 请求不一定已经结束。你当然可以提交一个 cancel SQE,但 cancel 也是异步的,在 cancel 的 CQE 真正回来之前,原来的 multishot CQE 仍然可能再到一次。这个窗口期一旦处理不好,要么 use-after-free,要么 buffer 泄漏。

    解决办法是:析构时不直接删 operation,而是先 detach()。

    void destroy() noexcept
    {
        if (operation_) {
            operation_->detach();
    
            auto* sqe = context_->sqe(false);
            ::io_uring_prep_cancel(sqe, static_cast<Operation*>(operation_), 0);
            ::io_uring_sqe_set_data(sqe, nullptr);
    
            operation_ = nullptr;
        }
    
        if (context_)
            context_ = nullptr;
    }
    

    detach() 的效果是把 operation 和 ReceiveStream 本体断开。之后如果晚到的 CQE 还落到这笔 operation 上,complete() 会走另一条路径:不再访问已经析构的 stream,而是仅仅把 buffer 槽位补回 buffer ring。

    void complete(int res, unsigned flags) override
    {
        const bool has_more = (flags & IORING_CQE_F_MORE) != 0;
    
        if (stream_) {
            if (!has_more) {
                stream_->operation_armed_ = false;
                stream_->operation_ = nullptr;
            }
    
            stream_->handle_cqe(res, flags);
        }
        else if (flags & IORING_CQE_F_BUFFER) {
            // stream 已不存在,只负责把槽位补回 buffer ring
            // ...
        }
    
        if (!has_more)
            delete this;
    }
    

    这样一来,ReceiveStream 的析构和 multishot 请求的最终死亡就不再需要严格同步,晚到的 CQE 也不会污染用户态状态,只会做最必要的资源回收。

    另外,move 构造和 move 赋值里也有一个对应的修正动作:如果 operation 还活着,就把 operation_->stream_ 改指向新的对象。这保证了 ReceiveStream 被移动后,后续 CQE 仍然能回到正确实例上。


    9. 实际用法

    在 demo 里,Echo Server 的 session 代码已经很干净了:

    auto session(ip::tcp::socket<IOContext> client) -> Task<>
    {
        auto stream = client.receive_stream();
    
        while (true) {
            auto read_result = co_await stream.next();
            if (!read_result) {
                spdlog::warn("Failed to read from client {}: {}",
                             client.native_handle(),
                             read_result.error().message());
                co_return;
            }
    
            auto received = read_result->data();
            if (received.empty()) {
                spdlog::info("Client {} disconnected", client.native_handle());
                co_return;
            }
    
            auto write_result = co_await timeout(async_write(client, received), 1ms);
            if (!write_result)
                co_return;
        }
    }
    

    这段代码最直观的变化是:读路径里已经看不到"准备 buffer -> 提交 recv -> 处理 buffer 生命周期"这些样板动作了。业务协程眼里只剩一件事:下一块数据什么时候到。

    这正是 ReceiveStream 的价值。它不是单纯给 recv_multishot 套了个 awaiter,而是顺手把 buffer ring、生命周期、结果排队和析构期收尾这些麻烦一起包掉了。上层得到的是一个可以连续 next() 的接收流,底层保留的则是 io_uring 多次投递的吞吐优势。

  • 登录

  • 没有帐号? 注册

  • 登录或注册以进行搜索。
d2learn forums Powered by NodeBB
  • 第一个帖子
    最后一个帖子
0
  • 版块
  • 最新
  • 标签
  • 热门
  • Online Tools
  • 用户
  • 群组