终于,我们的基础设施已经足以支撑起真正的网络交互,是时候着手实现 async_read、async_write 以及 async_accept 等核心操作了。
在本文中,我们会以一个相对简练的 Socket 封装作为承载这些操作的起点。这显然不是它的最终形态(我们的最终目标是实现类似 Boost.Asio 的现代化 API 结构),但如果一上来就陷入对底层套接字选项、地址解析等细节的繁琐封装中,将严重偏离探讨协程并发模型的主线。因此,第一版的 Socket 仅是一个将底层系统调用简单捏合的产物。
本文的焦点将完全聚集于底层的协程流转机制,以及我们如何通过确立公开的 Concept 契约,赋予网络库极其强大的组合与扩展能力。
1. 过渡期的 Socket 封装
第一版的 Socket 核心职责是基于 RAII 管理文件描述符的生命周期,并利用 C++ 模板将底层的 setsockopt 等操作强类型化,消灭裸露的 void* 强转。
在错误处理策略上,我们确立了一个清晰的工程边界:
初始化与配置阶段(如 socket 创建、bind、listen、setsockopt):这些操作如果失败,通常意味着系统资源耗尽或配置存在致命错误,因此直接抛出异常(Throw Exception)。
网络 I/O 交互阶段(如 read、write、accept):在分布式系统中,超时、对端重置连接等属于常规的运行时分支,不应引发代价高昂的栈展开(Stack Unwinding)。因此,这部分操作必须返回 std::expected<T, std::error_code>。
基础代码如下:
#include <sys/socket.h>
#include <fcntl.h>
#include <unistd.h>
#include <utility>
#include <span>
template<typename Context>
class Socket {
public:
using context_type = Context;
using reuse_address = BooleanOption<SOL_SOCKET, SO_REUSEADDR>;
#ifdef SO_REUSEPORT
using reuse_port = BooleanOption<SOL_SOCKET, SO_REUSEPORT>;
#endif
using error = BooleanOption<SOL_SOCKET, SO_ERROR>;
using receive_buffer_size = ValueOption<SOL_SOCKET, SO_RCVBUF>;
using send_buffer_size = ValueOption<SOL_SOCKET, SO_SNDBUF>;
using non_blocking = FlagOption<F_GETFL, F_SETFL, O_NONBLOCK>;
using close_on_exec = FlagOption<F_GETFD, F_SETFD, FD_CLOEXEC>;
Socket(Context& context, int domain, int type, int protocol)
: context_{ context }
{
fd_ = ::socket(domain, type, protocol);
if (fd_ == -1)
throw_system_error("Failed to create socket");
}
Socket(Context& context, int fd)
: context_{ context },
fd_(fd)
{}
Socket(const Socket&) = delete;
auto operator=(const Socket&) -> Socket& = delete;
Socket(Socket&& other) noexcept
: context_{ other.context_ },
fd_{ std::exchange(other.fd_, -1) }
{}
auto operator=(Socket&& other) noexcept -> Socket&
{
if (this != &other) {
close();
context_ = other.context_;
fd_ = std::exchange(other.fd_, -1);
}
return *this;
}
~Socket()
{
close();
}
template<socket_option Option>
void option(const Option& value)
{
auto res = ::setsockopt(fd_, Option::level, Option::name, value.data(), value.size());
if (res == -1)
throw_system_error("Failed to set socket option[{}]", Option::name);
}
template<socket_option Option>
auto option() const -> Option
{
Option option{};
auto size = static_cast<socklen_t>(option.size());
auto res = ::getsockopt(fd_, Option::level, Option::name, option.data(), &size);
if (res == -1)
throw_system_error("Failed to get socket option[{}]", Option::name);
if (size != option.size())
throw std::runtime_error{ "Unexpected socket option size" };
return option;
}
template<flag_option Option>
void option(const Option& value)
{
auto current_flags = ::fcntl(fd_, Option::get_cmd);
if (current_flags == -1)
throw_system_error("Failed to get socket flags");
auto new_flags = value ? (current_flags | Option::bit) : (current_flags & ~Option::bit);
if (::fcntl(fd_, Option::set_cmd, new_flags) == -1)
throw_system_error("Failed to set socket flags");
}
template<flag_option Option>
auto option() const -> Option
{
auto current_flags = ::fcntl(fd_, Option::get_cmd);
if (current_flags == -1)
throw_system_error("Failed to get socket flags");
if (current_flags & Option::bit)
return Option{ true };
return Option{ false };
}
void close()
{
if (fd_ != -1) {
::close(fd_);
fd_ = -1;
}
}
constexpr auto native_handle() const -> int
{
return fd_;
}
void bind(const sockaddr* addr, socklen_t addrlen)
{
if (::bind(fd_, addr, addrlen) == -1)
throw_system_error("Failed to bind socket");
}
void listen(int backlog)
{
if (::listen(fd_, backlog) == -1)
throw_system_error("Failed to listen on socket");
}
// 前置声明的协程 I/O 操作接口
auto async_accept() -> AcceptAwaiter<Socket<Context>>;
auto async_readsome(std::span<std::byte> buffer) -> ReadSomeAwaiter<Context>;
auto async_writesome(std::span<const std::byte> buffer) -> WriteSomeAwaiter<Context>;
private:
Context& context_;
int fd_;
};
2. 核心 Awaiter 剖析
在实现底层 Awaiter 时,我们需要遵循上一节确立的 single_shot_only_operation 概念约束,确保它们能够与 TimeoutAwaiter 无缝协作。
2.1 ReadSomeAwaiter
在传统的 C 语言接口中,read 通常使用 void* buffer 配合 size_t len。而在现代 C++ 中,我们应当使用视图类型 std::span<std::byte>。这不仅明确了字节流的意图,更在编译期消除了指针越界的风险。
更重要的是,由于协程挂起期间,Awaiter 对象驻留在独立的协程帧中,外部传入的 buffer 视图会在整个异步操作期间保持有效。这从语言特性层面保证了内存安全,使得我们无需引入额外的引用计数机制。
#ifndef BLOG_READSOME_AWAITER_H
#define BLOG_READSOME_AWAITER_H
#include <coroutine>
#include <cstddef>
#include <expected>
#include <span>
#include <system_error>
#include <utility>
#include <liburing.h>
#include "exceptions.h"
#include "operation.h"
template<typename Context>
class ReadSomeAwaiter : public Operation {
public:
using resume_type = std::size_t;
ReadSomeAwaiter(Context& context, int fd, std::span<std::byte> buffer)
: context_{ context }, fd_{ fd }, buffer_{ buffer }
{}
constexpr auto await_ready() const noexcept -> bool
{
return false;
}
void await_suspend(std::coroutine_handle<> handle) noexcept
{
handle_ = handle;
auto* sqe = context_.sqe();
prepare(sqe);
::io_uring_sqe_set_data(sqe, this);
}
auto await_resume() noexcept -> std::expected<resume_type, std::error_code>
{
if (error_code_ != 0)
return unexpected_system_error(error_code_);
return byte_read_;
}
void prepare(::io_uring_sqe* sqe) noexcept
{
::io_uring_prep_recv(sqe, fd_, buffer_.data(), buffer_.size(), 0);
}
void set_result(int result, std::uint32_t flags) noexcept
{
if (result >= 0)
byte_read_ = static_cast<std::size_t>(result);
else
error_code_ = -result;
}
void complete(int result, std::uint32_t flags) noexcept override
{
set_result(result, flags);
if (handle_) {
auto handle = std::exchange(handle_, nullptr);
handle.resume();
}
}
auto context() noexcept -> Context& { return context_; }
private:
Context& context_;
int fd_;
std::span<std::byte> buffer_;
std::coroutine_handle<> handle_{ nullptr };
std::size_t byte_read_{ 0 };
int error_code_{ 0 };
};
#endif // BLOG_READSOME_AWAITER_H
在 Socket 中,我们只需将其作为返回值工厂抛出:
auto async_readsome(std::span<std::byte> buffer) -> ReadSomeAwaiter<Context>
{
return ReadSomeAwaiter<Context>{ context_, fd_, buffer };
}
2.2 WriteSomeAwaiter
写操作 WriteSomeAwaiter 与读操作高度对称。唯一的区别是它接受只读视图 std::span<const std::byte>,并将其底层操作码映射至 io_uring_prep_send。
#ifndef BLOG_WRITESOME_AWAITER_H
#define BLOG_WRITESOME_AWAITER_H
#include <coroutine>
#include <cstddef>
#include <expected>
#include <span>
#include <utility>
#include <liburing.h>
#include "exceptions.h"
#include "operation.h"
template<typename Context>
class WriteSomeAwaiter : public Operation {
public:
using resume_type = std::size_t;
WriteSomeAwaiter(Context& context, int fd, std::span<const std::byte> buffer)
: context_{ context }, fd_{ fd }, buffer_{ buffer }
{}
constexpr auto await_ready() const noexcept -> bool
{
return false;
}
void await_suspend(std::coroutine_handle<> handle) noexcept
{
handle_ = handle;
auto* sqe = context_.sqe();
prepare(sqe);
::io_uring_sqe_set_data(sqe, this);
}
auto await_resume() noexcept -> std::expected<resume_type, std::error_code>
{
if (error_code_ != 0)
return unexpected_system_error(error_code_);
return byte_written_;
}
void prepare(::io_uring_sqe* sqe) noexcept
{
::io_uring_prep_send(sqe, fd_, buffer_.data(), buffer_.size(), 0);
}
void set_result(int result, std::uint32_t flags) noexcept
{
if (result >= 0)
byte_written_ = static_cast<std::size_t>(result);
else
error_code_ = -result;
}
void complete(int result, std::uint32_t flags) noexcept override
{
set_result(result, flags);
if (handle_) {
auto handle = std::exchange(handle_, nullptr);
handle.resume();
}
}
auto context() noexcept -> Context& { return context_; }
private:
std::coroutine_handle<> handle_{ nullptr };
Context& context_;
int fd_;
std::span<const std::byte> buffer_;
std::size_t byte_written_{ 0 };
int error_code_{ 0 };
};
#endif // BLOG_WRITESOME_AWAITER_H
Socket 中的接口实现:
auto async_writesome(std::span<const std::byte> buffer) -> WriteSomeAwaiter<Context>
{
return WriteSomeAwaiter<Context>{ context_, fd_, buffer };
}
2.3 AcceptAwaiter
AcceptAwaiter 负责处理被动连接接入。在原生 C 接口中,accept 可以通过传入 sockaddr* 来获取对端地址信息。由于当前版本的实现并未对地址(Endpoint)进行体系化封装,该版本的 AcceptAwaiter 暂不支持提取连接地址,而是直接移交内核产生的新 fd,由其自动推导包装为新的 Socket 实例返回。
#ifndef BLOG_ACCEPT_AWAITER_H
#define BLOG_ACCEPT_AWAITER_H
#include <coroutine>
#include <expected>
#include <utility>
#include <liburing.h>
#include "exceptions.h"
#include "operation.h"
template<typename Socket>
class AcceptAwaiter : public Operation {
public:
using context_type = typename Socket::context_type;
using socket_type = Socket;
using resume_type = socket_type;
AcceptAwaiter(context_type& context, int fd)
: context_{ context }, fd_{ fd }
{}
constexpr auto await_ready() const noexcept -> bool
{
return false;
}
void await_suspend(std::coroutine_handle<> handle) noexcept
{
handle_ = handle;
auto* sqe = context_.sqe();
prepare(sqe);
::io_uring_sqe_set_data(sqe, this);
}
auto await_resume() noexcept -> std::expected<resume_type, std::error_code>
{
if (error_code_ != 0)
return unexpected_system_error(error_code_);
return resume_type{ context_, result_fd_ };
}
void prepare(::io_uring_sqe* sqe) noexcept
{
::io_uring_prep_accept(sqe, fd_, nullptr, nullptr, 0);
}
void set_result(int result, std::uint32_t flags) noexcept
{
if (result >= 0)
result_fd_ = result;
else
error_code_ = -result;
}
void complete(int result, std::uint32_t flags) noexcept override
{
set_result(result, flags);
if (handle_) {
auto handle = std::exchange(handle_, nullptr);
handle.resume();
}
}
auto context() noexcept -> context_type& { return context_; }
private:
context_type& context_;
int fd_;
std::coroutine_handle<> handle_{ nullptr };
int result_fd_{ -1 };
int error_code_{ 0 };
};
#endif // BLOG_ACCEPT_AWAITER_H
在 Socket 中,挂载实现如下:
auto async_accept() -> AcceptAwaiter<Socket<Context>>
{
return AcceptAwaiter<Socket<Context>>{ context_, fd_ };
}
3. Echo Server 实战
有了这三大基础 I/O Awaiter,我们已经可以构建一个极简但具备并发特征的 Echo Server了。
在这个实现中,没有任何的回调嵌套,也没有跨线程的数据同步操作,控制流如同传统阻塞代码一般自上而下铺陈。
auto session(Socket<IOContext> client) -> Task<void>
{
std::array<std::byte, 1024> buffer{};
while (true)
{
auto read_result = co_await client.async_readsome(buffer);
if (!read_result) {
spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message());
co_return;
}
auto bytes_read = *read_result;
if (bytes_read == 0) {
spdlog::info("Client {} disconnected", client.native_handle());
co_return;
}
spdlog::info("Read {} bytes from client {}", bytes_read, client.native_handle());
std::string_view data{ reinterpret_cast<const char*>(buffer.data()), bytes_read };
spdlog::warn("Data from client {}: {}", client.native_handle(), data);
auto write_buffer = std::span{ buffer }.first(bytes_read);
auto write_result = co_await client.async_writesome(write_buffer);
if (!write_result) {
spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message());
co_return;
}
if (*write_result != bytes_read)
spdlog::warn("Partial write on client {}: {} / {} bytes", client.native_handle(), *write_result, bytes_read);
}
}
auto server(IOContext& context) -> Task<void>
{
Socket acceptor{ context, AF_INET, SOCK_STREAM, 0 };
acceptor.option(Socket<IOContext>::reuse_address{ true });
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = ::htons(12345);
acceptor.bind(reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
acceptor.listen(1024);
while (true) {
auto client = co_await acceptor.async_accept();
if (!client) {
spdlog::warn("Failed to accept connection: {}", client.error().message());
continue;
}
co_spawn(context, session(std::move(*client)));
}
}
auto shutdown_monitor(IOContext& context) -> Task<void>
{
using namespace std::chrono_literals;
SignalSet sets{ context, signals::interrupt, signals::terminate };
co_await sets.async_wait();
spdlog::info("Received shutdown signal, stopping IOContext...");
context.stop();
}
int main(int argc, char* argv[])
{
IOContext context{};
co_spawn(context, server(context));
co_spawn(context, shutdown_monitor(context));
context.run();
spdlog::info("IOContext stopped, exiting...");
return EXIT_SUCCESS;
}
编译执行后,利用多个客户端建立连接,输出日志如下:
blog.socket_v1
[2026-04-20 21:15:51.631] [info] Read 6 bytes from client 7
[2026-04-20 21:15:51.631] [warning] Data from client 7: dsfsa
[2026-04-20 21:15:57.312] [info] Read 23 bytes from client 8
[2026-04-20 21:15:57.312] [warning] Data from client 8: fdasfas的撒大法师
[2026-04-20 21:16:01.863] [info] Client 8 disconnected
[2026-04-20 21:16:03.021] [info] Client 7 disconnected
[2026-04-20 21:16:09.837] [info] Read 36 bytes from client 7
[2026-04-20 21:16:09.837] [warning] Data from client 7: asfasdfsasa打撒四方达dsa去玩
[2026-04-20 21:16:11.133] [info] Client 7 disconnected
^C[2026-04-20 21:16:14.162] [info] Received shutdown signal, stopping IOContext...
[2026-04-20 21:16:14.162] [info] IOContext stopped, exiting...
更重要的是,得益于对实现中对 single_shot_only_operation 的支持,我们可以无需修改底层 ReadSomeAwaiter 的任何一行代码,直接将上一篇博客中构建的 TimeoutAwaiter 无缝挂载。
针对读写超时的正交组合可以写出如下形式:
// 控制 read 超时
auto read_result = co_await timeout(client.async_readsome(buffer), 5s);
if (!read_result) {
if (read_result.error() == std::errc::timed_out) {
spdlog::debug("Read timed out on client {}", client.native_handle());
continue;
}
spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message());
co_return;
}
// ... 处理接收逻辑 ...
// 控制 write 超时
auto write_result = co_await timeout(client.async_writesome(write_buffer), 5s);
if (!write_result) {
if (write_result.error() == std::errc::timed_out) {
spdlog::debug("Write timed out on client {}", client.native_handle());
continue;
}
spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message());
co_return;
}
基础骨架至此已完全跑通并形成闭环。在未来的优化中,我们将在这些底层基石之上,引入更完备的端点(Endpoint)表示与高级协议解析组件,让网络库从“可用”走向“现代化”。
完整代码