跳转至内容
  • 版块
  • 最新
  • 标签
  • 热门
  • Online Tools
  • 用户
  • 群组
折叠
品牌标识

D2Learn Forums

  1. 主页
  2. Blogs | 博客
  3. xin
  4. 从零构建基于 C++20 的 Task

从零构建基于 C++20 的 Task

已定时 已固定 已锁定 已移动 xin
c++20协程
1 帖子 1 发布者 5 浏览
  • 从旧到新
  • 从新到旧
  • 最多赞同
登录后回复
此主题已被删除。只有拥有主题管理权限的用户可以查看。
  • DoomjustinD 离线
    DoomjustinD 离线
    Doomjustin
    编写于 最后由 Doomjustin 编辑
    #1

    C++20 引入了无栈协程(Stackless Coroutines)的核心语言机制,但与之相配套的标准库高级抽象(如 std::task)并未同步提供。在构建基于 io_uring 或 epoll 的高性能并发框架时,我们不可避免地需要自行设计一个用于封装异步操作的返回类型:Task<T>。

    设计这样一个任务类型,不仅仅是对新关键字的语法包装,其本质是在解决两个系统级编程的核心问题:

    1. 控制流的无缝路由
    2. 堆分配状态帧的确定性释放。

    本文将探讨如何从零构建一个可用的Task<T>。

    1. 异步组合的困境与懒启动(Lazy Evaluation)

    在传统的同步流中,函数的调用即意味着执行的开始。但在异步架构中,任务的“构造”与“执行”往往需要被严格分离。

    为了建立直观的理解,我们可以先参考 Python 中的协程行为。在 Python 中,调用一个 async def 函数并不会立即执行其内部代码,而是仅仅返回一个协程对象:

    import asyncio
    
    async def fetch_data():
        print("开始发起网络请求...")
        # ...
    
    # 此时并不会打印任何内容,仅仅是构造了一个任务对象
    task = fetch_data() 
    
    # 只有显式地等待或交给事件循环,代码才会真正运转
    # await task 
    

    这种机制被称为懒启动(Lazy Evaluation)。如果我们允许 C++ 的协程在被调用时立即开始执行(即所谓的 Eager Evaluation),它可能会在尚未正确挂载到事件循环(Event Loop)之前,就过早地触发了底层的 I/O 投递操作。这不仅破坏了状态的封装,还极易引发复杂的竞态条件。

    因此,一个健壮的 C++ Task<T> 必须是懒启动的。这在 C++20 中是通过定制 promise_type 的初始化行为来实现的:

    class promise_type {
    public:
        // 协程帧创建后立即挂起,不主动执行协程体代码
        auto initial_suspend() noexcept -> std::suspend_always { return {}; }
        // ...
    };
    

    通过返回 std::suspend_always,协程在完成内部状态帧的堆分配后会立刻交出控制权。这种设计使得异步任务可以像普通的数据结构一样被安全地传递、存储和组合,直到调用者显式地通过 co_await 来驱动它。

    2. 协程间的控制流移交

    异步操作很少是孤立存在的。当父协程执行 co_await child_task; 时,当前的执行流必须被挂起,并将 CPU 的控制权移交给子协程。同时,子协程必须知晓在自身执行完毕后,应该唤醒哪一个调用者。

    为了建立这种调用链,我们利用了 co_await 运算符所触发的编译器协议。

    在 C++20 中,co_await 并非一个简单的挂起指令,而是一个可定制的控制流拦截点。当编译器遇到 co_await <expr> 时,它会要求 <expr> 产出一个符合特定接口的 Awaiter 对象,并依次调用其三个核心方法:

    1. await_ready():探测状态。询问异步操作是否已经完成。如果返回 true,编译器将走“快速通道”,直接跳过挂起阶段;如果返回 false,则准备挂起当前协程。
    2. await_suspend(std::coroutine_handle<>):核心拦截点。在当前协程的物理状态(寄存器、局部变量)被安全保存到堆上的协程帧后,编译器会调用此方法,并将当前(父)协程的句柄作为参数传入。
    3. await_resume():结果提取点。当协程被再次唤醒时,此方法的返回值将作为整个 co_await 表达式的结果。

    基于这一协议,我们在 Task 内部定义了专门的 Awaiter,以此来接管并路由控制流:

    class Awaiter {
    public:
        explicit Awaiter(handle_type handle) : handle_{ handle } {}
    
        // 1. 探测状态:如果子协程尚未执行完毕,则强制父协程挂起
        bool await_ready() const noexcept 
        { 
            return !handle_ || handle_.done(); 
        }
    
        // 2. 挂起时的控制流路由
        auto await_suspend(std::coroutine_handle<> next) -> std::coroutine_handle<> 
        {
            // 将父协程的句柄 (next) 记录在子协程的 promise 状态中
            handle_.promise().next = next;
            // 返回子协程的句柄,指示 C++ 运行时将执行流切换至子协程
            return handle_;
        }
    
        // 3. 唤醒后的结果提取
        auto await_resume() const -> T 
        {
            if (!handle_) throw std::logic_error{ "Invalid handle" };
            return handle_.promise().result();
        }
    
    private:
        handle_type handle_; // 子协程的句柄
    };
    

    通过这一套状态机转换,C++ 将底层的调度权完全下放给了库作者。

    在 await_suspend 执行的瞬间,父协程已被安全冻结。

    我们将其句柄保存在子协程的 promise_type::next 字段里,从而在内存中建立了一个单向的调用链表(父 -> 子)。

    紧接着返回子协程的 handle_,运行时会直接跳转执行子协程代码,实现了零开销的上下文切换。

    3. 栈溢出风险与对称传输(Symmetric Transfer)

    子协程执行到末尾(或遇到 co_return)时,需要唤醒之前等待它的父协程。这往往是自定义协程实现中最容易出错的环节。

    直觉上的做法是,在子协程的收尾阶段直接调用 next.resume()。然而,这种非对称传输(Asymmetric Transfer)存在致命缺陷:

    resume() 本质上是一个常规的同步函数调用。

    在网络服务这类存在深层嵌套或无限循环挂起的场景中(例如 while(true) { co_await read(); }),每一次 resume() 都会在操作系统的线程栈上压入一个新的栈帧。调用链越长,栈越深,最终必然导致 Stack Overflow(栈溢出)。

    为了提供工业级的稳定性,Task 在收尾时必须采用对称传输(Symmetric Transfer):

    class FinalAwaiter {
    public:
        bool await_ready() const noexcept { return false; }
    
        template<typename Promise>
        auto await_suspend(std::coroutine_handle<Promise> handle) const noexcept -> std::coroutine_handle<> 
        {
            auto next = handle.promise().next;
            // 关键点:直接返回父协程的句柄,而非调用 next.resume()
            return next ? next : std::noop_coroutine();
        }
    
        void await_resume() const noexcept {}
    };
    
    // 在 promise_type 中指定收尾行为:
    auto final_suspend() noexcept -> FinalAwaiter { return {}; }
    

    通过让 final_suspend 返回一个包含父协程句柄的 Awaiter,编译器会采用类似尾调用优化(Tail Call)的机制:

    它会首先将当前子协程的物理栈帧安全剥离,然后再以平级跳转的方式进入父协程。

    在这种机制的保障下,无论业务逻辑中 co_await 嵌套了多少层,底层的线程调用栈深度始终保持恒定 (O(1))。

    4. 返回值的提取与异常路由

    异步任务不仅涉及控制流的跳转,还必须安全地跨越挂起边界传递数据或异常,并且表现得如同普通的 C++ 函数调用一样。

    在子协程内部,产生的值或未捕获的异常被分别存储在 promise_type 的 std::optional<T> 和 std::exception_ptr 中。当父协程通过对称传输被唤醒,并执行 await_resume() 时,需要提取这些结果:

    auto result() -> T 
    {
        if (exception_) 
            std::rethrow_exception(exception_);
        return std::move(value_).value();
    }
    

    这里包含两个重要的设计约束:

    1. 异常透明性:std::rethrow_exception 确保了子协程中发生的异常能够被无缝抛出,并被父协程的 try-catch 块捕获,维持了 C++ 异常处理语义的连贯性。
    2. 资源所有权转移:通过 std::move 提取值,保证了诸如 std::unique_ptr 或封装了系统资源(如文件描述符)的不可拷贝对象(Move-Only Types)能够被正确返回。

    5. 协程帧的生命周期管理与单次消费语义

    无栈协程的局部变量和 promise_type 被编译器分配在堆上的协程帧(Coroutine Frame)中。由于 C++ 没有垃圾回收机制,资源泄漏是协程编程中的主要风险之一。

    依据 C++ 核心的 RAII(资源获取即初始化)原则,Task 对象作为协程句柄的唯一持有者,理应负责这块内存的清理:

    template<typename T>
    class Task {
    public:
        ~Task() 
        { 
            if (handle_) handle_.destroy(); 
        }
    
        // 限制为右值调用,且不转移 handle_ 的所有权
        auto operator co_await() && noexcept { return Awaiter{ handle_ }; }
    };
    

    这里有两个深思熟虑的设计权衡:

    第一:为什么限制 operator co_await 为右值版本(&&)?
    协程代表一个异步计算过程,其内部结果(特别是前文提到的 Move-Only 类型)在 await_resume 中是被破坏性提取的(std::move)。这意味着一个 Task 在逻辑上只能被消费一次。如果允许对左值的 Task 进行 co_await,调用者可能会意外地多次等待同一个任务:

    Task<int> t = do_work();
    auto res1 = co_await t;
    auto res2 = co_await t; // 错误:底层协程已经结束,状态帧已被销毁
    

    通过添加 && 限定符,我们利用 C++ 的类型系统在编译期强制执行了“单次消费(Single-Shot)”语义。调用者必须直接等待临时对象(如 co_await do_work();),或者显式地转移所有权(co_await std::move(t);)。这在接口层面明确了状态机的生命周期契约。

    第二:为什么在右值版本中,依然不剥夺 Task 的所有权?
    通常在处理右值时,我们会使用 std::exchange 来转移底层资源。但在这里,我们仅向 Awaiter 传递了句柄的值。
    当执行 auto res = co_await do_work(); 时,do_work() 产生的 Task 临时对象的生命周期会被编译器自动延续,直到整个 co_await 表达式求值完毕(即 await_resume() 返回之后)。此时,临时 Task 对象被析构,从而触发 handle_.destroy()。
    如果我们在此处剥夺了 Task 的所有权,清理责任就会落空。这种保留所有权的设计,确保了无论是正常执行完毕还是因异常提前中断,底层堆内存都能依托 Task 临时对象的析构函数被可靠地回收,实现了严格的内存安全。

    补充说明
    在 C++ 中,临时对象的生命周期会持续到包含它的完整表达式(Full-expression)结束(通常是遇到分号 ;)。

    当我们写下如下代码时:

    auto res = co_await do_something();
    

    编译器实际上会做如下展开(伪代码):

    {
        // 1. 调用函数,产生临时的 Task 右值对象
        auto&& __tmp_task = do_something(); 
        
        // 2. 调用 operator co_await,产生临时的 Awaiter 对象
        auto&& __awaiter = __tmp_task.operator co_await();
        
        if (!__awaiter.await_ready()) {
            // 3. 挂起当前协程,并调用 await_suspend
            __awaiter.await_suspend(current_coro_handle);
            // <--- 协程在这里彻底挂起,CPU 离开 --->
            // <--- 时空流转,无论过了多久,终于被唤醒 --->
        }
        
        // 4. 唤醒后,调用 await_resume 提取结果
        auto res = __awaiter.await_resume();
        
    } // 5. 完整表达式结束!按照构造的相反顺序销毁临时对象:先销毁 __awaiter,再销毁 __tmp_task
    

    关键点在于: 协程在挂起时,编译器非常清楚 __tmp_task 和 __awaiter 的生命周期需要跨越挂起点。因此,编译器不会把它们分配在容易被销毁的线程栈(Thread Stack)上,而是直接将它们作为局部变量,打包存储在“当前(父)协程的堆分配状态帧(Coroutine Frame)”中。

    这意味着:

    Task 对象在整个挂起期间一直安然无恙地活在堆内存里。

    唤醒时,Awaiter 也并没有在栈上重建,你访问的依然是挂起前保存在堆里的那个确切的 Awaiter 实例。

    Task 必定比 Awaiter 活得更久(先构造的后销毁)。

    因此,Awaiter 内部仅持有 handle_ 的一个浅拷贝是绝对安全的,Task 完全不需要把所有权 exchange 给 Awaiter。

    结语

    设计一个现代 C++ 的 Task 类,并非对关键字的简单拼接,而是对执行流跳转和资源生命周期的精密编排。通过懒启动隔离控制流、利用对称传输突破调用栈限制、借助 RAII 保障内存释放,我们最终构建出了一个符合 C++ 哲学体系的高性能并发原语。

    完整代码

    export module xin.task;
    
    import std;
    
    namespace xin {
    
    class FinalAwaiter {
    public:
        [[nodiscard]]
        constexpr auto await_ready() const noexcept -> bool
        {
            return false;
        }
    
        template<typename Promise>
        auto await_suspend(std::coroutine_handle<Promise> handle) const noexcept -> std::coroutine_handle<>
        {
            auto next = handle.promise().next;
            return next ? next : std::noop_coroutine();
        }
    
        void await_resume() const noexcept {}
    };
    
    
    export template<typename T = void>
    class Task;
    
    export template<typename T>
    class Task {
    public:
        class promise_type;
        using handle_type = std::coroutine_handle<promise_type>;
    
        class promise_type {
        public:
            auto get_return_object() noexcept -> Task { return Task{ handle_type::from_promise(*this) }; }
    
            auto initial_suspend() noexcept -> std::suspend_always { return {}; }
    
            auto final_suspend() noexcept -> FinalAwaiter { return {}; }
    
            void unhandled_exception() noexcept { exception_ = std::current_exception(); }
    
            template<typename U>
                requires std::convertible_to<U&&, T>
            void return_value(U&& value) noexcept(std::is_nothrow_constructible_v<T, U&&>)
            {
                value_.emplace(std::forward<U>(value));
            }
    
            [[nodiscard]]
            auto result() -> T
            {
                if (exception_)
                    std::rethrow_exception(exception_);
    
                if (!value_)
                    throw std::logic_error{ "No value returned from coroutine" };
    
                auto out = std::move(*value_);
                value_.reset();
                return out;
            }
    
            std::coroutine_handle<> next{ nullptr };
    
        private:
            std::exception_ptr exception_;
            std::optional<T> value_;
        };
    
        Task() = default;
    
        Task(handle_type handle)
          : handle_{ handle }
        {}
    
        Task(const Task&) = delete;
        auto operator=(const Task&) -> Task& = delete;
    
        Task(Task&& other) noexcept
          : handle_{ std::exchange(other.handle_, {}) }
        {}
    
        auto operator=(Task&& other) noexcept -> Task&
        {
            if (this == &other)
                return *this;
    
            if (handle_)
                handle_.destroy();
    
            handle_ = std::exchange(other.handle_, nullptr);
            return *this;
        }
    
        ~Task()
        {
            if (handle_)
                handle_.destroy();
        }
    
        [[nodiscard]]
        auto done() const noexcept -> bool
        {
            return !handle_ || handle_.done();
        }
    
        [[nodiscard]]
        auto handle() const noexcept -> handle_type
        {
            return handle_;
        }
    
        class Awaiter {
        public:
            explicit Awaiter(handle_type handle)
              : handle_{ handle }
            {}
    
            [[nodiscard]]
            auto await_ready() const noexcept -> bool
            {
                return !handle_ || handle_.done();
            }
    
            auto await_suspend(std::coroutine_handle<> next) -> std::coroutine_handle<>
            {
                handle_.promise().next = next;
                return handle_;
            }
    
            auto await_resume() const -> T
            {
                if (!handle_)
                    throw std::logic_error{ "Invalid coroutine handle" };
    
                return handle_.promise().result();
            }
    
        private:
            handle_type handle_;
        };
    
        auto operator co_await() && noexcept { return Awaiter{ handle_ }; }
    
    private:
        handle_type handle_{ nullptr };
    };
    
    
    export template<>
    class Task<void> {
    public:
        class promise_type;
        using handle_type = std::coroutine_handle<promise_type>;
    
        class promise_type {
        public:
            std::coroutine_handle<> next{ nullptr };
    
            auto get_return_object() noexcept -> Task { return Task{ handle_type::from_promise(*this) }; }
    
            auto initial_suspend() noexcept -> std::suspend_always { return {}; }
    
            auto final_suspend() noexcept -> FinalAwaiter { return {}; }
    
            void unhandled_exception() noexcept { exception_ = std::current_exception(); }
    
            void return_void() noexcept {}
    
            void result()
            {
                if (exception_)
                    std::rethrow_exception(exception_);
            }
    
        private:
            std::exception_ptr exception_;
        };
    
        Task() = default;
    
        Task(handle_type handle)
          : handle_{ handle }
        {}
    
        Task(const Task&) = delete;
        auto operator=(const Task&) -> Task& = delete;
    
        Task(Task&& other) noexcept
          : handle_{ std::exchange(other.handle_, nullptr) }
        {}
    
        auto operator=(Task&& other) noexcept -> Task&
        {
            if (this == &other)
                return *this;
    
            if (handle_)
                handle_.destroy();
    
            handle_ = std::exchange(other.handle_, {});
            return *this;
        }
    
        ~Task()
        {
            if (handle_)
                handle_.destroy();
        }
    
        [[nodiscard]]
        auto done() const noexcept -> bool
        {
            return !handle_ || handle_.done();
        }
    
        [[nodiscard]]
        auto handle() const noexcept -> handle_type
        {
            return handle_;
        }
    
        class Awaiter {
        public:
            explicit Awaiter(handle_type handle)
              : handle_{ handle }
            {
            }
    
            [[nodiscard]]
            auto await_ready() const noexcept -> bool
            {
                return !handle_ || handle_.done();
            }
    
            auto await_suspend(std::coroutine_handle<> next) -> std::coroutine_handle<>
            {
                handle_.promise().next = next;
                return handle_;
            }
    
            void await_resume() const
            {
                if (!handle_)
                    throw std::logic_error{ "Invalid coroutine handle" };
    
                handle_.promise().result();
            }
    
        private:
            handle_type handle_;
        };
    
        auto operator co_await() && noexcept { return Awaiter{ handle_ }; }
    
    private:
        handle_type handle_{ nullptr };
    };
    
    } // namespace xin
    
    1 条回复 最后回复
    0

    • 登录

    • 没有帐号? 注册

    • 登录或注册以进行搜索。
    d2learn forums Powered by NodeBB
    • 第一个帖子
      最后一个帖子
    0
    • 版块
    • 最新
    • 标签
    • 热门
    • Online Tools
    • 用户
    • 群组