跳转至内容
  • 版块
  • 最新
  • 标签
  • 热门
  • Online Tools
  • 用户
  • 群组
折叠
品牌标识

D2Learn Forums

  1. 主页
  2. Blogs | 博客
  3. xin
  4. 【 基于 io_uring 的 C++20 协程网络库】04 核心IO操作的协程实现

【 基于 io_uring 的 C++20 协程网络库】04 核心IO操作的协程实现

已定时 已固定 已锁定 已移动 xin
c++20协程
1 帖子 1 发布者 1 浏览
  • 从旧到新
  • 从新到旧
  • 最多赞同
登录后回复
此主题已被删除。只有拥有主题管理权限的用户可以查看。
  • DoomjustinD 离线
    DoomjustinD 离线
    Doomjustin
    编写于 最后由 Doomjustin 编辑
    #1

    终于,我们的基础设施已经足以支撑起真正的网络交互,是时候着手实现 async_read、async_write 以及 async_accept 等核心操作了。

    在本文中,我们会以一个相对简练的 Socket 封装作为承载这些操作的起点。这显然不是它的最终形态(我们的最终目标是实现类似 Boost.Asio 的现代化 API 结构),但如果一上来就陷入对底层套接字选项、地址解析等细节的繁琐封装中,将严重偏离探讨协程并发模型的主线。因此,第一版的 Socket 仅是一个将底层系统调用简单捏合的产物。

    本文的焦点将完全聚集于底层的协程流转机制,以及我们如何通过确立公开的 Concept 契约,赋予网络库极其强大的组合与扩展能力。


    1. 过渡期的 Socket 封装

    第一版的 Socket 核心职责是基于 RAII 管理文件描述符的生命周期,并利用 C++ 模板将底层的 setsockopt 等操作强类型化,消灭裸露的 void* 强转。

    在错误处理策略上,我们确立了一个清晰的工程边界:

    • 初始化与配置阶段(如 socket 创建、bind、listen、setsockopt):这些操作如果失败,通常意味着系统资源耗尽或配置存在致命错误,因此直接抛出异常(Throw Exception)。
    • 网络 I/O 交互阶段(如 read、write、accept):在分布式系统中,超时、对端重置连接等属于常规的运行时分支,不应引发代价高昂的栈展开(Stack Unwinding)。因此,这部分操作必须返回 std::expected<T, std::error_code>。

    基础代码如下:

    #include <sys/socket.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <utility>
    #include <span>
    
    template<typename Context>
    class Socket {
    public:
        using context_type = Context;
    
        using reuse_address = BooleanOption<SOL_SOCKET, SO_REUSEADDR>;
    #ifdef SO_REUSEPORT
        using reuse_port = BooleanOption<SOL_SOCKET, SO_REUSEPORT>;
    #endif
        using error = BooleanOption<SOL_SOCKET, SO_ERROR>;
        using receive_buffer_size = ValueOption<SOL_SOCKET, SO_RCVBUF>;
        using send_buffer_size = ValueOption<SOL_SOCKET, SO_SNDBUF>;
        using non_blocking = FlagOption<F_GETFL, F_SETFL, O_NONBLOCK>;
        using close_on_exec = FlagOption<F_GETFD, F_SETFD, FD_CLOEXEC>;
    
        Socket(Context& context, int domain, int type, int protocol)
          : context_{ context }
        {
            fd_ = ::socket(domain, type, protocol);
            if (fd_ == -1)
                throw_system_error("Failed to create socket");
        }
    
        Socket(Context& context, int fd)
          : context_{ context }, 
            fd_(fd) 
        {}
    
        Socket(const Socket&) = delete;
        auto operator=(const Socket&) -> Socket& = delete;
    
        Socket(Socket&& other) noexcept
          : context_{ other.context_ }, 
            fd_{ std::exchange(other.fd_, -1) }
        {}
    
        auto operator=(Socket&& other) noexcept -> Socket&
        {
            if (this != &other) {
                close();
                context_ = other.context_;
                fd_ = std::exchange(other.fd_, -1);
            }
            return *this;
        }
    
        ~Socket()
        {
            close();
        }
    
        template<socket_option Option>
        void option(const Option& value)
        {
            auto res = ::setsockopt(fd_, Option::level, Option::name, value.data(), value.size());
            if (res == -1)
                throw_system_error("Failed to set socket option[{}]", Option::name);
        }
    
        template<socket_option Option>
        auto option() const -> Option
        {
            Option option{};
            auto size = static_cast<socklen_t>(option.size());
            auto res = ::getsockopt(fd_, Option::level, Option::name, option.data(), &size);
            if (res == -1)
                throw_system_error("Failed to get socket option[{}]", Option::name);
    
            if (size != option.size())
                throw std::runtime_error{ "Unexpected socket option size" };
    
            return option;
        }
    
        template<flag_option Option>
        void option(const Option& value)
        {
            auto current_flags = ::fcntl(fd_, Option::get_cmd);
            if (current_flags == -1)
                throw_system_error("Failed to get socket flags");
    
            auto new_flags = value ? (current_flags | Option::bit) : (current_flags & ~Option::bit);
            if (::fcntl(fd_, Option::set_cmd, new_flags) == -1)
                throw_system_error("Failed to set socket flags");
        }
    
        template<flag_option Option>
        auto option() const -> Option
        {
            auto current_flags = ::fcntl(fd_, Option::get_cmd);
            if (current_flags == -1)
                throw_system_error("Failed to get socket flags");
    
            if (current_flags & Option::bit)
                return Option{ true };
            
            return Option{ false };
        }
    
        void close()
        {
            if (fd_ != -1) {
                ::close(fd_);
                fd_ = -1;
            }
        }
    
        constexpr auto native_handle() const -> int
        {
            return fd_;
        }
    
        void bind(const sockaddr* addr, socklen_t addrlen)
        {
            if (::bind(fd_, addr, addrlen) == -1)
                throw_system_error("Failed to bind socket");
        }
    
        void listen(int backlog)
        {
            if (::listen(fd_, backlog) == -1)
                throw_system_error("Failed to listen on socket");
        }
    
        // 前置声明的协程 I/O 操作接口
        auto async_accept() -> AcceptAwaiter<Socket<Context>>;
        auto async_readsome(std::span<std::byte> buffer) -> ReadSomeAwaiter<Context>;
        auto async_writesome(std::span<const std::byte> buffer) -> WriteSomeAwaiter<Context>;
    
    private:
        Context& context_;
        int fd_;
    };
    

    2. 核心 Awaiter 剖析

    在实现底层 Awaiter 时,我们需要遵循上一节确立的 single_shot_only_operation 概念约束,确保它们能够与 TimeoutAwaiter 无缝协作。

    2.1 ReadSomeAwaiter

    在传统的 C 语言接口中,read 通常使用 void* buffer 配合 size_t len。而在现代 C++ 中,我们应当使用视图类型 std::span<std::byte>。这不仅明确了字节流的意图,更在编译期消除了指针越界的风险。

    更重要的是,由于协程挂起期间,Awaiter 对象驻留在独立的协程帧中,外部传入的 buffer 视图会在整个异步操作期间保持有效。这从语言特性层面保证了内存安全,使得我们无需引入额外的引用计数机制。

    #ifndef BLOG_READSOME_AWAITER_H
    #define BLOG_READSOME_AWAITER_H
    
    #include <coroutine>
    #include <cstddef>
    #include <expected>
    #include <span>
    #include <system_error>
    #include <utility>
    
    #include <liburing.h>
    
    #include "exceptions.h"
    #include "operation.h"
    
    template<typename Context>
    class ReadSomeAwaiter : public Operation {
    public:
        using resume_type = std::size_t;
    
        ReadSomeAwaiter(Context& context, int fd, std::span<std::byte> buffer)
          : context_{ context }, fd_{ fd }, buffer_{ buffer }
        {}
    
        constexpr auto await_ready() const noexcept -> bool
        {
            return false;
        }
    
        void await_suspend(std::coroutine_handle<> handle) noexcept
        {
            handle_ = handle;
    
            auto* sqe = context_.sqe();
    
            prepare(sqe);
            ::io_uring_sqe_set_data(sqe, this);
        }
    
        auto await_resume() noexcept -> std::expected<resume_type, std::error_code>
        {
            if (error_code_ != 0)
                return unexpected_system_error(error_code_);
    
            return byte_read_;
        }
    
        void prepare(::io_uring_sqe* sqe) noexcept
        {
            ::io_uring_prep_recv(sqe, fd_, buffer_.data(), buffer_.size(), 0);
        }
    
        void set_result(int result, std::uint32_t flags) noexcept
        {
            if (result >= 0)
                byte_read_ = static_cast<std::size_t>(result);
            else
                error_code_ = -result;
        }
    
        void complete(int result, std::uint32_t flags) noexcept override
        {
            set_result(result, flags);
    
            if (handle_) {
                auto handle = std::exchange(handle_, nullptr);
                handle.resume();
            }
        }
    
        auto context() noexcept -> Context& { return context_; }
    
    private:
        Context& context_;
        int fd_;
        std::span<std::byte> buffer_;
    
        std::coroutine_handle<> handle_{ nullptr };
        std::size_t byte_read_{ 0 };
        int error_code_{ 0 };
    };
    
    #endif // BLOG_READSOME_AWAITER_H
    

    在 Socket 中,我们只需将其作为返回值工厂抛出:

    auto async_readsome(std::span<std::byte> buffer) -> ReadSomeAwaiter<Context>
    {
        return ReadSomeAwaiter<Context>{ context_, fd_, buffer };
    }
    

    2.2 WriteSomeAwaiter

    写操作 WriteSomeAwaiter 与读操作高度对称。唯一的区别是它接受只读视图 std::span<const std::byte>,并将其底层操作码映射至 io_uring_prep_send。

    #ifndef BLOG_WRITESOME_AWAITER_H
    #define BLOG_WRITESOME_AWAITER_H
    
    #include <coroutine>
    #include <cstddef>
    #include <expected>
    #include <span>
    #include <utility>
    
    #include <liburing.h>
    
    #include "exceptions.h"
    #include "operation.h"
    
    template<typename Context>
    class WriteSomeAwaiter : public Operation {
    public:
        using resume_type = std::size_t;
    
        WriteSomeAwaiter(Context& context, int fd, std::span<const std::byte> buffer)
          : context_{ context }, fd_{ fd }, buffer_{ buffer }
        {}
    
        constexpr auto await_ready() const noexcept -> bool
        {
            return false;
        }
    
        void await_suspend(std::coroutine_handle<> handle) noexcept
        {
            handle_ = handle;
    
            auto* sqe = context_.sqe();
            
            prepare(sqe);
            ::io_uring_sqe_set_data(sqe, this);
        }
    
        auto await_resume() noexcept -> std::expected<resume_type, std::error_code>
        {
            if (error_code_ != 0)
                return unexpected_system_error(error_code_);
    
            return byte_written_;
        }
    
        void prepare(::io_uring_sqe* sqe) noexcept
        {
            ::io_uring_prep_send(sqe, fd_, buffer_.data(), buffer_.size(), 0);
        }
    
        void set_result(int result, std::uint32_t flags) noexcept
        {
            if (result >= 0)
                byte_written_ = static_cast<std::size_t>(result);
            else
                error_code_ = -result;
        }
    
        void complete(int result, std::uint32_t flags) noexcept override
        {
            set_result(result, flags);
    
            if (handle_) {
                auto handle = std::exchange(handle_, nullptr);
                handle.resume();
            }
        }
    
        auto context() noexcept -> Context& { return context_; }
    
    private:
        std::coroutine_handle<> handle_{ nullptr };
        Context& context_;
        int fd_;
        std::span<const std::byte> buffer_;
        std::size_t byte_written_{ 0 };
        int error_code_{ 0 };
    };
    
    #endif // BLOG_WRITESOME_AWAITER_H
    

    Socket 中的接口实现:

    auto async_writesome(std::span<const std::byte> buffer) -> WriteSomeAwaiter<Context>
    {
        return WriteSomeAwaiter<Context>{ context_, fd_, buffer };
    }
    

    2.3 AcceptAwaiter

    AcceptAwaiter 负责处理被动连接接入。在原生 C 接口中,accept 可以通过传入 sockaddr* 来获取对端地址信息。由于当前版本的实现并未对地址(Endpoint)进行体系化封装,该版本的 AcceptAwaiter 暂不支持提取连接地址,而是直接移交内核产生的新 fd,由其自动推导包装为新的 Socket 实例返回。

    #ifndef BLOG_ACCEPT_AWAITER_H
    #define BLOG_ACCEPT_AWAITER_H
    
    #include <coroutine>
    #include <expected>
    #include <utility>
    
    #include <liburing.h>
    
    #include "exceptions.h"
    #include "operation.h"
    
    template<typename Socket>
    class AcceptAwaiter : public Operation {
    public:
        using context_type = typename Socket::context_type;
        using socket_type = Socket;
        using resume_type = socket_type;
    
        AcceptAwaiter(context_type& context, int fd)
          : context_{ context }, fd_{ fd }
        {} 
    
        constexpr auto await_ready() const noexcept -> bool
        {
            return false;
        }
    
        void await_suspend(std::coroutine_handle<> handle) noexcept
        {
            handle_ = handle;
    
            auto* sqe = context_.sqe();
            
            prepare(sqe);
            ::io_uring_sqe_set_data(sqe, this);
        }
    
        auto await_resume() noexcept -> std::expected<resume_type, std::error_code>
        {
            if (error_code_ != 0)
                return unexpected_system_error(error_code_);
    
            return resume_type{ context_, result_fd_ };
        }
    
        void prepare(::io_uring_sqe* sqe) noexcept
        {
            ::io_uring_prep_accept(sqe, fd_, nullptr, nullptr, 0);
        }
    
        void set_result(int result, std::uint32_t flags) noexcept
        {
            if (result >= 0)
                result_fd_ = result;
            else
                error_code_ = -result;
        }
    
        void complete(int result, std::uint32_t flags) noexcept override
        {
            set_result(result, flags);
    
            if (handle_) {
                auto handle = std::exchange(handle_, nullptr);
                handle.resume();
            }
        }
    
        auto context() noexcept -> context_type& { return context_; }
    
    private:
        context_type& context_;
        int fd_;
    
        std::coroutine_handle<> handle_{ nullptr };
        int result_fd_{ -1 };
        int error_code_{ 0 };
    };
    
    #endif // BLOG_ACCEPT_AWAITER_H
    

    在 Socket 中,挂载实现如下:

    auto async_accept() -> AcceptAwaiter<Socket<Context>>
    {
        return AcceptAwaiter<Socket<Context>>{ context_, fd_ };
    }
    

    3. Echo Server 实战

    有了这三大基础 I/O Awaiter,我们已经可以构建一个极简但具备并发特征的 Echo Server了。

    在这个实现中,没有任何的回调嵌套,也没有跨线程的数据同步操作,控制流如同传统阻塞代码一般自上而下铺陈。

    auto session(Socket<IOContext> client) -> Task<void>
    {
        std::array<std::byte, 1024> buffer{};
    
        while (true)
        {
            auto read_result = co_await client.async_readsome(buffer);
            if (!read_result) {
                spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message());
                co_return;
            }
    
            auto bytes_read = *read_result;
            if (bytes_read == 0) {
                spdlog::info("Client {} disconnected", client.native_handle());
                co_return;
            }
    
            spdlog::info("Read {} bytes from client {}", bytes_read, client.native_handle());
            std::string_view data{ reinterpret_cast<const char*>(buffer.data()), bytes_read };
            spdlog::warn("Data from client {}: {}", client.native_handle(), data);
    
            auto write_buffer = std::span{ buffer }.first(bytes_read);
            auto write_result = co_await client.async_writesome(write_buffer);
            
            if (!write_result) {
                spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message());
                co_return;
            }
    
            if (*write_result != bytes_read)
                spdlog::warn("Partial write on client {}: {} / {} bytes", client.native_handle(), *write_result, bytes_read);
        }
    }
    
    auto server(IOContext& context) -> Task<void>
    {
        Socket acceptor{ context, AF_INET, SOCK_STREAM, 0 };
        acceptor.option(Socket<IOContext>::reuse_address{ true });
    
        sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = INADDR_ANY;
        addr.sin_port = ::htons(12345);
        
        acceptor.bind(reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
        acceptor.listen(1024);
    
        while (true) {
            auto client = co_await acceptor.async_accept();
            if (!client) {
                spdlog::warn("Failed to accept connection: {}", client.error().message());
                continue;
            }
    
            co_spawn(context, session(std::move(*client)));
        }
    }
    
    auto shutdown_monitor(IOContext& context) -> Task<void>
    {
        using namespace std::chrono_literals;
    
        SignalSet sets{ context, signals::interrupt, signals::terminate };
        co_await sets.async_wait();
    
        spdlog::info("Received shutdown signal, stopping IOContext...");
        context.stop();
    }
    
    int main(int argc, char* argv[])
    {
        IOContext context{};
    
        co_spawn(context, server(context));
        co_spawn(context, shutdown_monitor(context));
    
        context.run();
    
        spdlog::info("IOContext stopped, exiting...");
        return EXIT_SUCCESS;
    }
    

    编译执行后,利用多个客户端建立连接,输出日志如下:

    blog.socket_v1
    [2026-04-20 21:15:51.631] [info] Read 6 bytes from client 7
    [2026-04-20 21:15:51.631] [warning] Data from client 7: dsfsa
    
    [2026-04-20 21:15:57.312] [info] Read 23 bytes from client 8
    [2026-04-20 21:15:57.312] [warning] Data from client 8: fdasfas的撒大法师
    
    [2026-04-20 21:16:01.863] [info] Client 8 disconnected
    [2026-04-20 21:16:03.021] [info] Client 7 disconnected
    [2026-04-20 21:16:09.837] [info] Read 36 bytes from client 7
    [2026-04-20 21:16:09.837] [warning] Data from client 7: asfasdfsasa打撒四方达dsa去玩
    
    [2026-04-20 21:16:11.133] [info] Client 7 disconnected
    ^C[2026-04-20 21:16:14.162] [info] Received shutdown signal, stopping IOContext...
    [2026-04-20 21:16:14.162] [info] IOContext stopped, exiting...
    

    更重要的是,得益于对实现中对 single_shot_only_operation 的支持,我们可以无需修改底层 ReadSomeAwaiter 的任何一行代码,直接将上一篇博客中构建的 TimeoutAwaiter 无缝挂载。

    针对读写超时的正交组合可以写出如下形式:

    // 控制 read 超时
    auto read_result = co_await timeout(client.async_readsome(buffer), 5s);
    if (!read_result) {
        if (read_result.error() == std::errc::timed_out) {
            spdlog::debug("Read timed out on client {}", client.native_handle());
            continue;
        }
    
        spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message());
        co_return;
    }
    
    // ... 处理接收逻辑 ...
    
    // 控制 write 超时
    auto write_result = co_await timeout(client.async_writesome(write_buffer), 5s);
    if (!write_result) {
        if (write_result.error() == std::errc::timed_out) {
            spdlog::debug("Write timed out on client {}", client.native_handle());
            continue;
        }
    
        spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message());
        co_return;
    }
    

    基础骨架至此已完全跑通并形成闭环。在未来的优化中,我们将在这些底层基石之上,引入更完备的端点(Endpoint)表示与高级协议解析组件,让网络库从“可用”走向“现代化”。

    完整代码

    1 条回复 最后回复
    0

    • 登录

    • 没有帐号? 注册

    • 登录或注册以进行搜索。
    d2learn forums Powered by NodeBB
    • 第一个帖子
      最后一个帖子
    0
    • 版块
    • 最新
    • 标签
    • 热门
    • Online Tools
    • 用户
    • 群组