跳转至内容
  • A place to talk about whatever you want

    24 主题
    165 帖子
    semmyenatorS

    AREEM,一種無需黑箱的高精度π計算方法
    https://deepwiki.com/semmyenator/AREEM
    這個項目只是一個人完成的小型數學項目,並非一項發明或特殊功能設計。
    希望它對需要旋轉控制的工程師有所幫助。

  • 7 主题
    20 帖子
    SPeakS

    @dustchens 链表结构损坏, 不闭环了 (如果问题解决可以把帖子状态设置为已解决

  • 开源软件 | 开源社区 | 开源理念 | 开源与商业 | 开源可持续发展 等相关话的交流讨论
    注: 这里的"开源"是泛化的共建共享概念, 范围包含 OSI的范围、自由软件、CC等相关内容

    57 主题
    246 帖子
    MoYingJiM

    补一个 0BSD,这个许可证很多时候也被放在与 Unlicense 和 WTFPL 相提并论的(都是公共领域)

  • 56 主题
    250 帖子
    M

    lijin4884@gmail.com

  • 28 主题
    64 帖子
    DoomjustinD

    终于,我们的基础设施已经足以支撑起真正的网络交互,是时候着手实现 async_read、async_write 以及 async_accept 等核心操作了。

    在本文中,我们会以一个相对简练的 Socket 封装作为承载这些操作的起点。这显然不是它的最终形态(我们的最终目标是实现类似 Boost.Asio 的现代化 API 结构),但如果一上来就陷入对底层套接字选项、地址解析等细节的繁琐封装中,将严重偏离探讨协程并发模型的主线。因此,第一版的 Socket 仅是一个将底层系统调用简单捏合的产物。

    本文的焦点将完全聚集于底层的协程流转机制,以及我们如何通过确立公开的 Concept 契约,赋予网络库极其强大的组合与扩展能力。

    1. 过渡期的 Socket 封装

    第一版的 Socket 核心职责是基于 RAII 管理文件描述符的生命周期,并利用 C++ 模板将底层的 setsockopt 等操作强类型化,消灭裸露的 void* 强转。

    在错误处理策略上,我们确立了一个清晰的工程边界:

    初始化与配置阶段(如 socket 创建、bind、listen、setsockopt):这些操作如果失败,通常意味着系统资源耗尽或配置存在致命错误,因此直接抛出异常(Throw Exception)。 网络 I/O 交互阶段(如 read、write、accept):在分布式系统中,超时、对端重置连接等属于常规的运行时分支,不应引发代价高昂的栈展开(Stack Unwinding)。因此,这部分操作必须返回 std::expected<T, std::error_code>。

    基础代码如下:

    #include <sys/socket.h> #include <fcntl.h> #include <unistd.h> #include <utility> #include <span> template<typename Context> class Socket { public: using context_type = Context; using reuse_address = BooleanOption<SOL_SOCKET, SO_REUSEADDR>; #ifdef SO_REUSEPORT using reuse_port = BooleanOption<SOL_SOCKET, SO_REUSEPORT>; #endif using error = BooleanOption<SOL_SOCKET, SO_ERROR>; using receive_buffer_size = ValueOption<SOL_SOCKET, SO_RCVBUF>; using send_buffer_size = ValueOption<SOL_SOCKET, SO_SNDBUF>; using non_blocking = FlagOption<F_GETFL, F_SETFL, O_NONBLOCK>; using close_on_exec = FlagOption<F_GETFD, F_SETFD, FD_CLOEXEC>; Socket(Context& context, int domain, int type, int protocol) : context_{ context } { fd_ = ::socket(domain, type, protocol); if (fd_ == -1) throw_system_error("Failed to create socket"); } Socket(Context& context, int fd) : context_{ context }, fd_(fd) {} Socket(const Socket&) = delete; auto operator=(const Socket&) -> Socket& = delete; Socket(Socket&& other) noexcept : context_{ other.context_ }, fd_{ std::exchange(other.fd_, -1) } {} auto operator=(Socket&& other) noexcept -> Socket& { if (this != &other) { close(); context_ = other.context_; fd_ = std::exchange(other.fd_, -1); } return *this; } ~Socket() { close(); } template<socket_option Option> void option(const Option& value) { auto res = ::setsockopt(fd_, Option::level, Option::name, value.data(), value.size()); if (res == -1) throw_system_error("Failed to set socket option[{}]", Option::name); } template<socket_option Option> auto option() const -> Option { Option option{}; auto size = static_cast<socklen_t>(option.size()); auto res = ::getsockopt(fd_, Option::level, Option::name, option.data(), &size); if (res == -1) throw_system_error("Failed to get socket option[{}]", Option::name); if (size != option.size()) throw std::runtime_error{ "Unexpected socket option size" }; return option; } template<flag_option Option> void option(const Option& value) { auto current_flags = ::fcntl(fd_, Option::get_cmd); if (current_flags == -1) throw_system_error("Failed to get socket flags"); auto new_flags = value ? (current_flags | Option::bit) : (current_flags & ~Option::bit); if (::fcntl(fd_, Option::set_cmd, new_flags) == -1) throw_system_error("Failed to set socket flags"); } template<flag_option Option> auto option() const -> Option { auto current_flags = ::fcntl(fd_, Option::get_cmd); if (current_flags == -1) throw_system_error("Failed to get socket flags"); if (current_flags & Option::bit) return Option{ true }; return Option{ false }; } void close() { if (fd_ != -1) { ::close(fd_); fd_ = -1; } } constexpr auto native_handle() const -> int { return fd_; } void bind(const sockaddr* addr, socklen_t addrlen) { if (::bind(fd_, addr, addrlen) == -1) throw_system_error("Failed to bind socket"); } void listen(int backlog) { if (::listen(fd_, backlog) == -1) throw_system_error("Failed to listen on socket"); } // 前置声明的协程 I/O 操作接口 auto async_accept() -> AcceptAwaiter<Socket<Context>>; auto async_readsome(std::span<std::byte> buffer) -> ReadSomeAwaiter<Context>; auto async_writesome(std::span<const std::byte> buffer) -> WriteSomeAwaiter<Context>; private: Context& context_; int fd_; }; 2. 核心 Awaiter 剖析

    在实现底层 Awaiter 时,我们需要遵循上一节确立的 single_shot_only_operation 概念约束,确保它们能够与 TimeoutAwaiter 无缝协作。

    2.1 ReadSomeAwaiter

    在传统的 C 语言接口中,read 通常使用 void* buffer 配合 size_t len。而在现代 C++ 中,我们应当使用视图类型 std::span<std::byte>。这不仅明确了字节流的意图,更在编译期消除了指针越界的风险。

    更重要的是,由于协程挂起期间,Awaiter 对象驻留在独立的协程帧中,外部传入的 buffer 视图会在整个异步操作期间保持有效。这从语言特性层面保证了内存安全,使得我们无需引入额外的引用计数机制。

    #ifndef BLOG_READSOME_AWAITER_H #define BLOG_READSOME_AWAITER_H #include <coroutine> #include <cstddef> #include <expected> #include <span> #include <system_error> #include <utility> #include <liburing.h> #include "exceptions.h" #include "operation.h" template<typename Context> class ReadSomeAwaiter : public Operation { public: using resume_type = std::size_t; ReadSomeAwaiter(Context& context, int fd, std::span<std::byte> buffer) : context_{ context }, fd_{ fd }, buffer_{ buffer } {} constexpr auto await_ready() const noexcept -> bool { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); prepare(sqe); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() noexcept -> std::expected<resume_type, std::error_code> { if (error_code_ != 0) return unexpected_system_error(error_code_); return byte_read_; } void prepare(::io_uring_sqe* sqe) noexcept { ::io_uring_prep_recv(sqe, fd_, buffer_.data(), buffer_.size(), 0); } void set_result(int result, std::uint32_t flags) noexcept { if (result >= 0) byte_read_ = static_cast<std::size_t>(result); else error_code_ = -result; } void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } auto context() noexcept -> Context& { return context_; } private: Context& context_; int fd_; std::span<std::byte> buffer_; std::coroutine_handle<> handle_{ nullptr }; std::size_t byte_read_{ 0 }; int error_code_{ 0 }; }; #endif // BLOG_READSOME_AWAITER_H

    在 Socket 中,我们只需将其作为返回值工厂抛出:

    auto async_readsome(std::span<std::byte> buffer) -> ReadSomeAwaiter<Context> { return ReadSomeAwaiter<Context>{ context_, fd_, buffer }; } 2.2 WriteSomeAwaiter

    写操作 WriteSomeAwaiter 与读操作高度对称。唯一的区别是它接受只读视图 std::span<const std::byte>,并将其底层操作码映射至 io_uring_prep_send。

    #ifndef BLOG_WRITESOME_AWAITER_H #define BLOG_WRITESOME_AWAITER_H #include <coroutine> #include <cstddef> #include <expected> #include <span> #include <utility> #include <liburing.h> #include "exceptions.h" #include "operation.h" template<typename Context> class WriteSomeAwaiter : public Operation { public: using resume_type = std::size_t; WriteSomeAwaiter(Context& context, int fd, std::span<const std::byte> buffer) : context_{ context }, fd_{ fd }, buffer_{ buffer } {} constexpr auto await_ready() const noexcept -> bool { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); prepare(sqe); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() noexcept -> std::expected<resume_type, std::error_code> { if (error_code_ != 0) return unexpected_system_error(error_code_); return byte_written_; } void prepare(::io_uring_sqe* sqe) noexcept { ::io_uring_prep_send(sqe, fd_, buffer_.data(), buffer_.size(), 0); } void set_result(int result, std::uint32_t flags) noexcept { if (result >= 0) byte_written_ = static_cast<std::size_t>(result); else error_code_ = -result; } void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } auto context() noexcept -> Context& { return context_; } private: std::coroutine_handle<> handle_{ nullptr }; Context& context_; int fd_; std::span<const std::byte> buffer_; std::size_t byte_written_{ 0 }; int error_code_{ 0 }; }; #endif // BLOG_WRITESOME_AWAITER_H

    Socket 中的接口实现:

    auto async_writesome(std::span<const std::byte> buffer) -> WriteSomeAwaiter<Context> { return WriteSomeAwaiter<Context>{ context_, fd_, buffer }; } 2.3 AcceptAwaiter

    AcceptAwaiter 负责处理被动连接接入。在原生 C 接口中,accept 可以通过传入 sockaddr* 来获取对端地址信息。由于当前版本的实现并未对地址(Endpoint)进行体系化封装,该版本的 AcceptAwaiter 暂不支持提取连接地址,而是直接移交内核产生的新 fd,由其自动推导包装为新的 Socket 实例返回。

    #ifndef BLOG_ACCEPT_AWAITER_H #define BLOG_ACCEPT_AWAITER_H #include <coroutine> #include <expected> #include <utility> #include <liburing.h> #include "exceptions.h" #include "operation.h" template<typename Socket> class AcceptAwaiter : public Operation { public: using context_type = typename Socket::context_type; using socket_type = Socket; using resume_type = socket_type; AcceptAwaiter(context_type& context, int fd) : context_{ context }, fd_{ fd } {} constexpr auto await_ready() const noexcept -> bool { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { handle_ = handle; auto* sqe = context_.sqe(); prepare(sqe); ::io_uring_sqe_set_data(sqe, this); } auto await_resume() noexcept -> std::expected<resume_type, std::error_code> { if (error_code_ != 0) return unexpected_system_error(error_code_); return resume_type{ context_, result_fd_ }; } void prepare(::io_uring_sqe* sqe) noexcept { ::io_uring_prep_accept(sqe, fd_, nullptr, nullptr, 0); } void set_result(int result, std::uint32_t flags) noexcept { if (result >= 0) result_fd_ = result; else error_code_ = -result; } void complete(int result, std::uint32_t flags) noexcept override { set_result(result, flags); if (handle_) { auto handle = std::exchange(handle_, nullptr); handle.resume(); } } auto context() noexcept -> context_type& { return context_; } private: context_type& context_; int fd_; std::coroutine_handle<> handle_{ nullptr }; int result_fd_{ -1 }; int error_code_{ 0 }; }; #endif // BLOG_ACCEPT_AWAITER_H

    在 Socket 中,挂载实现如下:

    auto async_accept() -> AcceptAwaiter<Socket<Context>> { return AcceptAwaiter<Socket<Context>>{ context_, fd_ }; } 3. Echo Server 实战

    有了这三大基础 I/O Awaiter,我们已经可以构建一个极简但具备并发特征的 Echo Server了。

    在这个实现中,没有任何的回调嵌套,也没有跨线程的数据同步操作,控制流如同传统阻塞代码一般自上而下铺陈。

    auto session(Socket<IOContext> client) -> Task<void> { std::array<std::byte, 1024> buffer{}; while (true) { auto read_result = co_await client.async_readsome(buffer); if (!read_result) { spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message()); co_return; } auto bytes_read = *read_result; if (bytes_read == 0) { spdlog::info("Client {} disconnected", client.native_handle()); co_return; } spdlog::info("Read {} bytes from client {}", bytes_read, client.native_handle()); std::string_view data{ reinterpret_cast<const char*>(buffer.data()), bytes_read }; spdlog::warn("Data from client {}: {}", client.native_handle(), data); auto write_buffer = std::span{ buffer }.first(bytes_read); auto write_result = co_await client.async_writesome(write_buffer); if (!write_result) { spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message()); co_return; } if (*write_result != bytes_read) spdlog::warn("Partial write on client {}: {} / {} bytes", client.native_handle(), *write_result, bytes_read); } } auto server(IOContext& context) -> Task<void> { Socket acceptor{ context, AF_INET, SOCK_STREAM, 0 }; acceptor.option(Socket<IOContext>::reuse_address{ true }); sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_addr.s_addr = INADDR_ANY; addr.sin_port = ::htons(12345); acceptor.bind(reinterpret_cast<sockaddr*>(&addr), sizeof(addr)); acceptor.listen(1024); while (true) { auto client = co_await acceptor.async_accept(); if (!client) { spdlog::warn("Failed to accept connection: {}", client.error().message()); continue; } co_spawn(context, session(std::move(*client))); } } auto shutdown_monitor(IOContext& context) -> Task<void> { using namespace std::chrono_literals; SignalSet sets{ context, signals::interrupt, signals::terminate }; co_await sets.async_wait(); spdlog::info("Received shutdown signal, stopping IOContext..."); context.stop(); } int main(int argc, char* argv[]) { IOContext context{}; co_spawn(context, server(context)); co_spawn(context, shutdown_monitor(context)); context.run(); spdlog::info("IOContext stopped, exiting..."); return EXIT_SUCCESS; }

    编译执行后,利用多个客户端建立连接,输出日志如下:

    blog.socket_v1 [2026-04-20 21:15:51.631] [info] Read 6 bytes from client 7 [2026-04-20 21:15:51.631] [warning] Data from client 7: dsfsa [2026-04-20 21:15:57.312] [info] Read 23 bytes from client 8 [2026-04-20 21:15:57.312] [warning] Data from client 8: fdasfas的撒大法师 [2026-04-20 21:16:01.863] [info] Client 8 disconnected [2026-04-20 21:16:03.021] [info] Client 7 disconnected [2026-04-20 21:16:09.837] [info] Read 36 bytes from client 7 [2026-04-20 21:16:09.837] [warning] Data from client 7: asfasdfsasa打撒四方达dsa去玩 [2026-04-20 21:16:11.133] [info] Client 7 disconnected ^C[2026-04-20 21:16:14.162] [info] Received shutdown signal, stopping IOContext... [2026-04-20 21:16:14.162] [info] IOContext stopped, exiting...

    更重要的是,得益于对实现中对 single_shot_only_operation 的支持,我们可以无需修改底层 ReadSomeAwaiter 的任何一行代码,直接将上一篇博客中构建的 TimeoutAwaiter 无缝挂载。

    针对读写超时的正交组合可以写出如下形式:

    // 控制 read 超时 auto read_result = co_await timeout(client.async_readsome(buffer), 5s); if (!read_result) { if (read_result.error() == std::errc::timed_out) { spdlog::debug("Read timed out on client {}", client.native_handle()); continue; } spdlog::warn("Failed to read from client {}: {}", client.native_handle(), read_result.error().message()); co_return; } // ... 处理接收逻辑 ... // 控制 write 超时 auto write_result = co_await timeout(client.async_writesome(write_buffer), 5s); if (!write_result) { if (write_result.error() == std::errc::timed_out) { spdlog::debug("Write timed out on client {}", client.native_handle()); continue; } spdlog::warn("Failed to write to client {}: {}", client.native_handle(), write_result.error().message()); co_return; }

    基础骨架至此已完全跑通并形成闭环。在未来的优化中,我们将在这些底层基石之上,引入更完备的端点(Endpoint)表示与高级协议解析组件,让网络库从“可用”走向“现代化”。

    完整代码

  • 一个技术知识分享、学习、交流的社区

    15 主题
    50 帖子
    sunrisepeakS

    @Doomjustin 版块已创建, 可以检查确认一下是否有话题贴/Topic工具的权限

    https://forum.d2learn.org/category/26/xin
  • Got a question? Ask away!

    4 主题
    14 帖子
    SPeakS

    备注一下使数学公式的使用语法

    单行公式语法 - $ 你的公式 $

    $ log_2^n $

    $ log_2^n $

    多行公式语法 - $$ 你的公式 $$

    $$ log_2^n => log_2^9 = 3 , n = 9 $$

    $$
    log_2^n =>
    log_2^9 = 3, n = 9
    $$

公告栏 | Bulletin Board

欢迎加入d2learn社区 - 社区指南
Welcome to the d2learn Community - Community Guide

一个以 [知识、技术、代码、项目、想法、开源] 相关话题为主导的社区
A community focused on topics related to [knowledge, technology, code, projects, ideas, and open source].


在线用户