跳转至内容
  • 版块
  • 最新
  • 标签
  • 热门
  • Online Tools
  • 用户
  • 群组
折叠
品牌标识

D2Learn Forums

  1. 主页
  2. Blogs | 博客
  3. xin
  4. 启动并分离 - co_spawn

启动并分离 - co_spawn

已定时 已固定 已锁定 已移动 xin
c++20协程
1 帖子 1 发布者 15 浏览
  • 从旧到新
  • 从新到旧
  • 最多赞同
登录后回复
此主题已被删除。只有拥有主题管理权限的用户可以查看。
  • DoomjustinD 离线
    DoomjustinD 离线
    Doomjustin
    编写于 最后由 Doomjustin 编辑
    #1

    在基于 C++20 协程构建的异步框架里,Task<T> 通常有着严格的结构化并发语义:调用者必须去 co_await 它,子任务的生命周期被死死地绑在父任务上。

    但现实世界没这么理想,系统架构中必然存在同步世界与异步世界的交汇点。最典型的例子就是 TCP 服务器的事件循环:

    // 这是一个底层的同步事件循环
    void server_accept_loop(io_context& ctx) {
        while (true) {
            stream_socket client = accept_connection(ctx);
            
            // 业务协程:处理单个客户端连接
            // 函数签名:Task<void> handle_client(stream_socket client);
            
            // 问题:怎么在这里启动 handle_client,然后不管它,直接去接下一个客?
            
            // 方案 1: 直接调用?没用,返回值 Task 被丢弃,协程根本不会跑(惰性求值)。
            // handle_client(std::move(client)); 
            
            // 方案 2: 使用 co_await?编译直接报错!因为当前函数是个普通函数,不是协程。
            // co_await handle_client(std::move(client));
    
            // 方案 3: 把server_accept_loop的返回值改成Task<void>。这里使用co_await
            // 可以编译,但是一次只能处理一个client
        }
    }
    

    为了解决这个问题,我们必须提供一个“启动并分离(Fire and Forget)”的方法,类似std::thread的detach模式。这没法用常规的 Task<T> 表达,我们需要自己捏一个底层原语:co_spawn。

    下面我们将从零开始,一步步打磨出一个内存安全、支持优雅停机的 co_spawn。


    1. 裸分离:让编译器帮我们擦屁股

    要把一个任务扔到后台,第一个要面对的灵魂拷问就是:这个协程在堆上分配的内存帧(Coroutine Frame),最后谁来删?

    既然分离出去了,外部就不存在任何变量持有它的句柄。在 C++20 里,最优雅的解法是:配置好参数,让编译器自己管理。

    按照协程规范,只要 promise_type::final_suspend() 返回 std::suspend_never,协程走到生命周期尽头时,运行时就会自动 delete 掉那块堆内存。据此,我们可以写出一个极简的“裸分离”壳子:

    // 纯粹的空壳,仅用于触发编译器的自动清理
    struct DetachedTask {
        struct promise_type {
            auto get_return_object() noexcept { return detached{}; }
            
            // 饥饿启动:一创建就立马执行
            auto initial_suspend() noexcept { return std::suspend_never{}; }
            
            // 核心:结束时不挂起,触发自动销毁
            auto final_suspend() noexcept { return std::suspend_never{}; }
            
            void return_void() noexcept {}
            void unhandled_exception() noexcept { std::terminate(); }
        };
    };
    
    // 极简版原语
    template<typename Awaitable>
    auto co_spawn(Awaitable awaitable) -> DetachedTask 
    {
        co_await std::move(awaitable);
    }
    

    然后,我们在同步回调里调用 co_spawn(handle_client(std::move(client))); 时,协程帧会立即投入运行。遇到 I/O 挂起时,控制流会 return 回主循环(不会阻塞线程!)。等任务彻底跑完,走向 final_suspend,内存安全摧毁,干干净净。

    2. 状态追踪:用 RAII 告别“幽灵任务”

    上面这套“裸分离”虽然在语言机制上跑得通,但在工程上其实是个定时炸弹。因为它没法做状态追踪,也就没法支持服务器的优雅停机(Graceful Shutdown)。

    试想一下,如果你发个 SIGTERM 准备关进程,底层的事件分发器怎么知道还有多少个 co_spawn 出去的任务在挂起等 I/O?如果直接把底层上下文销毁了,等这些“幽灵任务”被唤醒时,面对的就是一片废墟,当场 Core Dump。

    所以,分离出的协程必须和底层的上下文绑定生命周期:诞生时登记,死亡时注销。
    这里我们假定一个 context 应该支持 add_work() 和 drop_work() 来管理分离出去的任务。

    // 假定上下文支持增减引用计数
    template<typename T>
    concept tracking_context = requires(T& ctx) 
    {
        ctx.add_work();
        ctx.drop_work();
    };
    
    template<tracking_context Context>
    struct DetachedTask {
        struct promise_type {
            Context* context = nullptr;
    
            // 拦截参数:拿到上下文引用,生命周期开始时登记
            template<typename Awaitable>
            promise_type(Context& ctx, Awaitable&&) 
              : context{ &ctx } 
            {
                context->add_work(); 
            }
    
            // 绑定析构:随协程帧被编译器销毁时,自动注销
            ~promise_type() 
            {
                if (context) context->drop_work(); 
            }
    
            auto get_return_object() noexcept { return DetachedTask{}; }
            auto initial_suspend() noexcept { return std::suspend_never{}; }
            auto final_suspend() noexcept { return std::suspend_never{}; }
            void return_void() noexcept {}
            void unhandled_exception() noexcept { std::terminate(); }
        };
    };
    

    3. 最终的 co_spawn

    有了上面这个支持状态追踪的 detached_task,我们就可以给出 co_spawn 的最终接口了。

    // 注意这里的 Awaitable awaitable 是按值传递!
    template<tracking_context Context, awaitable Awaitable>
        requires std::movable<std::remove_cvref_t<Awaitable>>
    auto co_spawn(Context& ctx, Awaitable awaitable) -> DetachedTask<Context> 
    {
        // 移动进协程帧里,生命周期交给DetachedTask
        co_await std::move(awaitable);
    }
    

    为什么要Awaitble必须按值传?
    在协程里,如果参数是引用,堆上的协程帧就只会存个指针。像 handle_client(std::move(client)) 这种调用,产生的是个临时对象(右值)。如果 co_spawn 接的是个引用,等它内部第一次 co_await 挂起、把控制权还给外层时,这个临时对象早就析构了!这会导致极其隐蔽的悬垂引用Bug。

    通过强制按值传递,我们用移动语义,把临时的业务任务移动到了协程帧内部,只要协程不死,它的状态就绝对安全。

    4. 补充concept:到底什么是 awaitable?

    细心的朋友肯定注意到了,在最终的 co_spawn 签名里,我用了一个 awaitable 的概念。

    在 C++20 里,一个东西能被 await,无非三种情况:

    1. 它自己就是个 awaiter,即有3个await函数
    2. 它重载了成员方法 operator co_await()
    3. 有对应的全局重载。

    我们就把这个concept用代码翻译出来:

    template<typename T>
    concept awaiter = requires(T& t, std::coroutine_handle<> handle)
    {
        { t.await_ready() } -> std::convertible_to<bool>;
        t.await_suspend(handle);
        t.await_resume();
    };
    
    template<typename T>
    concept has_operator_co_await = requires(T&& t)
    {
        { std::forward<T>(t).operator co_await() } -> awaiter;
    };
    
    template<typename T>
    concept has_global_operator_co_await = requires(T&& t)
    {
        { operator co_await(std::forward<T>(t)) } -> awaiter;
    };
    
    template<typename T>
    concept awaitable = awaiter<T> 
                     || has_operator_co_await<T> 
                     || has_global_operator_co_await<T>;
    

    实战演示

    最后,给一段伪代码示例,看看它是怎么在业务里落地的:

    import std;
    
    // 1. 实现一个满足 tracking_context 契约的上下文
    struct MyIOContext {
        int active_tasks = 0;
        
        void add_work() 
        {
            ++active_tasks;
            std::cout << "[Context] 任务+1,当前活跃数: " << active_tasks << "\n";
        }
        
        void drop_work() 
        {
            --active_tasks;
            std::cout << "[Context] 任务结束,当前活跃数: " << active_tasks << "\n";
        }
        
        void run_loop() 
        {
            // 真实场景里,这里是 epoll_wait 或 io_uring_enter 阻塞等事件
            std::cout << "[Context] 开启事件循环,等待 I/O...\n";
        }
    };
    
    // 2. 模拟一个能被 co_await 的异步操作 (满足 awaitable 契约)
    struct DummyAsyncRead {
        bool await_ready() { return false; }
        void await_suspend(std::coroutine_handle<>) 
        {
            std::cout << "  -> 协程挂起,把 fd 注册到 epoll...\n";
        }
    
        void await_resume() 
        {
            std::cout << "  -> 协程恢复,拿到数据!\n";
        }
    };
    
    // 业务逻辑协程
    DummyAsyncRead handle_client(int client_fd) 
    {
        std::cout << "开始处理客户端: " << client_fd << "\n";
        // 遇到 IO 挂起
        co_await DummyAsyncRead{}; 
    }
    
    // 3. 跑起来
    int main() {
        MyIOContext ctx;
        
        std::cout << "--- 服务器启动 ---\n";
        
        // 启动并分离,立刻返回
        co_spawn(ctx, handle_client(1001));
        co_spawn(ctx, handle_client(1002));
        
        std::cout << "--- 同步的 main 函数丝毫不受阻塞 ---\n";
        
        ctx.run_loop();
        
        return 0;
    }
    

    一个更具体的demo可以看这里:demo

    完整代码

    
    template<typename T>
    concept awaiter = requires(T& t, std::coroutine_handle<> handle)
    {
        { t.await_ready() } -> std::convertible_to<bool>;
        t.await_suspend(handle);
        t.await_resume();
    };
    
    template<typename T>
    concept has_operator_co_await = requires(T&& t)
    {
        { std::forward<T>(t).operator co_await() } -> awaiter;
    };
    
    template<typename T>
    concept has_global_operator_co_await = requires(T&& t)
    {
        { operator co_await(std::forward<T>(t)) } -> awaiter;
    };
    
    template<typename T>
    concept awaitable = awaiter<T> 
                     || has_operator_co_await<T> 
                     || has_global_operator_co_await<T>;
    
    template<typename T>
    concept tracking_context = requires(T& ctx)
    {
        ctx.add_work();
        ctx.drop_work();
    };
    
    template<tracking_context Context>
    struct DetachedTask {
        struct promise_type {
            Context* context = nullptr;
    
            template<typename Awaitable>
            promise_type(Context& ctx, Awaitable&& awaitable)
              : context{ &ctx }
            {
                context->add_work();
            }
    
            ~promise_type()
            {
                context->drop_work();
            }
    
            auto get_return_object() noexcept { return DetachedTask{}; }
    
            auto initial_suspend() noexcept { return std::suspend_never{}; }
    
            auto final_suspend() noexcept
            {
                return std::suspend_never{};
            }
    
            void return_void() noexcept {}
    
            void unhandled_exception() noexcept
            {
                std::terminate();
            }
        };
    };
    
    export template<tracking_context Context, awaitable Awaitable>
        requires std::movable<std::remove_cvref_t<Awaitable>>
    auto co_spawn(Context& ctx, Awaitable awaitable) -> DetachedTask<Context>
    {
        co_await std::move(awaitable);
    }
    
    1 条回复 最后回复
    0

    • 登录

    • 没有帐号? 注册

    • 登录或注册以进行搜索。
    d2learn forums Powered by NodeBB
    • 第一个帖子
      最后一个帖子
    0
    • 版块
    • 最新
    • 标签
    • 热门
    • Online Tools
    • 用户
    • 群组