Home 实现一个短至200行的io_uring协程
Post
Cancel

实现一个短至200行的io_uring协程

简单的造点示例级轮子,规格上分为普通版和pro版:

  • 普通版提供200行内的C++20协程、io_uring后端,以及类似Asio的使用方式(co_spawn、co_await initiating_function、io_context);
  • pro版在此基础上添加了io_uring的高级特性使用,以及多线程的协程上下文切换支持。

既然不算完整的库,基本介绍就这样了。下面不再废话,直接看代码。

快速过目

// Per-connection echo loop: READ -> WRITE -> [CLOSE].
// Every chunk read from `client_fd` is mirrored to stdout and written back.
// The connection is closed when the write reports 0 bytes, or when more than
// two bytes were written and the buffer starts with "Zz".
Task echo(io_uring *uring, int client_fd) {
    char buf[4096];
    while(true) {
        auto n = co_await async_read(uring, client_fd, buf, std::size(buf)) | nofail("read");

        // Dump what was received to stdout.
        std::ranges::copy_n(buf, n, std::ostream_iterator<char>{std::cout});

        n = co_await async_write(uring, client_fd, buf, n) | nofail("write");

        const bool peer_done = (n == 0);
        const bool quit_requested = n > 2 && buf[0] == 'Z' && buf[1] == 'z';
        if(!peer_done && !quit_requested) continue;

        co_await async_close(uring, client_fd);
        co_return;
    }
}

// Accept loop: ACCEPT -> ACCEPT
//                     -> ECHO
// Each accepted connection is handed to its own echo() coroutine via co_spawn().
Task server(io_uring *uring, Io_context &io_context, int server_fd) {
    while(true) {
        const int client_fd = co_await async_accept(uring, server_fd) | nofail("accept");
        // Fork: the new connection runs independently of this loop.
        co_spawn(io_context, echo(uring, client_fd));
    }
}

int main() {
    // CREATE and LISTEN
    auto server_fd = make_server({.port=8848});
    auto server_fd_cleanup = defer([&](...) { close(server_fd); });

    io_uring uring;
    constexpr size_t ENTRIES = 256;
    io_uring_queue_init(ENTRIES, &uring, 0);
    auto uring_cleanup = defer([&](...) { io_uring_queue_exit(&uring); });

    Io_context io_context(uring);
    // Kick off!
    co_spawn(io_context, server(&uring, io_context, server_fd));
    io_context.run();
}

这是一个echo服务器,是不是有Asio内味了。

协程实现

coroutine

struct Task {
    struct promise_type;
    constexpr Task(std::coroutine_handle<promise_type> handle) noexcept: _handle(handle) {}
    ~Task() { if(_handle) _handle.destroy(); }
    auto detach() noexcept { return std::exchange(_handle, {}); }
    // Move ctor only.
    Task(Task &&rhs) noexcept: _handle(rhs.detach()) {}
    Task(const Task&) = delete;
    Task& operator=(const Task&) = delete;
    Task& operator=(Task&&) = delete;
    auto operator co_await() && noexcept;
private:
    std::coroutine_handle<promise_type> _handle;
};

// Promise type of Task.
// - The coroutine starts suspended (initial_suspend() returns suspend_always).
// - At the final suspend point the coroutine destroys its own frame and
//   transfers control to the handle stored in `_caller`.
struct Task::promise_type {
    constexpr auto initial_suspend() const noexcept { return std::suspend_always{}; }
    // Placeholders: neither a result nor an exception is stored;
    // an exception escaping the coroutine body is dropped here.
    constexpr void return_void() const noexcept { /*exception_ptr...*/ }
    void unhandled_exception() { /*exception_ptr...*/ }
    // Wraps the handle of this coroutine in a Task (uses Task's implicit constructor).
    Task get_return_object() noexcept {
        auto h = std::coroutine_handle<promise_type>::from_promise(*this);
        return {h};
    }
    // Awaiter returned by final_suspend(): always suspends, destroys the
    // finished coroutine and resumes the recorded caller.
    struct Final_suspend {
        constexpr bool await_ready() const noexcept { return false; }
        auto await_suspend(auto callee) const noexcept {
            // Copy the caller out BEFORE destroy(): the promise (and this
            // awaiter) live inside the frame that is about to be freed.
            auto caller = callee.promise()._caller;
            // Started task (at least once) will kill itself in final_suspend.
            callee.destroy();
            return caller;
        }
        // Never reached.
        constexpr auto await_resume() const noexcept {}
    };
    constexpr auto final_suspend() const noexcept { return Final_suspend{}; }
    // Records the coroutine to continue once this one finishes.
    void push(std::coroutine_handle<> caller) noexcept { _caller = caller; }

    // Defaults to the no-op coroutine, so a task nobody awaits
    // simply resumes nothing when it finishes.
    std::coroutine_handle<> _caller {std::noop_coroutine()};
};

// Awaiting a Task from another coroutine (rvalue Tasks only).
//
// Accepted:
//     co_await make_task(...);
//
//     Task task = make_task(...);
//     co_await std::move(task);
//
// Rejected:
//     Task task = make_task(...); // Compiles, but serves no purpose.
//     co_await task;              // Compile error: lvalue.
inline auto Task::operator co_await() && noexcept {
    struct Awaiter {
        // Handle taken over from the awaited Task.
        std::coroutine_handle<Task::promise_type> _callee;

        // Nothing to wait for if there is no coroutine or it has already finished.
        bool await_ready() const noexcept {
            if(!_callee) return true;
            return _callee.done();
        }
        // Remember who is waiting, then switch to the awaited coroutine.
        // The io_context sees the nested tasks as one single operation.
        auto await_suspend(std::coroutine_handle<> caller) noexcept {
            _callee.promise().push(caller);
            return _callee;
        }
        constexpr auto await_resume() const noexcept {}
    };
    return Awaiter{detach()};
}

io_context

// A quite simple io_context:
// a FIFO of coroutines that are ready to run, plus the io_uring instance
// that is submitted to and reaped from on every iteration.
class Io_context {
public:
    explicit Io_context(io_uring &uring): uring(uring) {}
    Io_context(const Io_context &) = delete;
    Io_context& operator=(const Io_context &) = delete;

    // Clears the stop flag, then iterates run_once() until stopped() is true.
    void run() { for(_stop = false; running(); run_once()); }

    // Once = resume ready coroutines + submit + reap.
    // Exactly_once: resume at most one queued coroutine instead of a batch.
    template <bool Exactly_once = false>
    void run_once() {
        // The count is fixed up front: coroutines spawned while resuming
        // wait for the next iteration.
        const size_t some = Exactly_once ? take_once() : take_batch();
        for(size_t i = 0; i < some; ++i) {
            auto op = _operations.front();
            _operations.pop();
            op.resume();
        }

        // io_uring_submit() returns the number of submitted SQEs, or a negative
        // -errno value on failure. A negative result must never be added to the
        // unsigned in-flight counter (it would wrap around to a huge value).
        // On failure nothing is counted; the next iteration submits again.
        if(const int submitted = io_uring_submit(&uring); submitted > 0) {
            _inflight += static_cast<size_t>(submitted);
        }
        if(_inflight == 0) {
            hang();
            return;
        }

        // Some cqes are in-flight,
        // even if there is currently nothing to resume.
        // Just continue along the path!

        io_uring_cqe *cqe;
        unsigned head;
        unsigned done = 0;
        // Reap one operation / multiple operations.
        // NOTE: One operation can also generate multiple cqes (awaiters).
        io_uring_for_each_cqe(&uring, head, cqe) {
            done++;
            // For io_uring_prep_cancel().
            if(cqe->res == -ECANCELED) [[unlikely]] continue;
            auto user_data = std::bit_cast<Async_user_data*>(cqe->user_data);
            // Publish the completion to the awaiter, then let it continue;
            // its await_resume() reads cqe->res during this resume().
            user_data->cqe = cqe;
            user_data->h.resume();
        }
        if(done) {
            io_uring_cq_advance(&uring, done);
        } else {
            hang();
        }

        assert(_inflight >= done);
        _inflight -= done;
    }

    // Some observable IO statistics.
    // These APIs are not affected by stop flag.
    auto pending() const { return _operations.size(); }
    auto inflight() const noexcept { return _inflight; }
    bool drained() const { return !pending() && !inflight(); }

    // Only affect the run() interface.
    // The stop flag will be reset upon re-run().
    //
    // Some in-flight operations will be suspended when calling stop().
    // This provides the opportunity to do fire-and-forget tasks.
    //
    // So it is the responsibility of users to ensure the correctness of this function.
    // What users can do if they want to complete all tasks:
    // 1. blocking method: re-run() again.
    // 2. non-blocking method: while(!drained()) run_once();
    void stop() noexcept { _stop = true; }
    bool stopped() const { return _stop && !pending(); }
    bool running() const { return !stopped(); }

    // Takes over the coroutine owned by `task` and queues it for the next run_once().
    friend void co_spawn(Io_context &io_context, Task &&task) {
        io_context._operations.emplace(task.detach());
    }

private:
    // Called when an iteration had nothing to submit or nothing to reap.
    void hang() {
        // TODO: config option, eventfd.
        constexpr bool ENABLE_BUSY_LOOP = false;
        if constexpr (!ENABLE_BUSY_LOOP) {
            // FIXME: yield() in a single thread makes no sense.
            using namespace std::chrono_literals;
            std::this_thread::sleep_for(1ns);
        }
    }

    // Number of queued coroutines to resume in one batch iteration (capped).
    size_t take_batch() const {
        constexpr size_t /*same type*/ BATCH_MAX = 32;
        return std::min(BATCH_MAX, _operations.size());
    }

    // 1 if anything is queued, otherwise 0.
    size_t take_once() const {
        return !_operations.empty();
    }

    io_uring &uring;
    std::queue<std::coroutine_handle<>> _operations;
    size_t _inflight {};
    bool _stop {false};
};

async_operation

struct Async_user_data {
    io_uring *uring;
    io_uring_sqe *sqe {};
    io_uring_cqe *cqe {};
    // io_contexts may submit before setting up `h` in await_suspend().
    // Therefore:
    // 1. Operations need a check in await_ready().
    // 2. `h` should be initially `std::noop-`, which is safe (and no effect) to resume.
    std::coroutine_handle<> h {std::noop_coroutine()};

    Async_user_data(io_uring *uring) noexcept: uring(uring) {}
};

// Awaitable wrapping one io_uring request.
// The request is prepared in the constructor; awaiting it yields the
// completion result (cqe->res), or -ENOMEM when no SQE was available.
struct Async_operation {
    // Grabs an SQE, lets `uring_prep_fn(sqe, args...)` fill it in, and tags it
    // with the address of `user_data`.
    Async_operation(io_uring *uring, auto uring_prep_fn, auto &&...args) noexcept: user_data(uring) {
        user_data.sqe = io_uring_get_sqe(uring);
        // Without an SQE the operation is born "ready" and resumes with -ENOMEM.
        if(!user_data.sqe) [[unlikely]] return;

        uring_prep_fn(user_data.sqe, std::forward<decltype(args)>(args)...);
        // https://man7.org/linux/man-pages/man3/io_uring_cqe_get_data.3.html
        // On Linux v5.15 the data has to be set AFTER prep_fn(),
        // otherwise io_uring hands back an inaccessible CQE.
        // Linux v6.1 does not show this problem, and the man page only
        // requires set_data() to happen before submit().
        // Setting it last works in both cases.
        io_uring_sqe_set_data(user_data.sqe, &user_data);
    }

    // Suspend only when a request was queued and has not completed yet.
    constexpr bool await_ready() const noexcept {
        const bool queued = user_data.sqe != nullptr;
        const bool completed = user_data.cqe != nullptr;
        return !queued || completed;
    }

    // Tell the io_context which coroutine to resume on completion.
    void await_suspend(std::coroutine_handle<> h) noexcept { user_data.h = h; }

    auto await_resume() const noexcept {
        return user_data.sqe ? user_data.cqe->res : -ENOMEM;
    }

    Async_user_data user_data;
};

// Generic factory: builds an Async_operation for `uring` whose SQE is prepared
// by calling `uring_prep_fn(sqe, args...)`.
// The Async_operation constructor stores the address of its own `user_data`
// in the SQE, so the result is returned directly as a prvalue and should not be
// copied or moved afterwards.
inline auto async_operation(io_uring *uring, auto uring_prep_fn, auto &&...args) noexcept {
    return Async_operation(uring, uring_prep_fn, std::forward<decltype(args)>(args)...);
}

// Accept on `server_fd`, forwarding `addr`, `addrlen` and `flags` to
// io_uring_prep_accept(). Awaiting the result yields the completion result.
inline auto async_accept(io_uring *uring, int server_fd,
        sockaddr *addr, socklen_t *addrlen, int flags = 0) noexcept {
    return async_operation(uring,
        io_uring_prep_accept, server_fd, addr, addrlen, flags);
}

// Accept on `server_fd` without asking for the peer address
// (null `addr` / `addrlen` are passed to io_uring_prep_accept()).
inline auto async_accept(io_uring *uring, int server_fd, int flags = 0) noexcept {
    return async_operation(uring,
        io_uring_prep_accept, server_fd, nullptr, nullptr, flags);
}

// Read up to `n` bytes from `fd` into `buf` via io_uring_prep_read().
// `offset` defaults to 0.
//
// About `offset` (from the liburing documentation):
// On files that support seeking, if the `offset` is set to -1, the read operation commences
// at the file offset, and the file offset is incremented by the number of bytes read. See read(2)
// for more details. Note that for an async API, reading and updating the current file offset may
// result in unpredictable behavior, unless access to the file is serialized.
// It is **not encouraged** to use this feature, if it's possible to provide the desired IO offset
// from the application or library.
//
// NOTE(review): `offset` is unsigned here, so "-1" would have to be passed as its
// unsigned equivalent — confirm against io_uring_prep_read().
inline auto async_read(io_uring *uring, int fd, void *buf, size_t n, uint64_t offset = 0) noexcept {
    return async_operation(uring,
        io_uring_prep_read, fd, buf, n, offset);
}

inline auto async_write(io_uring *uring, int fd, const void *buf, size_t n, uint64_t offset = 0) {
    return async_operation(uring,
        io_uring_prep_write, fd, buf, n, offset);
}

// Close `fd` asynchronously via io_uring_prep_close().
inline auto async_close(io_uring *uring, int fd) noexcept {
    return async_operation(uring,
        io_uring_prep_close, fd);
}

去除注释后为164行。

辅助实现

utils主要是方便错误处理和提供免除类设计的RAII。可以不关心这些。

// C++-style check for syscall results.
// By default a result is a failure when `ret < 0` (Comp = std::less<int>, V = 0).
// On failure the reason and the errno message are printed and the process terminates.
//
// INT_ASYNC_CHECK: helper for liburing (-ERRNO) and other syscalls (-1).
// It may break generic programming (forced to int).
template <typename Comp = std::less<int>, auto V = 0, bool INT_ASYNC_CHECK = true>
struct nofail {
    // Label printed in front of the error message.
    std::string_view reason;

    // Examples:
    // fstat(...) | nofail("fstat");        // Forget the if-statement and ret!
    // int fd = open(...) | nofail("open"); // If actually need a ret, here you are!
    //
    // Returns `ret` unchanged (perfectly forwarded) when the check passes.
    friend decltype(auto) operator|(auto &&ret, nofail nf) {
        if(Comp{}(ret, V)) [[unlikely]] {
            // Hack errno.
            if constexpr (INT_ASYNC_CHECK) {
                using T = std::decay_t<decltype(ret)>;
                static_assert(std::is_convertible_v<T, int>);
                // -ERRNO: anything other than -1 is taken as a negated errno value;
                // -1 means errno has already been set by the failing call.
                if(ret != -1) errno = -ret;
            }
            // A string_view is not guaranteed to be NUL-terminated, so it must not
            // be handed to perror() as a C string. Write exactly `size()` bytes
            // instead; the output format stays "<reason>: <message>\n".
            const int saved_errno = errno;
            if(!nf.reason.empty()) {
                fwrite(nf.reason.data(), 1, nf.reason.size(), stderr);
                fputs(": ", stderr);
            }
            // The stdio calls above may have modified errno.
            errno = saved_errno;
            perror(nullptr);
            std::terminate();
        }
        return std::forward<decltype(ret)>(ret);
    }
};

// Make clang happy:
// explicit deduction guide so that `nofail("reason")` always resolves to the
// default instantiation nofail<std::less<int>, 0, true>.
nofail(...) -> nofail<std::less<int>, 0, true>;

// Go-style scope guard that stays correct when moved.
// `func` is installed as the deleter of a std::unique_ptr<void, ...>, so it is
// invoked (with the stored pointer as its only argument) when the guard dies.
[[nodiscard("defer() is not allowed to be temporary.")]]
inline auto defer(auto func) {
    using Guard = std::unique_ptr<void, decltype(func)>;
    // unique_ptr skips the deleter for a null pointer, hence a non-null sentinel.
    return Guard{reinterpret_cast<void*>(0x1), std::move(func)};
}

// Options consumed by make_server().
struct make_server_option_t {
    int port = 8848;          // Port to bind.
    int backlog = 128;        // Passed to listen().
    bool nonblock = false;    // Request SOCK_NONBLOCK for the listening socket.
    bool reuseaddr = true;    // Enable SO_REUSEADDR.
    bool reuseport = false;   // Enable SO_REUSEPORT.
};

// Do some boring stuff and return a server fd:
// socket() -> setsockopt() (optional) -> bind() to INADDR_ANY:port -> listen().
// Any failing step prints the error and terminates the process (via nofail).
inline int make_server(make_server_option_t option) {
    // SOCK_NONBLOCK is a modifier of the socket *type* (second argument of
    // socket()), not a protocol. Passing it as the third argument makes
    // socket() fail as soon as `option.nonblock` is true.
    int socket_flag = option.nonblock ? SOCK_NONBLOCK : 0;
    int socket_fd = socket(AF_INET, SOCK_STREAM | socket_flag, 0) | nofail("socket");

    // Switches on one boolean SOL_SOCKET option for the new socket.
    auto setsock = [enable = 1, fd = socket_fd](int optname) {
        setsockopt(fd, SOL_SOCKET, optname, &enable, sizeof(int)) | nofail("setsockopt");
    };
    if(option.reuseaddr) setsock(SO_REUSEADDR);
    if(option.reuseport) setsock(SO_REUSEPORT);

    sockaddr_in addr {};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(option.port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    // About the strict aliasing rule:
    // `reinterpret_cast` is OK if we don't dereference it (with the wrong type).
    // But what about the implementation of `bind()`?
    //   - `bind()` must dereference the object with a wrong type.
    // Can we memcpy an actual `sockaddr` from `sockaddr_xxx` then pass it to `bind()`?
    //   - Normally, memcpy is a good way to make type punning safe.
    //   - It is guaranteed by the C (and C++) standard to access through an object.
    //   - But the answer is NO, because `sizeof(sockaddr_xxx) <= sizeof(sockaddr)` is not necessarily true.
    //   - How can you access an object without proper size and alignment?
    //   - `launder()` with cast / `start_lifetime_as()` cannot solve the `sizeof()` problem either.
    // Does it violate the strict aliasing rule?
    //   - Maybe. It depends on the library side. We can't do any more.
    //   - But many people explicitly cast types with UNIX interfaces.
    //   - And compilers should not offend users, especially for legacy code.
    //   - So practically, it is OK.
    auto no_alias_addr = reinterpret_cast<const sockaddr*>(&addr);

    bind(socket_fd, no_alias_addr, sizeof(addr)) | nofail("bind");

    listen(socket_fd, option.backlog) | nofail("listen");

    return socket_fd;
}

实现要点

参考资料:liburing、io_uring作者Jens Axboe的演讲

  1. co_await既支持awaiter,也支持协程本身,类似于asio::awaitable,虽说不如Asio精妙。
  2. 被co_spawn或内部co_await的协程会放弃协程描述符的所有权,在最后的暂停点再自行销毁。
  3. 左值的awaiter不值得使用,会增加很多不必要的异步处理,但是轮子中并没有完全禁用。
    • 比如先构造了一个awaiter,发起了异步请求。
    • 然后co_await另一个awaiter。
    • 原awaiter可能无法在await_suspend中liburing submit前插入新的flag。
    • 有可能是因完成而被收割,更有可能错过了resume设置,需要await_ready弥补判断。
    • 而对于右值awaiter,这些操作是整体原子的,没有这种麻烦事和额外开销。
    • 左值当然可以延迟请求,但需要存储转发参数并强制模板类设计,因此没有采用。
  4. co_spawn投递协程,run在当前上下文执行协程,但是io_uring在多种特性下是无法得知正确的已提交异步操作数,比如:
    • multishot可以单个sqe提交并生成多个cqe,cqe自然无法在实际完成前得知。
    • SQPOLL可以交由内核线程去处理sqe提交,因此用户侧提交数本身就是不正确的,这点在man手册和社区都有提到。
  5. 协程的定制点使得特性添加变得容易,比如multishot、IO drain使用基本的继承就能轻松完成。
  6. 人生苦短,千万别直接硬上io_uring,多用点liburing。 (Use it! Don’t be a hero.)

特性使用

这部分算是番外篇。io_uring的特性非常丰富,可玩性相当的高(可惜的是这方面man手册写得真不怎样)。因此这些适配也都尽可能写了,在源码仓库中我尝试一个特性对应一个示例。当然了,用上这些特性就远不只两百行了。

简单的再给出一点使用示例吧。

// Multishot: only one asynchronous operation is started in the kernel,
// but it is allowed to complete many times.
// https://manpages.debian.org/unstable/liburing-dev/io_uring_prep_multishot_accept.3.en.html
Task server(io_uring *uring, Io_context &io_context, int server_fd) {
    // Submitted once; the same awaiter is awaited on every iteration.
    auto multishot = async_multishot_accept(uring, server_fd);
    while(true) {
        // No further sqes are needed here.
        const auto client_fd = co_await multishot | nofail("multishot_accept");
        co_spawn(io_context, echo(uring, client_fd));
    }
}
// IO link: chain operations with & or | so they run in sequence,
// without a separate co_await for each of them.
// https://manpages.debian.org/unstable/liburing-dev/io_uring_sqe_set_flags.3.en.html#IOSQE_IO_LINK
//
// Writes "hello ", then "world!\n", then closes `client_fd`, as one awaited chain.
// NOTE(review): the operands of a binary `&` are not sequenced relative to each
// other by the language. If each async_* call claims its SQE when it is
// constructed (as Async_operation does), the SQE order depends on the
// compiler's evaluation order — confirm how the linked operator& handles this.
Task sayhello(io_uring *uring, int client_fd) {
    using namespace std::literals;
    auto hello = "hello "sv;
    auto world = "world!\n"sv;
    // Actually co_await only for the last one. (by proxy)
    // But the order of the operations is still kept.
    co_await (async_write(uring, client_fd, hello.data(), hello.size())
            & async_write(uring, client_fd, world.data(), world.size())
            & async_close(uring, client_fd));
}
// IO drain: the operation is not reported as complete until all operations
// started before it have completed.
// https://manpages.debian.org/unstable/liburing-dev/io_uring_sqe_set_flags.3.en.html#IOSQE_IO_DRAIN
Task server(io_uring *uring, Io_context &io_context, int server_fd) {
    while(true) {
        const auto client_fd = co_await async_drain_accept(uring, server_fd) | nofail("accept");
        // Each connection gets its own echo() coroutine.
        co_spawn(io_context, echo(uring, client_fd));
    }
}
// Provided buffers: no buffer has to be prepared before starting the operation.
// https://manpages.debian.org/unstable/liburing-dev/io_uring_setup_buf_ring.3.en.html
//
// Echo loop on top of provided buffers: the read completes with a byte count
// and a buffer id (`bid`); the buffer is looked up, written back, and rejoined
// at the end of each iteration.
// NOTE(review): the read result is not checked with nofail(), unlike the write;
// presumably `bid` is meaningless when `n` reports an error — confirm.
Task echo(io_uring *uring, int client_fd, auto provided_buffers_token, auto buffer_helpers) {
    // The third element of `buffer_helpers` is not used here.
    const auto &[buffer_finder, buffer_size, _] = buffer_helpers;
    for(;;) {
        // We don't need to prepare a buffer before completing the operation.
        auto [n, bid] = co_await async_read(provided_buffers_token, client_fd, nullptr, buffer_size);

        // Scope guard: hands the buffer back when this iteration ends.
        auto rejoin = defer([&](...) {
            buffer_rejoin(provided_buffers_token, buffer_helpers, bid);
        });

        const auto buf = buffer_finder(bid);
        co_await async_write(uring, client_fd, buf, n) | nofail("write");
        // ...
    }
}
// (Not an io_uring feature, just a coroutine wrapper.)
// switch_to() moves the coroutine back and forth between the threads
// on which the different io_contexts are running.
Task just_print(io_uring (&uring)[2], Io_context (&io_context)[2]) {
    size_t current = 0;
    while(true) {
        std::ostringstream line;
        line << "current thread is " << std::this_thread::get_id() << '\n';
        auto text = line.str();
        co_await async_write(&uring[current], 1, text.data(), text.size());

        // Hop to the thread where the other io_context is running,
        // and use that side's ring on the next round.
        const size_t other = current ^ 1;
        co_await switch_to(io_context[other]);
        current = other;
    }
}

Repo

源码存放在Github/Caturra000/io_uring-examples-cpp,示例代码的目的仅用于分析io_uring的行为,并不考虑低内核版本的兼容性,更新随缘。

This post is licensed under CC BY 4.0 by the author.
Contents