【An io_uring-based C++20 Coroutine Network Library】04: Implementing the Core I/O Operations as Coroutines
At last, our infrastructure is solid enough to support real network interaction, and it is time to implement the core operations: async_read, async_write, and async_accept. In this post, a fairly lean Socket wrapper serves as the starting point that carries these operations. This is clearly not its final form (the ultimate goal is a modern API structure in the spirit of Boost.Asio), but plunging straight into tedious wrappers for low-level socket options, address resolution, and the like would pull us far off the main thread of this series, the coroutine concurrency model. The first version of Socket is therefore just a thin aggregation of the underlying system calls. The focus of this post is entirely on the underlying coroutine flow, and on how establishing a public Concept contract gives the network library remarkably strong composition and extension capabilities.
1. A Transitional Socket Wrapper
The first version of Socket has a focused set of core responsibilities: manage the file descriptor's lifetime via RAII, and use C++ templates to give strongly typed wrappers to low-level calls such as setsockopt, eliminating raw void* casts. For error handling we draw a clear engineering boundary:

- Initialization and configuration (socket creation, bind, listen, setsockopt): a failure here usually means exhausted system resources or a fatal configuration error, so these operations simply throw exceptions.
- Network I/O (read, write, accept): in a distributed system, timeouts and peer connection resets are ordinary runtime branches and should not trigger expensive stack unwinding. These operations therefore return std::expected<T, std::error_code>.
The base code is as follows:
```cpp
#include <sys/socket.h>
#include <fcntl.h>
#include <unistd.h>
#include <utility>
#include <span>

template<typename Context>
class Socket {
public:
    using context_type = Context;

    using reuse_address = BooleanOption<SOL_SOCKET, SO_REUSEADDR>;
#ifdef SO_REUSEPORT
    using reuse_port = BooleanOption<SOL_SOCKET, SO_REUSEPORT>;
#endif
    using error = BooleanOption<SOL_SOCKET, SO_ERROR>;
    using receive_buffer_size = ValueOption<SOL_SOCKET, SO_RCVBUF>;
    using send_buffer_size = ValueOption<SOL_SOCKET, SO_SNDBUF>;
    using non_blocking = FlagOption<F_GETFL, F_SETFL, O_NONBLOCK>;
    using close_on_exec = FlagOption<F_GETFD, F_SETFD, FD_CLOEXEC>;

    Socket(Context& context, int domain, int type, int protocol)
        : context_{ context } {
        fd_ = ::socket(domain, type, protocol);
        if (fd_ == -1)
            throw_system_error("Failed to create socket");
    }

    Socket(Context& context, int fd) : context_{ context }, fd_{ fd } {}

    Socket(const Socket&) = delete;
    auto operator=(const Socket&) -> Socket& = delete;

    Socket(Socket&& other) noexcept
        : context_{ other.context_ }, fd_{ std::exchange(other.fd_, -1) } {}

    auto operator=(Socket&& other) noexcept -> Socket& {
        if (this != &other) {
            close();
            // context_ is a reference and cannot be rebound; move-assignment
            // across different contexts is not supported in this version.
            fd_ = std::exchange(other.fd_, -1);
        }
        return *this;
    }

    ~Socket() { close(); }

    template<socket_option Option>
    void option(const Option& value) {
        auto res = ::setsockopt(fd_, Option::level, Option::name, value.data(), value.size());
        if (res == -1)
            throw_system_error("Failed to set socket option[{}]", Option::name);
    }

    template<socket_option Option>
    auto option() const -> Option {
        Option option{};
        auto size = static_cast<socklen_t>(option.size());
        auto res = ::getsockopt(fd_, Option::level, Option::name, option.data(), &size);
        if (res == -1)
            throw_system_error("Failed to get socket option[{}]", Option::name);
        if (size != option.size())
            throw std::runtime_error{ "Unexpected socket option size" };
        return option;
    }

    template<flag_option Option>
    void option(const Option& value) {
        auto current_flags = ::fcntl(fd_, Option::get_cmd);
        if (current_flags == -1)
            throw_system_error("Failed to get socket flags");
        auto new_flags = value ? (current_flags | Option::bit)
                               : (current_flags & ~Option::bit);
        if (::fcntl(fd_, Option::set_cmd, new_flags) == -1)
            throw_system_error("Failed to set socket flags");
    }

    template<flag_option Option>
    auto option() const -> Option {
        auto current_flags = ::fcntl(fd_, Option::get_cmd);
        if (current_flags == -1)
            throw_system_error("Failed to get socket flags");
        if (current_flags & Option::bit)
            return Option{ true };
        return Option{ false };
    }

    void close() {
        if (fd_ != -1) {
            ::close(fd_);
            fd_ = -1;
        }
    }

    constexpr auto native_handle() const -> int { return fd_; }

    void bind(const sockaddr* addr, socklen_t addrlen) {
        if (::bind(fd_, addr, addrlen) == -1)
            throw_system_error("Failed to bind socket");
    }

    void listen(int backlog) {
        if (::listen(fd_, backlog) == -1)
            throw_system_error("Failed to listen on socket");
    }

    // Forward-declared coroutine I/O interfaces
    auto async_accept() -> AcceptAwaiter<Socket<Context>>;
    auto async_readsome(std::span<std::byte> buffer) -> ReadSomeAwaiter<Context>;
    auto async_writesome(std::span<const std::byte> buffer) -> WriteSomeAwaiter<Context>;

private:
    Context& context_;
    int fd_;
};
```

2. Dissecting the Core Awaiters
When implementing the low-level awaiters, we must follow the single_shot_only_operation concept constraint established in the previous post, which guarantees they compose seamlessly with TimeoutAwaiter.

2.1 ReadSomeAwaiter
The traditional C interface to read takes a void* buffer together with a size_t len. In modern C++ we should use the view type std::span<std::byte> instead: it states the byte-stream intent explicitly and removes the risk of pointer overruns at compile time. More importantly, while the coroutine is suspended the Awaiter object resides in the coroutine frame, so the buffer view passed in from outside remains valid for the entire asynchronous operation. The language itself guarantees memory safety here, and no extra reference-counting machinery is needed.

```cpp
#ifndef BLOG_READSOME_AWAITER_H
#define BLOG_READSOME_AWAITER_H

#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <expected>
#include <span>
#include <system_error>
#include <utility>

#include <liburing.h>

#include "exceptions.h"
#include "operation.h"

template<typename Context>
class ReadSomeAwaiter : public Operation {
public:
    using resume_type = std::size_t;

    ReadSomeAwaiter(Context& context, int fd, std::span<std::byte> buffer)
        : context_{ context }, fd_{ fd }, buffer_{ buffer } {}

    constexpr auto await_ready() const noexcept -> bool { return false; }

    void await_suspend(std::coroutine_handle<> handle) noexcept {
        handle_ = handle;
        auto* sqe = context_.sqe();
        prepare(sqe);
        ::io_uring_sqe_set_data(sqe, this);
    }

    auto await_resume() noexcept -> std::expected<resume_type, std::error_code> {
        if (error_code_ != 0)
            return unexpected_system_error(error_code_);
        return byte_read_;
    }

    void prepare(::io_uring_sqe* sqe) noexcept {
        ::io_uring_prep_recv(sqe, fd_, buffer_.data(), buffer_.size(), 0);
    }

    void set_result(int result, std::uint32_t flags) noexcept {
        if (result >= 0)
            byte_read_ = static_cast<std::size_t>(result);
        else
            error_code_ = -result;
    }

    void complete(int result, std::uint32_t flags) noexcept override {
        set_result(result, flags);
        if (handle_) {
            auto handle = std::exchange(handle_, nullptr);
            handle.resume();
        }
    }

    auto context() noexcept -> Context& { return context_; }

private:
    Context& context_;
    int fd_;
    std::span<std::byte> buffer_;
    std::coroutine_handle<> handle_{ nullptr };
    std::size_t byte_read_{ 0 };
    int error_code_{ 0 };
};

#endif // BLOG_READSOME_AWAITER_H
```

In Socket, the corresponding method is simply a factory that hands out the awaiter:

```cpp
auto async_readsome(std::span<std::byte> buffer) -> ReadSomeAwaiter<Context> {
    return ReadSomeAwaiter<Context>{ context_, fd_, buffer };
}
```

2.2 WriteSomeAwaiter
The write-side WriteSomeAwaiter is highly symmetric with the read side. The only differences are that it accepts a read-only view, std::span<const std::byte>, and maps its underlying opcode to io_uring_prep_send.

```cpp
#ifndef BLOG_WRITESOME_AWAITER_H
#define BLOG_WRITESOME_AWAITER_H

#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <expected>
#include <span>
#include <system_error>
#include <utility>

#include <liburing.h>

#include "exceptions.h"
#include "operation.h"

template<typename Context>
class WriteSomeAwaiter : public Operation {
public:
    using resume_type = std::size_t;

    WriteSomeAwaiter(Context& context, int fd, std::span<const std::byte> buffer)
        : context_{ context }, fd_{ fd }, buffer_{ buffer } {}

    constexpr auto await_ready() const noexcept -> bool { return false; }

    void await_suspend(std::coroutine_handle<> handle) noexcept {
        handle_ = handle;
        auto* sqe = context_.sqe();
        prepare(sqe);
        ::io_uring_sqe_set_data(sqe, this);
    }

    auto await_resume() noexcept -> std::expected<resume_type, std::error_code> {
        if (error_code_ != 0)
            return unexpected_system_error(error_code_);
        return byte_written_;
    }

    void prepare(::io_uring_sqe* sqe) noexcept {
        ::io_uring_prep_send(sqe, fd_, buffer_.data(), buffer_.size(), 0);
    }

    void set_result(int result, std::uint32_t flags) noexcept {
        if (result >= 0)
            byte_written_ = static_cast<std::size_t>(result);
        else
            error_code_ = -result;
    }

    void complete(int result, std::uint32_t flags) noexcept override {
        set_result(result, flags);
        if (handle_) {
            auto handle = std::exchange(handle_, nullptr);
            handle.resume();
        }
    }

    auto context() noexcept -> Context& { return context_; }

private:
    std::coroutine_handle<> handle_{ nullptr };
    Context& context_;
    int fd_;
    std::span<const std::byte> buffer_;
    std::size_t byte_written_{ 0 };
    int error_code_{ 0 };
};

#endif // BLOG_WRITESOME_AWAITER_H
```

The corresponding interface in Socket:

```cpp
auto async_writesome(std::span<const std::byte> buffer) -> WriteSomeAwaiter<Context> {
    return WriteSomeAwaiter<Context>{ context_, fd_, buffer };
}
```

2.3 AcceptAwaiter
AcceptAwaiter handles passive connection acceptance. The native C accept can fill in a sockaddr* to report the peer address; since this version has no systematic Endpoint abstraction yet, AcceptAwaiter does not extract the peer address. It simply takes over the new fd produced by the kernel and wraps it, via template deduction, into a new Socket instance to return.

```cpp
#ifndef BLOG_ACCEPT_AWAITER_H
#define BLOG_ACCEPT_AWAITER_H

#include <coroutine>
#include <cstdint>
#include <expected>
#include <system_error>
#include <utility>

#include <liburing.h>

#include "exceptions.h"
#include "operation.h"

template<typename Socket>
class AcceptAwaiter : public Operation {
public:
    using context_type = typename Socket::context_type;
    using socket_type = Socket;
    using resume_type = socket_type;

    AcceptAwaiter(context_type& context, int fd) : context_{ context }, fd_{ fd } {}

    constexpr auto await_ready() const noexcept -> bool { return false; }

    void await_suspend(std::coroutine_handle<> handle) noexcept {
        handle_ = handle;
        auto* sqe = context_.sqe();
        prepare(sqe);
        ::io_uring_sqe_set_data(sqe, this);
    }

    auto await_resume() noexcept -> std::expected<resume_type, std::error_code> {
        if (error_code_ != 0)
            return unexpected_system_error(error_code_);
        return resume_type{ context_, result_fd_ };
    }

    void prepare(::io_uring_sqe* sqe) noexcept {
        ::io_uring_prep_accept(sqe, fd_, nullptr, nullptr, 0);
    }

    void set_result(int result, std::uint32_t flags) noexcept {
        if (result >= 0)
            result_fd_ = result;
        else
            error_code_ = -result;
    }

    void complete(int result, std::uint32_t flags) noexcept override {
        set_result(result, flags);
        if (handle_) {
            auto handle = std::exchange(handle_, nullptr);
            handle.resume();
        }
    }

    auto context() noexcept -> context_type& { return context_; }

private:
    context_type& context_;
    int fd_;
    std::coroutine_handle<> handle_{ nullptr };
    int result_fd_{ -1 };
    int error_code_{ 0 };
};

#endif // BLOG_ACCEPT_AWAITER_H
```

Mounted in Socket as:

```cpp
auto async_accept() -> AcceptAwaiter<Socket<Context>> {
    return AcceptAwaiter<Socket<Context>>{ context_, fd_ };
}
```

3. Echo Server in Practice
With these three basic I/O awaiters in place, we can already build a minimal yet genuinely concurrent Echo Server.
In this implementation there is no callback nesting and no cross-thread data synchronization; the control flow reads top-down, just like traditional blocking code.
```cpp
auto session(Socket<IOContext> client) -> Task<void> {
    std::array<std::byte, 1024> buffer{};
    while (true) {
        auto read_result = co_await client.async_readsome(buffer);
        if (!read_result) {
            spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message());
            co_return;
        }
        auto bytes_read = *read_result;
        if (bytes_read == 0) {
            spdlog::info("Client {} disconnected", client.native_handle());
            co_return;
        }
        spdlog::info("Read {} bytes from client {}", bytes_read, client.native_handle());
        std::string_view data{ reinterpret_cast<const char*>(buffer.data()), bytes_read };
        spdlog::warn("Data from client {}: {}", client.native_handle(), data);

        auto write_buffer = std::span{ buffer }.first(bytes_read);
        auto write_result = co_await client.async_writesome(write_buffer);
        if (!write_result) {
            spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message());
            co_return;
        }
        if (*write_result != bytes_read)
            spdlog::warn("Partial write on client {}: {} / {} bytes", client.native_handle(), *write_result, bytes_read);
    }
}

auto server(IOContext& context) -> Task<void> {
    Socket acceptor{ context, AF_INET, SOCK_STREAM, 0 };
    acceptor.option(Socket<IOContext>::reuse_address{ true });

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = ::htons(12345);
    acceptor.bind(reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
    acceptor.listen(1024);

    while (true) {
        auto client = co_await acceptor.async_accept();
        if (!client) {
            spdlog::warn("Failed to accept connection: {}", client.error().message());
            continue;
        }
        co_spawn(context, session(std::move(*client)));
    }
}

auto shutdown_monitor(IOContext& context) -> Task<void> {
    using namespace std::chrono_literals;
    SignalSet sets{ context, signals::interrupt, signals::terminate };
    co_await sets.async_wait();
    spdlog::info("Received shutdown signal, stopping IOContext...");
    context.stop();
}

int main(int argc, char* argv[]) {
    IOContext context{};
    co_spawn(context, server(context));
    co_spawn(context, shutdown_monitor(context));
    context.run();
    spdlog::info("IOContext stopped, exiting...");
    return EXIT_SUCCESS;
}
```

After compiling and running it, connecting with several clients produces logs like the following:
```text
blog.socket_v1
[2026-04-20 21:15:51.631] [info] Read 6 bytes from client 7
[2026-04-20 21:15:51.631] [warning] Data from client 7: dsfsa
[2026-04-20 21:15:57.312] [info] Read 23 bytes from client 8
[2026-04-20 21:15:57.312] [warning] Data from client 8: fdasfas的撒大法师
[2026-04-20 21:16:01.863] [info] Client 8 disconnected
[2026-04-20 21:16:03.021] [info] Client 7 disconnected
[2026-04-20 21:16:09.837] [info] Read 36 bytes from client 7
[2026-04-20 21:16:09.837] [warning] Data from client 7: asfasdfsasa打撒四方达dsa去玩
[2026-04-20 21:16:11.133] [info] Client 7 disconnected
^C[2026-04-20 21:16:14.162] [info] Received shutdown signal, stopping IOContext...
[2026-04-20 21:16:14.162] [info] IOContext stopped, exiting...
```

More importantly, because the implementation honors single_shot_only_operation, the TimeoutAwaiter built in the previous post mounts seamlessly, without modifying a single line of ReadSomeAwaiter. The orthogonal composition of read and write timeouts can be written as:
```cpp
// Read with a timeout
auto read_result = co_await timeout(client.async_readsome(buffer), 5s);
if (!read_result) {
    if (read_result.error() == std::errc::timed_out) {
        spdlog::debug("Read timed out on client {}", client.native_handle());
        continue;
    }
    spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message());
    co_return;
}

// ... handle the received data ...

// Write with a timeout
auto write_result = co_await timeout(client.async_writesome(write_buffer), 5s);
if (!write_result) {
    if (write_result.error() == std::errc::timed_out) {
        spdlog::debug("Write timed out on client {}", client.native_handle());
        continue;
    }
    spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message());
    co_return;
}
```

The basic skeleton now runs end to end. In future installments we will build on these foundations, introducing a more complete Endpoint representation and higher-level protocol parsing components to take the library from merely usable to genuinely modern.