启动并分离 - co_spawn
-
在基于 C++20 协程构建的异步框架里,
Task<T>通常有着严格的结构化并发语义:调用者必须去co_await它,子任务的生命周期被死死地绑在父任务上。但现实世界没这么理想,系统架构中必然存在同步世界与异步世界的交汇点。最典型的例子就是 TCP 服务器的事件循环:
// 这是一个底层的同步事件循环 void server_accept_loop(io_context& ctx) { while (true) { stream_socket client = accept_connection(ctx); // 业务协程:处理单个客户端连接 // 函数签名:Task<void> handle_client(stream_socket client); // 问题:怎么在这里启动 handle_client,然后不管它,直接去接下一个客? // 方案 1: 直接调用?没用,返回值 Task 被丢弃,协程根本不会跑(惰性求值)。 // handle_client(std::move(client)); // 方案 2: 使用 co_await?编译直接报错!因为当前函数是个普通函数,不是协程。 // co_await handle_client(std::move(client)); // 方案 3: 把server_accept_loop的返回值改成Task<void>。这里使用co_await // 可以编译,但是一次只能处理一个client } }为了解决这个问题,我们必须提供一个“启动并分离(Fire and Forget)”的方法,类似std::thread的detach模式。这没法用常规的
Task<T>表达,我们需要自己捏一个底层原语:co_spawn。下面我们将从零开始,一步步打磨出一个内存安全、支持优雅停机的
co_spawn。
1. 裸分离:让编译器帮我们擦屁股
要把一个任务扔到后台,第一个要面对的灵魂拷问就是:这个协程在堆上分配的内存帧(Coroutine Frame),最后谁来删?
既然分离出去了,外部就不存在任何变量持有它的句柄。在 C++20 里,最优雅的解法是:配置好参数,让编译器自己管理。
按照协程规范,只要
promise_type::final_suspend()返回std::suspend_never,协程走到生命周期尽头时,运行时就会自动delete掉那块堆内存。据此,我们可以写出一个极简的“裸分离”壳子:// 纯粹的空壳,仅用于触发编译器的自动清理 struct DetachedTask { struct promise_type { auto get_return_object() noexcept { return detached{}; } // 饥饿启动:一创建就立马执行 auto initial_suspend() noexcept { return std::suspend_never{}; } // 核心:结束时不挂起,触发自动销毁 auto final_suspend() noexcept { return std::suspend_never{}; } void return_void() noexcept {} void unhandled_exception() noexcept { std::terminate(); } }; }; // 极简版原语 template<typename Awaitable> auto co_spawn(Awaitable awaitable) -> DetachedTask { co_await std::move(awaitable); }然后,我们在同步回调里调用
co_spawn(handle_client(std::move(client)));时,协程帧会立即投入运行。遇到 I/O 挂起时,控制流会return回主循环(不会阻塞线程!)。等任务彻底跑完,走向final_suspend,内存安全摧毁,干干净净。2. 状态追踪:用 RAII 告别“幽灵任务”
上面这套“裸分离”虽然在语言机制上跑得通,但在工程上其实是个定时炸弹。因为它没法做状态追踪,也就没法支持服务器的优雅停机(Graceful Shutdown)。
试想一下,如果你发个
SIGTERM准备关进程,底层的事件分发器怎么知道还有多少个co_spawn出去的任务在挂起等 I/O?如果直接把底层上下文销毁了,等这些“幽灵任务”被唤醒时,面对的就是一片废墟,当场 Core Dump。所以,分离出的协程必须和底层的上下文绑定生命周期:诞生时登记,死亡时注销。
这里我们假定一个context应该支持add_work()和drop_work()来管理分离出去的任务。// 假定上下文支持增减引用计数 template<typename T> concept tracking_context = requires(T& ctx) { ctx.add_work(); ctx.drop_work(); }; template<tracking_context Context> struct DetachedTask { struct promise_type { Context* context = nullptr; // 拦截参数:拿到上下文引用,生命周期开始时登记 template<typename Awaitable> promise_type(Context& ctx, Awaitable&&) : context{ &ctx } { context->add_work(); } // 绑定析构:随协程帧被编译器销毁时,自动注销 ~promise_type() { if (context) context->drop_work(); } auto get_return_object() noexcept { return DetachedTask{}; } auto initial_suspend() noexcept { return std::suspend_never{}; } auto final_suspend() noexcept { return std::suspend_never{}; } void return_void() noexcept {} void unhandled_exception() noexcept { std::terminate(); } }; };3. 最终的
co_spawn有了上面这个支持状态追踪的
detached_task,我们就可以给出co_spawn的最终接口了。// 注意这里的 Awaitable awaitable 是按值传递! template<tracking_context Context, awaitable Awaitable> requires std::movable<std::remove_cvref_t<Awaitable>> auto co_spawn(Context& ctx, Awaitable awaitable) -> DetachedTask<Context> { // 移动进协程帧里,生命周期交给DetachedTask co_await std::move(awaitable); }为什么要Awaitble必须按值传?
在协程里,如果参数是引用,堆上的协程帧就只会存个指针。像handle_client(std::move(client))这种调用,产生的是个临时对象(右值)。如果co_spawn接的是个引用,等它内部第一次co_await挂起、把控制权还给外层时,这个临时对象早就析构了!这会导致极其隐蔽的悬垂引用Bug。通过强制按值传递,我们用移动语义,把临时的业务任务移动到了协程帧内部,只要协程不死,它的状态就绝对安全。
4. 补充concept:到底什么是
awaitable?细心的朋友肯定注意到了,在最终的
co_spawn签名里,我用了一个awaitable的概念。在 C++20 里,一个东西能被
await,无非三种情况:- 它自己就是个
awaiter,即有3个await函数 - 它重载了成员方法
operator co_await() - 有对应的全局重载。
我们就把这个concept用代码翻译出来:
template<typename T> concept awaiter = requires(T& t, std::coroutine_handle<> handle) { { t.await_ready() } -> std::convertible_to<bool>; t.await_suspend(handle); t.await_resume(); }; template<typename T> concept has_operator_co_await = requires(T&& t) { { std::forward<T>(t).operator co_await() } -> awaiter; }; template<typename T> concept has_global_operator_co_await = requires(T&& t) { { operator co_await(std::forward<T>(t)) } -> awaiter; }; template<typename T> concept awaitable = awaiter<T> || has_operator_co_await<T> || has_global_operator_co_await<T>;
实战演示
最后,给一段伪代码示例,看看它是怎么在业务里落地的:
import std; // 1. 实现一个满足 tracking_context 契约的上下文 struct MyIOContext { int active_tasks = 0; void add_work() { ++active_tasks; std::cout << "[Context] 任务+1,当前活跃数: " << active_tasks << "\n"; } void drop_work() { --active_tasks; std::cout << "[Context] 任务结束,当前活跃数: " << active_tasks << "\n"; } void run_loop() { // 真实场景里,这里是 epoll_wait 或 io_uring_enter 阻塞等事件 std::cout << "[Context] 开启事件循环,等待 I/O...\n"; } }; // 2. 模拟一个能被 co_await 的异步操作 (满足 awaitable 契约) struct DummyAsyncRead { bool await_ready() { return false; } void await_suspend(std::coroutine_handle<>) { std::cout << " -> 协程挂起,把 fd 注册到 epoll...\n"; } void await_resume() { std::cout << " -> 协程恢复,拿到数据!\n"; } }; // 业务逻辑协程 DummyAsyncRead handle_client(int client_fd) { std::cout << "开始处理客户端: " << client_fd << "\n"; // 遇到 IO 挂起 co_await DummyAsyncRead{}; } // 3. 跑起来 int main() { MyIOContext ctx; std::cout << "--- 服务器启动 ---\n"; // 启动并分离,立刻返回 co_spawn(ctx, handle_client(1001)); co_spawn(ctx, handle_client(1002)); std::cout << "--- 同步的 main 函数丝毫不受阻塞 ---\n"; ctx.run_loop(); return 0; }一个更具体的demo可以看这里:demo
完整代码
template<typename T> concept awaiter = requires(T& t, std::coroutine_handle<> handle) { { t.await_ready() } -> std::convertible_to<bool>; t.await_suspend(handle); t.await_resume(); }; template<typename T> concept has_operator_co_await = requires(T&& t) { { std::forward<T>(t).operator co_await() } -> awaiter; }; template<typename T> concept has_global_operator_co_await = requires(T&& t) { { operator co_await(std::forward<T>(t)) } -> awaiter; }; template<typename T> concept awaitable = awaiter<T> || has_operator_co_await<T> || has_global_operator_co_await<T>; template<typename T> concept tracking_context = requires(T& ctx) { ctx.add_work(); ctx.drop_work(); }; template<tracking_context Context> struct DetachedTask { struct promise_type { Context* context = nullptr; template<typename Awaitable> promise_type(Context& ctx, Awaitable&& awaitable) : context{ &ctx } { context->add_work(); } ~promise_type() { context->drop_work(); } auto get_return_object() noexcept { return DetachedTask{}; } auto initial_suspend() noexcept { return std::suspend_never{}; } auto final_suspend() noexcept { return std::suspend_never{}; } void return_void() noexcept {} void unhandled_exception() noexcept { std::terminate(); } }; }; export template<tracking_context Context, awaitable Awaitable> requires std::movable<std::remove_cvref_t<Awaitable>> auto co_spawn(Context& ctx, Awaitable awaitable) -> DetachedTask<Context> { co_await std::move(awaitable); } - 它自己就是个