Home 从 C++20 协程,到 Asio 的协程适配
Post
Cancel

从 C++20 协程,到 Asio 的协程适配

前言

上文已经解析了 Asio 早期的无栈协程实现,本文继续探讨 Asio 对于 C++20 协程的适配。

另外,我在知乎翻译了一篇文章,是关于 Asio 的接口演进历史以及现代网络编程,推荐阅读本文 Asio 协程章节前先热身:为什么 C++20 是最 awesome 的网络编程语言

C++20 协程

翻源码也是一种了解 C++20 协程的实践途径,在开始前先啰嗦几句 C++20 协程标准的设计。

不想听老 ass 念经?点击跳过!

基本介绍

C++20 协程并不是完全的编译器实现,也不是完全的库实现。从类设计上分类,可以分为三大块:

  • 协程描述符(coroutine handle),用于外部接口的交互。
  • 承诺类型(promise type),用于内部的库实现设计。
  • 协程帧(coroutine frame / coroutine state),由编译器处理的上下文信息。

当然除了类以外,还有函数本身。在 C++ 标准中,协程是函数:A coroutine is a function that can suspend execution to be resumed later.

coroutine 不知用了多少遍的好图

先说协程描述符 std::coroutine_handle<>,它作为一个描述符的作用就是调用接口。比如 resume()destroy(),一个已暂停的协程接下来能做的事情只有恢复或销毁;还有观测用接口 done() 和访问关联的承诺类型 promise() 等等。总之是个容易理解的接口类型。

承诺类型 promise_type 可以定制协程的具体行为以及处理值的传递。它可以通过 initial_suspend()final_suspend() 定制是否要在特殊的暂停点(suspension point)执行暂停操作;也可以通过 get_return_object() 得到首次暂停时得到的对象;以及 yield_value(v) 可接收 co_yield v产出(yield,表示暂停并返回一个值)的值 v

至于协程帧则是由编译器控制生成的部分。我们在聊协程时之所以完全不关心所谓上下文保存,那是因为这部分交给编译器管理了,一般来说会隐式的调用动态内存分配 operator new 来存储上下文信息,但是也会尽可能优化而无需实际分配。

执行流程

实现一个协程 hello world 程序还需要了解承诺类型的 get_return_object(),协程描述符的静态函数 from_promise(),以及协程函数的的返回值之间的关联。先看用例吧:

// 一个懒惰的生成器,生成 [0, n) 序列,每调用一次生成一个值
static My_generator<int> coroutine_function(size_t n) {
    // 尽管这里看着像是无限生成序列……
    for(auto i : std::views::iota(0)) {
        co_yield i;
    }
}

int main() {
    constexpr size_t n = 7;
    auto generate = coroutine_function(n);
    // 这里使用定制的 operator bool 和 operator ()
    while(generate) {
        std::cout << generate() << ' ';
    }
    return 0;
}

// 输出:
0 1 2 3 4 5 6

假设实现一个 My_generator,这里的协程该怎么工作?可以简单认为:

  1. 编译器使用 operator new 分配协程帧对象。
  2. 如果有函数参数,则拷贝/移动到协程帧中,引用不需处理。
  3. 调用承诺对象 promise 的构造函数,其参数使用协程中的传参(在这里为 n)。
  4. 调用 promise.get_return_object(),后续作为返回对象给上层(在这里就是 generate 对象)。
  5. 协程是否暂停取决于 promise.initial_suspend(),一般如果返回类型是 std::suspend_always,立刻回到上层调用者。
  6. 后续调用再执行函数体。
  7. 每一次调用 generate()(内部实际为 coroutine_handle.resume() 封装),已暂停的 coroutine_function() 会恢复现场,后续在 co_yield 处暂停并返回 i
  8. 调用 promise.yield_value(...),传参即为上一步返回(产出)的值。
  9. 由用户在 yield_value() 函数体内实现 promise 内部存放临时存储,回到上层 generate(),由用户获取存放在 promise 的值。
  10. generate() 返回值给上层(即 main())。

NOTES:

  • 协程的返回类型 your_return_type 务必有 typename your_return_type::promise_type,这也是第 3 步时编译器得知对应 promise_type 的原因(或者使用 std::coroutine_traits)。
  • 第 4 步如果需要从中获取协程描述符,那就在承诺对象的成员函数内使用 coruntine_handle<decltype(promise)>::from_promise(*this)
  • 当遇到暂停点时,先前获得的对象将返回,因此第 4 步 get_return_object() 得到的对象返回。
  • 第 5 步其实是 co_await promise.initial_suspend(),具体后面再说。

一个完整的示例如下:

#include <coroutine>
#include <iostream>
#include <ranges>

template <typename T>
class My_generator {
public:
    struct promise_type;

    // ctor / dtor
    explicit constexpr
      My_generator(std::coroutine_handle<promise_type> h) noexcept
        : _handle(h) {}
    ~My_generator() { _handle ? _handle.destroy() : void(0); }

    // copy / move
    My_generator(const My_generator &) = delete;
    My_generator(My_generator &&rhs) = delete;
    My_generator& operator=(const My_generator &rhs) = delete;

    // user APIs
    T operator ()() {
        if(!resumable()) {
            throw std::runtime_error("AIEEEEE! A NINJA!?");
        }
        if(!operator bool()) {
            throw std::logic_error("WHY THERE'S A NINJA HERE!?");
        }
        _handle.resume();
        _handle.promise().count--;
        return std::move(_handle.promise().result);
    }

    explicit operator bool() const noexcept {
        return _handle.promise().count > 0;
    }

    bool resumable() const noexcept {
        return _handle && !_handle.done();
    }

private:
    std::coroutine_handle<promise_type> _handle;
};

template <typename T>
struct My_generator<T>::promise_type {
    explicit constexpr
      promise_type(size_t count) noexcept(noexcept(T())): count(count) {}
    auto initial_suspend() noexcept { return std::suspend_always{}; }
    auto final_suspend() noexcept { return std::suspend_never{}; }
    void unhandled_exception() {}

    My_generator<T> get_return_object();
    auto yield_value(auto &&value);

    size_t count;
    T result;
};

template <typename T>
My_generator<T> My_generator<T>::promise_type::get_return_object() {
    auto handle = std::coroutine_handle<promise_type>::from_promise(*this);
    return My_generator{handle};
}

template <typename T>
auto My_generator<T>::promise_type::yield_value(auto &&value) {
    result = std::forward<decltype(value)>(value);
    return std::suspend_always{};
}

// C++ coroutines are special functions
static My_generator<int> coroutine_function(size_t n) {
    for(auto i : std::views::iota(0)) {
        co_yield i;
    }
}

int main() {
    constexpr size_t n = 7;
    auto generate = coroutine_function(n);
    while(generate) {
        std::cout << generate() << ' ';
    }
    return 0;
}

Awaiter

实际上,co_yield expr等价于co_await promise.yield_value(expr)co_await 是一个暂停点,也就是说等待并返回控制权给上层的操作由它来完成。

我们定义两种不同的概念用于 co_await 的定制:

  • 可等待体(awaitable):允许被 co_await 接收的表达式。
  • 等待器(awaiter):实现了 await_ready() / await_suspend() / await_resume() 的对象。

当可等待体 awaitable 被调用 co_await 时,编译器可以对可等待体执行转换:

  • 如果此时暂停点为 initial_suspend() 或者 final_suspend(),则没有变动。
  • 如果此时暂停点实际使用 co_yield,则没有变动。
  • 如果承诺对象 promise 存在函数 await_transform(awaitable),那可等待体可以转换为 await_transform(awaitable)

当确认可等待体后,还需要继续通过重载以确认等待器:

  • 通过成员函数 awaitable.operator co_await() 确认。
  • 通过非成员函数 operator co_await(awaitable) 确认。
  • 如果没有重载,那么可等待体即是等待器。

这些复杂的概念哪里会用到?我们一直在用。前面例子中的 co_yield promise.yield_value(expr) 实际展开后,(除了 yield_value() 的函数执行以外)就是 co_await std::suspend_always{}

  // libstdc++ 实现
  struct suspend_always
  {
    constexpr bool await_ready() const noexcept { return false; }

    constexpr void await_suspend(coroutine_handle<>) const noexcept {}

    constexpr void await_resume() const noexcept {}
  };

  struct suspend_never
  {
    constexpr bool await_ready() const noexcept { return true; }

    constexpr void await_suspend(coroutine_handle<>) const noexcept {}

    constexpr void await_resume() const noexcept {}
  };

由于没有上述的转换和重载,可以得知 std::suspend_always 既是可等待体,也是等待器。

等待器的基本实现思路如下:

  • 假设协程于 <suspend-point> 表示实际的暂停点,于 <resume-point> 表示实际的恢复点。
  • bool await_ready() 表示是否到达 <suspend-point>
    • 返回 false 进入 <suspend-point>
    • 返回 true 进入 <resume-point>
  • auto await_suspend(coroutine_handle) 表示已到达 <suspend-point> 后的执行函数:
    • 如果返回 voidtrue,表示保持暂停而不进入 <resume-point>
    • 如果返回 false,将进入当前协程的 <resume-point>
    • 如果返回某个协程描述符,将进入对应协程的 <resume-point>
  • auto await_resume() 表示已到达 <resume-point> 后的执行函数:
    • 其返回值就是 co_await 最终得到的返回结果。

不难看出,等待器的三个接口是基于事件处理来设计的:

  • await_ready() 表示事件是否需要已完成,完成则直接收集,否则需要处理再收集。
  • await_suspend() 表示处理事件的方式,可采用同步或者异步。
  • await_resume() 表示收集事件的结果。

NOTES:

  • <suspend-point> <resume-point> 是由编译器生成的。
  • 不进入 <resume-point> 意味着控制权返回给上层
  • await_suspend(coroutine_handle) 如果返回任意协程描述符 h,等同于调用 h.resume()
  • await_suspend(coroutine_handle) 可以返回自身关联的 coroutine_handle
  • 到达 <suspend-point> 后可以 .resume(),因此 await_suspend() 返回前甚至可以 .resume() 自身。
  • 关于上一条 NOTE 需要注意协程帧与 *this 的生命周期差异,这份代码会教你做事。

Lewis Baker 的文章给了一份很好的伪代码作为解释:

// So, assuming we have encapsulated the logic for turning the <expr> result
// into an Awaiter object into the above functions then the semantics of
// co_await <expr> can be translated (roughly) as follows:
{
  auto&& value = <expr>;
  auto&& awaitable = get_awaitable(promise, static_cast<decltype(value)>(value));
  auto&& awaiter = get_awaiter(static_cast<decltype(awaitable)>(awaitable));
  if (!awaiter.await_ready())
  {
    using handle_t = std::experimental::coroutine_handle<P>;

    using await_suspend_result_t =
      decltype(awaiter.await_suspend(handle_t::from_promise(p)));

    <suspend-coroutine>

    if constexpr (std::is_void_v<await_suspend_result_t>)
    {
      awaiter.await_suspend(handle_t::from_promise(p));
      <return-to-caller-or-resumer>
    }
    else
    {
      // 这里有些过时了,handle 也是可以的,总之问题不大
      static_assert(
         std::is_same_v<await_suspend_result_t, bool>,
         "await_suspend() must return 'void' or 'bool'.");

      if (awaiter.await_suspend(handle_t::from_promise(p)))
      {
        <return-to-caller-or-resumer>
      }
    }

    <resume-point>
  }

  return awaiter.await_resume();
}

你也可以动手改造文中给的示例,来理解等待器的行为:

template <typename T>
 auto My_generator<T>::promise_type::yield_value(auto &&value) {
     result = std::forward<decltype(value)>(value);
-    return std::suspend_always{};
+    //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+    struct Awaiter {
+        bool await_ready() {
+            std::cout << "ready?" << std::endl;
+            return false;
+        }
+        auto await_suspend(std::coroutine_handle<> handle) {
+            std::cout << "suspend!" << std::endl;
+            std::cout << handle.address() << std::endl;
+            return void();
+            // return false;
+            // return true;
+            // return /*next_handle*/;
+        }
+        void await_resume() {
+            std::cout << "resume!" << std::endl;
+        }
+    };
+    return Awaiter{};
+    //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 }

返回语句

Q: 我已经明白 co_yieldco_await 了,那还有 co_return 呢?

A: 不想写了,自己品鉴吧。

When a coroutine reaches the co_return statement, it performs the following:

  • calls promise.return_void() for
    • co_return;
    • co_return expr; where expr has type void
  • or calls promise.return_value(expr) for co_return expr; where expr has non-void type
  • destroys all variables with automatic storage duration in reverse order they were created.
  • calls promise.final_suspend() and co_awaits the result.

When the coroutine state is destroyed either because it terminated via co_return or uncaught exception, or because it was destroyed via its handle, it does the following:

  • calls the destructor of the promise object.
  • calls the destructors of the function parameter copies.
  • calls operator delete to free the memory used by the coroutine state.
  • transfers execution back to the caller/resumer.

Asio 协程

众所周知,Asio 出于扩展性和兼容性的需求,代码是比较抽象的。本章按照一个实际调用的过程来探讨对于异步函数的 C++20 协程适配,非协程相关的内容会被忽略,这样可以减少阅读上的复杂度。

实际上是按照深度优先的顺序去阅读的,而不是先给设计然后解释的顺序。又因为异步的关系,往往需要边看边往上翻。

异步函数

async_read_some() 为例,Asio 可按如下方式使用协程:

asio::awaitable<void> do_stuff(asio::ip::tcp::socket socket) {
    char buf[1024];
    for(;;) {
        size_t n = co_await socket.async_read_some(asio::buffer(buf), asio::use_awaitable);
        // ...
    }
}

async operation

现在跟踪 async_read_some() 的函数签名:

  template <typename MutableBufferSequence,
      ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code,
        std::size_t)) ReadHandler
          ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  ASIO_INITFN_AUTO_RESULT_TYPE(ReadHandler,
      void (asio::error_code, std::size_t))
  async_read_some(const MutableBufferSequence& buffers,
      ASIO_MOVE_ARG(ReadHandler) handler
        ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))
  {
    return async_initiate<ReadHandler,
      void (asio::error_code, std::size_t)>(
        initiate_async_receive(this), handler,
        buffers, socket_base::message_flags(0));
  }

async_read_some() 内部实现分为两步:

  • 先是构造并传入了 initiate_async_receive(this) 对象。
  • 然后调用 async_initiate() 函数。

(这里 handlerasio::use_awaitable

先跟踪 initiate_async_receive 类型:

  class initiate_async_receive
  {
  public:
    typedef Executor executor_type;

    explicit initiate_async_receive(basic_stream_socket* self)
      : self_(self)
    {
    }

    const executor_type& get_executor() const noexcept
    {
      return self_->get_executor();
    }

    template <typename ReadHandler, typename MutableBufferSequence>
    void operator()(ReadHandler&& handler,
        const MutableBufferSequence& buffers,
        socket_base::message_flags flags) const
    {
      // If you get an error on the following line it means that your handler
      // does not meet the documented type requirements for a ReadHandler.
      ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check;

      detail::non_const_lvalue<ReadHandler> handler2(handler);
      self_->impl_.get_service().async_receive(
          self_->impl_.get_implementation(), buffers, flags,
          handler2.value, self_->impl_.get_executor());
    }

  private:
    basic_stream_socket* self_;
  };

主要操作集中在 operator(),而构造函数并没有特殊的副作用,因此先不关注。

async_initiate()

接下来再分析 async_initiate() 函数。该函数存在 2 个 SFINAE 版本,用以区分 async_result 有没有成员函数。这里用版本 1 和版本 2 表示:

// async_initiate() 版本 1
// 调用 async_result<>::initiate()
// SFINAE 依据 async_result_has_initiate_memfn 选择
// <del>写的什么玩意,崩的要死</del>
template <typename CompletionToken,
    ASIO_COMPLETION_SIGNATURE Signature,
    typename Initiation, typename... Args>
inline typename enable_if<
    detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
    ASIO_INITFN_DEDUCED_RESULT_TYPE(CompletionToken, Signature,
      (async_result<typename decay<CompletionToken>::type,
        Signature>::initiate(declval<ASIO_MOVE_ARG(Initiation)>(),
          declval<ASIO_MOVE_ARG(CompletionToken)>(),
          declval<ASIO_MOVE_ARG(Args)>()...)))>::type
async_initiate(ASIO_MOVE_ARG(Initiation) initiation,
    ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
    ASIO_MOVE_ARG(Args)... args)
{
  return async_result<typename decay<CompletionToken>::type,
    Signature>::initiate(ASIO_MOVE_CAST(Initiation)(initiation),
      ASIO_MOVE_CAST(CompletionToken)(token),
      ASIO_MOVE_CAST(Args)(args)...);
}

版本 1 就是挑选 async_result::initate(initiation, token, args...)

// async_initiate() 版本 2
// 调用 initiation(), async_completion<>.result.get();
template <typename CompletionToken,
    ASIO_COMPLETION_SIGNATURE Signature,
    typename Initiation, typename... Args>
inline typename enable_if<
    !detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
    ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)>::type
async_initiate(ASIO_MOVE_ARG(Initiation) initiation,
    ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
    ASIO_MOVE_ARG(Args)... args)
{
  async_completion<CompletionToken, Signature> completion(token);

  ASIO_MOVE_CAST(Initiation)(initiation)(
      ASIO_MOVE_CAST(ASIO_HANDLER_TYPE(CompletionToken,
        Signature))(completion.completion_handler),
      ASIO_MOVE_CAST(Args)(args)...);

  return completion.result.get();
}

版本 2 需要引入 async_completion 类,主要是 async_completion 配合 token 从中推导出 completion_handler

调用过程为:

  • 通过 token 构造 async_completion,该对象提供 handler
  • 执行 initiation 对象的 operator()(handler, args...)
  • 收集 result

至于版本是怎么区分的就看这一堆萃取:

template <typename CompletionToken,
    ASIO_COMPLETION_SIGNATURE... Signatures>
struct async_result_has_initiate_memfn
  // 根据 sizeof(async_result_initiate_memfn_helper) 大小来判断
  // > 1 就有成员函数, == 1 没有
  : integral_constant<bool, sizeof(async_result_initiate_memfn_helper<
      async_result<decay_t<CompletionToken>, Signatures...>
    >(0)) != 1>
{
};

// 而 async_result_initiate_memfn_helper 有 2 个版本

// 不具有实现的函数签名,对应大小为 1
template <typename>
char (&async_result_initiate_memfn_helper(...))[2];

// 如果有实现,那就是 16 字节
// T 为 async_result<>
template <typename T>
char async_result_initiate_memfn_helper(
    async_result_memfns_check<
      void (async_result_memfns_base::*)(),
      &async_result_memfns_derived<T>::initiate>*);


// T 为 void (async_result_memfns_base::*)()
// 传入&async_result_memfns_derived<T>::initiate>*
template <typename T, T>
struct async_result_memfns_check
{
};

// 对类型{T, async_result_memfns_base}构成 mixin
// 此时具有了 initiate() 成员函数
template <typename T>
struct async_result_memfns_derived
  : T, async_result_memfns_base
{
};

// 因此就是看是否有 async_result_memfns_derived<async_result>::initiate() 成员函数
// 而这个成员函数是通过 async_result::initiate() 的继承得到的
// 显然在这个 async_read_some() 流程中对应的 async_result<>特化是具有 initiate() 成员函数的
// (为啥写这么绕呢?也许这就是 Asio 吧……)

async_result

async_result async_result 是一个定制点:决定返回类型和具体完成令牌

目前分析得到,流程会调用到 async_result<CompletionToken, Signature>::initiate()。而这里对应的完成令牌(completion token)即为 asio::use_awaitable_t,依此可以找到特化模板:

template <typename Executor, typename R, typename... Args>
class async_result<use_awaitable_t<Executor>, R(Args...)>
{
public:
  typedef typename detail::awaitable_handler<
      Executor, typename decay<Args>::type...> handler_type;
  typedef typename handler_type::awaitable_type return_type;

  // 调用到的 initiate() 函数
  // 注意到这是一个协程,因此要关注 return_type 是怎样的类设计
  //
  // 另外,initiation 就是原地构造的 initiate_async_receive(socket) 对象
  template <typename Initiation, typename... InitArgs>
  static return_type initiate(Initiation initiation,
      use_awaitable_t<Executor> u, InitArgs... args)
  {
    (void)u;

    co_await [&](auto* frame)
      {
        ASIO_HANDLER_LOCATION((u.file_name_, u.line_, u.function_name_));
        handler_type handler(frame->detach_thread());
        // 实际是在这里处理初始化函数
        std::move(initiation)(std::move(handler), std::move(args)...);
        return static_cast<handler_type*>(nullptr);
      };

    for (;;) {} // Never reached.
#if defined(_MSC_VER)
    co_return dummy_return<typename return_type::value_type>();
#endif // defined(_MSC_VER)
  }
};

async_result<use_awaitable_t>::initiate() 函数使用了 co_await。根据定义,这是一个协程。对于协程我们需要关注其返回类型 return_type 的推导。

  // return_type 即 handler_type::awaitable_type
  // 那么 handler_type 又是啥
  typedef typename handler_type::awaitable_type return_type;


  // handler_type 就是 asio::detail::awaitable_handler
  typedef typename detail::awaitable_handler<
      Executor, typename decay<Args>::type...> handler_type;

小结

现在先阶段性小结吧。当我们以协程的方式调用一个异步函数时:

asio::awaitable<void> do_stuff_pseudo(auto io_object) {
    auto resume_result = co_await io_object.async_operation(..., asio::use_awaitable);
}
  • 实际是以 asio::awaitable 作为返回类型的协程的前提下。
  • 执行 co_await asio::detail::awaitable_handler::awaitable_type{}
  • 而异步函数则有 initiation = initiate_do_sth{io_object_ptr} 对象在内部传递。

这里假设 initiate_do_sth 是某个类,该类实现了 operator(),通过构造函数得到 initaiton 对象。

需要注意的是,从 do_stuff_pseudo 函数开始,就已经是协程了。Asio 设置的 initial_suspend() 选择返回 suspend_always。因此本地(当前运行的)上下文的调用必然是直接暂停返回的。

作为小结,这里给出一个大概的内联展开调用关系:

// 本地上下文,直接暂停
do_stuff_pseudo()
  ...
  io_object.async_operation()
    return async_initiate(initiation = initiate_do_sth{io_object_ptr})
      // 内部存在 co_await,这也是一个协程
      return async_result::initiate(initiation, token, ...)
        co_await [](frame) { /* 本该在这里展开 initiation(...) */ }
        for(;;)

-------------------------------------------------------------------

// 其它上下文,醒来后发现自己跑到别的地方去了
// 熟悉 Asio 的同学也应该猜到是由 executor 给出的上下文
initiation(handler = deduced_from(async_result<token>), ...)

剩下的问题核心就在于 awaitable_type 别名:先剧透一下,毫无意外就是 asio::awaitable

asio::awaitable

上面分析到协程的返回类型实际是 asio::detail::awaitable_handler::awaitable_type

逐一解析,先从 awaitable_handler 看起吧。这里开始将涉及到若干协程相关数据结构:

  • awaitable_handler.
  • awaitable_handler_base.
  • awaitable_thread.
  • awaitable_frame.
  • awaitable_frame_base.
  • awaitable.

awaitable_handler

// awaitable_handler 派生于 awaitable_handler_base
// 核心就是多了 operator() 作为 handler
template <typename Executor, typename... Ts>
class awaitable_handler
  : public awaitable_handler_base<Executor, std::tuple<Ts...>>
{
public:
  using awaitable_handler_base<Executor,
    std::tuple<Ts...>>::awaitable_handler_base;

  template <typename... Args>
  void operator()(Args&&... args)
  {
    this->frame()->attach_thread(this);
    this->frame()->return_values(std::forward<Args>(args)...);
    this->frame()->pop_frame();
    this->pump();
  }
};

// awaitable_handler_base 派生于 awaitable_thread
// 这里看到 awaitable_type 就是 asio::awaitable
template <typename Executor, typename T>
class awaitable_handler_base
  : public awaitable_thread<Executor>
{
public:
  typedef void result_type;
  // 找到你了
  typedef awaitable<T, Executor> awaitable_type;

  // Construct from the entry point of a new thread of execution.
  awaitable_handler_base(awaitable<void, Executor> a, const Executor& ex)
    : awaitable_thread<Executor>(std::move(a), ex)
  {
  }

  // Transfer ownership from another awaitable_thread.
  explicit awaitable_handler_base(awaitable_thread<Executor>* h)
    : awaitable_thread<Executor>(std::move(*h))
  {
  }

protected:
  awaitable_frame<T, Executor>* frame() noexcept
  {
    return static_cast<awaitable_frame<T, Executor>*>(this->top_of_stack_);
  }
};

awaitable_thread

// awaitable_handler 的基类,实际管理 awaitable 和 awaitable_frame
template <typename Executor>
class awaitable_thread
{
public:
  typedef Executor executor_type;

  // Construct from the entry point of a new thread of execution.
  awaitable_thread(awaitable<void, Executor> p, const Executor& ex)
    : bottom_of_stack_(std::move(p)),
      top_of_stack_(bottom_of_stack_.frame_),
      executor_(ex)
  {
  }

  // Transfer ownership from another awaitable_thread.
  awaitable_thread(awaitable_thread&& other) noexcept
    : bottom_of_stack_(std::move(other.bottom_of_stack_)),
      top_of_stack_(std::exchange(other.top_of_stack_, nullptr)),
      executor_(std::move(other.executor_))
  {
  }

  // Clean up with a last ditch effort to ensure the thread is unwound within
  // the context of the executor.
  ~awaitable_thread()
  {
    if (bottom_of_stack_.valid())
    {
      // Coroutine "stack unwinding" must be performed through the executor.
      (post)(executor_,
          [a = std::move(bottom_of_stack_)]() mutable
          {
            awaitable<void, Executor>(std::move(a));
          });
    }
  }

  executor_type get_executor() const noexcept
  {
    return executor_;
  }

  // Launch a new thread of execution.
  void launch()
  {
    top_of_stack_->attach_thread(this);
    pump();
  }

protected:
  template <typename> friend class awaitable_frame_base;

  // Repeatedly resume the top stack frame until the stack is empty or until it
  // has been transferred to another resumable_thread object.
  void pump()
  {
    do top_of_stack_->resume(); while (top_of_stack_);
    if (bottom_of_stack_.valid())
    {
      awaitable<void, Executor> a(std::move(bottom_of_stack_));
      a.frame_->rethrow_exception();
    }
  }

  awaitable<void, Executor> bottom_of_stack_;
  awaitable_frame_base<Executor>* top_of_stack_;
  executor_type executor_;
};

awaitable

现在定位到了 awaitable_handler_base::awaitable_type 别名就是 awaitable。看下实现:

/// The return type of a coroutine or asynchronous operation.
template <typename T, typename Executor = any_io_executor>
class awaitable
{
public:
  /// The type of the awaited value.
  typedef T value_type;

  /// The executor type that will be used for the coroutine.
  typedef Executor executor_type;

  /// Default constructor.
  constexpr awaitable() noexcept
    : frame_(nullptr)
  {
  }

  /// Move constructor.
  awaitable(awaitable&& other) noexcept
    : frame_(std::exchange(other.frame_, nullptr))
  {
  }

  /// Destructor
  ~awaitable()
  {
    if (frame_)
      frame_->destroy();
  }

  /// Checks if the awaitable refers to a future result.
  bool valid() const noexcept
  {
    return !!frame_;
  }

#if !defined(GENERATING_DOCUMENTATION)

  // Support for co_await keyword.
  bool await_ready() const noexcept
  {
    return false;
  }

  // Support for co_await keyword.
  // 不管是暂停还是恢复,都关联到 awaitable_frame
  template <class U>
  void await_suspend(
      detail::coroutine_handle<detail::awaitable_frame<U, Executor>> h)
  {
    frame_->push_frame(&h.promise());
  }

  // Support for co_await keyword.
  T await_resume()
  {
    return awaitable(static_cast<awaitable&&>(*this)).frame_->get();
  }

#endif // !defined(GENERATING_DOCUMENTATION)

private:
  template <typename> friend class detail::awaitable_thread;
  template <typename, typename> friend class detail::awaitable_frame;

  // Not copy constructible or copy assignable.
  awaitable(const awaitable&) = delete;
  awaitable& operator=(const awaitable&) = delete;

  // Construct the awaitable from a coroutine's frame object.
  // 构造一个 awaitable 必须要传递 awaitable_frame
  explicit awaitable(detail::awaitable_frame<T, Executor>* a)
    : frame_(a)
  {
  }

  detail::awaitable_frame<T, Executor>* frame_;
};

可以看出,awaitable 也实现了 awaiter 接口:

  • await_ready().
  • await_suspend().
  • await_resume().

它的实现没有复杂的设计:

  • 构造可能传入等待帧 awaitable_frame
  • 在暂停流程中,需要压帧操作。
  • 而在恢复流程中,则从帧获取结果。

awaitable_frame

显然协程的具体实现放到等待帧上,还没有梳理上面遗留的数据结构,现在又多了两个问题:这个帧长什么样子的呢?又是从哪里得到帧来构造 awaitable 的呢?

promise_type

首先需要知道一个 awaitable_frameawaitable 的关联:前者是后者的 promise_type。这个关系可以在萃取中得知:

namespace std {

template <typename T, typename Executor, typename... Args>
struct coroutine_traits<asio::awaitable<T, Executor>, Args...>
{
  typedef asio::detail::awaitable_frame<T, Executor> promise_type;
};

} // namespace std

接着看实现。awaitable_frame 具有多个特化,依照目前流程挑选 awaitable_frame<void>

// awaitable_frame<T/void>派生于 awaitable_frame_base
// 与 T/void 相关的事务放到 awaitable_frame 类实现
// 其余 promise_type 细节放到 awaitable_frame_base 类实现
template <typename Executor>
class awaitable_frame<void, Executor>
  : public awaitable_frame_base<Executor>
{
public:
  awaitable<void, Executor> get_return_object()
  {
    this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
    // 问题一的答案:promise_type 传递自身给 awaitable
    return awaitable<void, Executor>(this);
  };

  // 如果是 awaitable_frame<T, Executor>特化
  // 那就改为实现 return_value(T)
  void return_void()
  {
  }

  void get()
  {
    this->caller_ = nullptr;
    this->rethrow_exception();
  }
};

类设计就是简单的继承关系,分离 T 和 promise_type 的职责。

下面按照不同的功能对 awaitable_frame_base 进行代码划分。

operator new

先是内存管理的部分:

// 重载了 operator new 和 operator delete 来定制协程帧的动态内存分配
#if !defined(ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
  void* operator new(std::size_t size)
  {
    // thread_info_base 的实现粗略看过,大概是优先使用线程本地的内存资源
    return asio::detail::thread_info_base::allocate(
        asio::detail::thread_info_base::awaitable_frame_tag(),
        asio::detail::thread_context::thread_call_stack::top(),
        size);
  }

  void operator delete(void* pointer, std::size_t size)
  {
    asio::detail::thread_info_base::deallocate(
        asio::detail::thread_info_base::awaitable_frame_tag(),
        asio::detail::thread_context::thread_call_stack::top(),
        pointer, size);
  }
#endif // !defined(ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)

这部分使用了 Asio 自己的内存池。

promise operations

然后是作为一个承诺类型应该有的基本操作:

  // The frame starts in a suspended state until the awaitable_thread object
  // pumps the stack.
  // 默认是暂停状态,需要 pump() 操作才能唤醒
  auto initial_suspend() noexcept
  {
    return suspend_always();
  }

  // On final suspension the frame is popped from the top of the stack.
  // 最后则弹出自身的帧
  auto final_suspend() noexcept
  {
    struct result
    {
      // 在下方返回时传递 this 构造 this_成员
      awaitable_frame_base* this_;

      bool await_ready() const noexcept
      {
        return false;
      }

      void await_suspend(coroutine_handle<void>) noexcept
      {
        // 出帧操作
        this->this_->pop_frame();
      }

      void await_resume() const noexcept
      {
      }
    };

    return result{this};
  }

  void set_except(std::exception_ptr e) noexcept
  {
    pending_exception_ = e;
  }

  void set_error(const asio::error_code& ec)
  {
    this->set_except(std::make_exception_ptr(asio::system_error(ec)));
  }

  void unhandled_exception()
  {
    set_except(std::current_exception());
  }

  void rethrow_exception()
  {
    if (pending_exception_)
    {
      std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
      std::rethrow_exception(ex);
    }
  }

await_transform

接下来这部分比较重要,用于执行可等待体的转换操作:


  // 众多的 await_transform
  // - 对于 awaitable,原样返回
  // - 对于 executor,返回关联 executor 类型(好神奇的用法)
  // - 对于函数 f,这就稍微复杂了,看下面注释

  template <typename T>
  auto await_transform(awaitable<T, Executor> a) const
  {
    return a;
  }

  // This await transformation obtains the associated executor of the thread of
  // execution.
  auto await_transform(this_coro::executor_t) noexcept
  {
    struct result
    {
      awaitable_frame_base* this_;

      bool await_ready() const noexcept
      {
        return true;
      }

      void await_suspend(coroutine_handle<void>) noexcept
      {
      }

      auto await_resume() const noexcept
      {
        return this_->attached_thread_->get_executor();
      }
    };

    return result{this};
  }

  // This await transformation is used to run an async operation's initiation
  // function object after the coroutine has been suspended. This ensures that
  // immediate resumption of the coroutine in another thread does not cause a
  // race condition.
  // 注释提到,这就是一个异步函数实现转换的重载版本
  // 在这里实现跨线程并不会引起竞争
  template <typename Function>
  auto await_transform(Function f,
      // 既然注释都说了在这里转换,我就不看这一坨萃取了……
      typename enable_if<
        is_convertible<
          typename result_of<Function(awaitable_frame_base*)>::type,
          awaitable_thread<Executor>*
        >::value
      >::type* = 0)
  {
    struct result
    {
      Function function_;
      awaitable_frame_base* this_;

      bool await_ready() const noexcept
      {
        return false;
      }

      void await_suspend(coroutine_handle<void>) noexcept
      {
        // 异步函数与模拟栈实现关联
        // 并且在这里开始执行
        // 划重点,总算动起来了
        function_(this_);
      }

      void await_resume() const noexcept
      {
      }
    };

    return result{std::move(f), this};
  }

帧管理

后续是 frame 和 thread 的管理:

  void attach_thread(awaitable_thread<Executor>* handler) noexcept
  {
    attached_thread_ = handler;
  }

  awaitable_thread<Executor>* detach_thread() noexcept
  {
    return std::exchange(attached_thread_, nullptr);
  }

  // 从作者注释(下面)画的关系图,可以得知模拟的等待帧是在一个 thread 内管理的
  // 一个“thread”内的帧都使用同一 executor,并且循环出栈就能跑起来
  void push_frame(awaitable_frame_base<Executor>* caller) noexcept
  {
    caller_ = caller;
    attached_thread_ = caller_->attached_thread_;
    attached_thread_->top_of_stack_ = this;
    caller_->attached_thread_ = nullptr;
  }

  void pop_frame() noexcept
  {
    if (caller_)
      caller_->attached_thread_ = attached_thread_;
    attached_thread_->top_of_stack_ = caller_;
    attached_thread_ = nullptr;
    caller_ = nullptr;
  }

  void resume()
  {
    coro_.resume();
  }

  void destroy()
  {
    coro_.destroy();
  }

protected:
  coroutine_handle<void> coro_ = nullptr;
  awaitable_thread<Executor>* attached_thread_ = nullptr;
  awaitable_frame_base<Executor>* caller_ = nullptr;
  std::exception_ptr pending_exception_ = nullptr;

整体设计

在类的注释中,Asio 作者给出一张图来说明:

// An awaitable_thread represents a thread-of-execution that is composed of one
// or more "stack frames", with each frame represented by an awaitable_frame.
// All execution occurs in the context of the awaitable_thread's executor. An
// awaitable_thread continues to "pump" the stack frames by repeatedly resuming
// the top stack frame until the stack is empty, or until ownership of the
// stack is transferred to another awaitable_thread object.
//
//                +------------------------------------+
//                | top_of_stack_                      |
//                |                                    V
// +--------------+---+                            +-----------------+
// |                  |                            |                 |
// | awaitable_thread |<---------------------------+ awaitable_frame |
// |                  |           attached_thread_ |                 |
// +--------------+---+           (Set only when   +---+-------------+
//                |               frames are being     |
//                |               actively pumped      | caller_
//                |               by a thread, and     |
//                |               then only for        V
//                |               the top frame.)  +-----------------+
//                |                                |                 |
//                |                                | awaitable_frame |
//                |                                |                 |
//                |                                +---+-------------+
//                |                                    |
//                |                                    | caller_
//                |                                    :
//                |                                    :
//                |                                    |
//                |                                    V
//                |                                +-----------------+
//                | bottom_of_stack_               |                 |
//                +------------------------------->| awaitable_frame |
//                                                 |                 |
//                                                 +-----------------+

这里描述了 awaitable_threadawaitable_frame 的关联。我的理解是作者模拟了线程和栈。awaitable_thread“线程”管理一组协程,而协程都以“栈”的形式压到同一“线程”内。可以想象得到,用户调用 co_await 时则会入栈,而整个“线程”的运作机制就是不断出栈,当然具体运行的上下文仍是 executor 对应的 context,这就与 Asio 原有的机制挂上钩了。

NOTE: 我认为这里需要“线程”的概念是因为 asio::co_spawn() 模拟了 fork()/execve() 原语。

回到 await_frame 的实现上,作为一个承诺类型,可以看到它的 initial_suspend() 是暂停的,需要由管理线程唤醒。final_suspend() 则维护栈结构的出帧操作。

另外一个关键点是 await_transform 转换。有三种重载:

  • 如果是 awaitable,原地返回。
  • 如果 executor_t,将(不暂停并)直接恢复得到关联的 executor
  • 如果是异步函数 f,则在暂停后执行 f(this_await_frame),传递帧指针后执行异步函数

co_spawn()

说到执行,前面提到的异步函数本身是协程(结论见async_result),而且 do_stuff() 也是协程,其流程都是“懒惰”而尚未实际执行的。我们需要一个 kick off 的契机,但目前还缺少 executor 信息。一般来说,上下文运行于 asio::io_context.run() 对应的线程,而要让协程跑起来还需要 asio::co_spawn(executor, awaitable{}, completion_token),可以说这是一个协程开始孵化的地方。现在从头来梳理一遍调用过程。按照文中给的例子,这里 executor 传入 io_context 对象(实际是 context 而非 executor,没关系,Asio 帮你搞定),awaitable{} 传入 do_stuff 协程对应的可等待体 do_stuff(socket)completion_token 设为空置的 asio::detached 表示不关心完成结果。

现在来看 co_spawn() 实现。

// 这里是实现部分,最后一个参数是默认 `=0`(接口分离在另一个文件了,所以函数签名信息不全)
template <typename Executor, typename AwaitableExecutor,
    ASIO_COMPLETION_TOKEN_FOR(
      void(std::exception_ptr)) CompletionToken>
inline ASIO_INITFN_AUTO_RESULT_TYPE(
    CompletionToken, void(std::exception_ptr))
co_spawn(const Executor& ex,
    awaitable<void, AwaitableExecutor> a, CompletionToken&& token,
    typename enable_if<
      (is_executor<Executor>::value || execution::is_executor<Executor>::value)
        && is_convertible<Executor, AwaitableExecutor>::value
    >::type*)
{
  return async_initiate<CompletionToken, void(std::exception_ptr)>(
      detail::initiate_co_spawn<AwaitableExecutor>(AwaitableExecutor(ex)),
      token, detail::awaitable_as_function<
        void, AwaitableExecutor>(std::move(a)));
}

co_spawn() 的实现再次用到 async_initiate()。除了传入 initiate_co_spawn 对象以外,还使用 awaitable_as_functionawaitable 封装为函数。

async_result

前面提到,async_initiate() 具有 2 个重载版本,使用哪个版本取决于 async_result 是否有 initiate() 函数。我们需要找 tokenasio::detached_t 的特化版本。

template <typename Signature>
struct async_result<detached_t, Signature>
{
  // detached_handler 内部全部是空实现,表示不关心处理结果,仅作为一个 handler 适配器
  typedef asio::detail::detached_handler completion_handler_type;

  // 返回 void
  typedef void return_type;

  explicit async_result(completion_handler_type&)
  {
  }

  void get()
  {
  }

#if defined(ASIO_HAS_VARIADIC_TEMPLATES)

  template <typename Initiation, typename RawCompletionToken, typename... Args>
  static return_type initiate(
      ASIO_MOVE_ARG(Initiation) initiation, // 传入 initiate_co_spawn{ex}
      ASIO_MOVE_ARG(RawCompletionToken), // 传入 asio::detached
      ASIO_MOVE_ARG(Args)... args) // 传入 awaitable_as_function{a}
  {
    ASIO_MOVE_CAST(Initiation)(initiation)(
        detail::detached_handler(detached_t()),
        ASIO_MOVE_CAST(Args)(args)...);
  }

#else
  // ...略
#endif
};

由于具有 initate() 函数,因此依然是版本 1。将执行 async_result::initate(),展开为 initiate_co_spawn{ex}.operator()(detached, awaitable_as_function{a})

注意和 use_awaitable 版本的异步函数对应的 initiate() 有所不同,它并非协程:跳回上面看看

awaitable_as_function

顺便补充一下 awaitable_as_function,是一个将可等待体伪造成仿函数的类。

template <typename T, typename Executor>
class awaitable_as_function
{
public:
  explicit awaitable_as_function(awaitable<T, Executor>&& a)
    : awaitable_(std::move(a))
  {
  }

  // 执行 () 就能还原回 awaitable
  awaitable<T, Executor> operator()()
  {
    return std::move(awaitable_);
  }

private:
  awaitable<T, Executor> awaitable_;
};

initiate_co_spawn

template <typename Executor>
class initiate_co_spawn
{
public:
  typedef Executor executor_type;

  template <typename OtherExecutor>
  explicit initiate_co_spawn(const OtherExecutor& ex)
    : ex_(ex)
  {
  }

  executor_type get_executor() const ASIO_NOEXCEPT
  {
    return ex_;
  }

  template <typename Handler, typename F>
  void operator()(Handler&& handler, F&& f) const
  {
    typedef typename result_of<F()>::type awaitable_type;

    auto a = (co_spawn_entry_point)(static_cast<awaitable_type*>(nullptr),
        ex_, std::forward<F>(f), std::forward<Handler>(handler));
    awaitable_handler<executor_type, void>(std::move(a), ex_).launch();
  }

private:
  Executor ex_;
};

如上述 initiate_co_spawn 代码,operator()(detached, awaitable_as_function{a2}) 展开后:

  • 先执行 a = co_spawn_entry_point(nullptr, awaitable_as_function{a2}, detached) 得到 awaitable a
  • 再执行 awaitable_handler(a, ex).launch()

NOTE: 这里 a2 指的是 co_spawn() 传递的用户协程生成的可等待体。

co_spawn_entry_point()

template <typename Executor, typename F, typename Handler>
awaitable<void, Executor> co_spawn_entry_point(
    awaitable<void, Executor>*, Executor ex, F f, Handler handler)
{
  // 构造 work guard
  // make_co_spawn_work_guard() 这部分代码和 unified executors 关联性较大
  // 为 ex 尝试添加 execution::outstanding_work_t::tracked_t 的 property
  // 表示将来会有任务提交,避免此时 executor(比如 io context)关闭
  auto spawn_work = make_co_spawn_work_guard(ex);
  auto handler_work = make_co_spawn_work_guard(
      asio::get_associated_executor(handler, ex));

  // asio::post(ex, token) 是 execution 接口
  // 差不多就是添加一些 property 然后 execute
  // 总之,用目前的示例就是转发到 io_context.execute(ex, use_awaitable)
  //
  // 那么问题来了,这里调用 asio::post() 将返回什么类型?co_await ???
  // 我简单剧透,还是 awaitable
  (void) co_await (post)(spawn_work.get_executor(),
      use_awaitable_t<Executor>{__FILE__, __LINE__, "co_spawn_entry_point"});

  std::exception_ptr e = nullptr;
  try
  {
    // 实际执行点
    co_await f();
  }
  catch (...)
  {
    e = std::current_exception();
  }

  (dispatch)(handler_work.get_executor(),
      [handler = std::move(handler), e]() mutable
      {
        handler(e);
      });
}

co_spawn_entry_point() 是运行时第一个调用到的协程,它的实现有点复杂:

  • 先是 co_await post(ex, use_awaitable)
  • 然后是 co_await f()
  • 最后是 work guard handler 收尾。

好吧,虽然我们在聊 co_spawn(),但严格点说 do_stuff() 函数才是第一个调用到的协程。

由于 initial_suspend() 的存在,co_spawn_entry_point() 并没有执行,而是直接暂停而返回给上层 initiate_co_spawn::operator()

launch()

回到上层,接下来就执行 awaitable_handler(a, ex).launch()

launch() 前,可以先回顾一下 awaitable_handler 的构造。它是经由 awaitable_handler -> awaitable_handler_base -> awaitable_thread 的委托构造而得到的对象。此时 a 作为 bottom_of_stack_ 成员而被 awaitable_handler 持有;top_of_stack_ 成员指向 a.frame_

这里有个细节是 awaitable.frame_ 是怎么得到的问题,这玩意默认构造可是空指针啊。解答见 promise_type 小节。作为承诺类型的 awaitable_frame 在协程调用前即被隐式 operator new 分配;当你在一个协程返回 awaitable 前,awaitable_frame 调用 get_return_object() 保证传递 thisawaitable。而 awaitable 的实际对象 a 是一个已暂停(协程返回)过的可等待体,因此必然有等待帧。

  // Launch a new thread of execution.
  // awaitable_handler 执行
  void launch()
  {
    top_of_stack_->attach_thread(this);
    pump();
  }

  // awaitable_thread<Executor>* top_of_stack_执行
  void attach_thread(awaitable_thread<Executor>* handler) noexcept
  {
    attached_thread_ = handler;
  }

  // Repeatedly resume the top stack frame until the stack is empty or until it
  // has been transferred to another resumable_thread object.
  // awaitable_handler 执行
  void pump()
  {
    // 唤醒
    do top_of_stack_->resume(); while (top_of_stack_);
    if (bottom_of_stack_.valid())
    {
      awaitable<void, Executor> a(std::move(bottom_of_stack_));
      a.frame_->rethrow_exception();
    }
  }

显然,这是整体设计里提到的出栈处理。在上一节,co_spawn_entry_point() 因被暂停而压帧(push_frame())。现在位于 top_of_stack_,因此通过 resume() 唤醒,恢复执行。

恢复后的 co_spawn_entry_point() 先构造了 spawn_workhandler_work,然后实际执行 co_await post(use_awaitable) 表达式。

post()

co_await post(ex, use_awaitable) 先执行了 post() 操作,然后对其返回值进行 co_await 操作。

// 这是 post 接口
template <typename Executor,
    ASIO_COMPLETION_TOKEN_FOR(void()) CompletionToken>
ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void()) post(
    const Executor& ex, ASIO_MOVE_ARG(CompletionToken) token,
    typename enable_if<
      execution::is_executor<Executor>::value || is_executor<Executor>::value
    >::type*)
{
  // 再一次,async_initiate
  // token 传入 use_awaitable
  // 从前面章节的 async_result<use_awaitable_t>可知,这里返回一个 awaitable
  return async_initiate<CompletionToken, void()>(
      detail::initiate_post_with_executor<Executor>(ex), token);
}

再一次看到了 async_initiate,而 token 则传入 use_awaitable,从前面的 async_result 章节 可知,将返回一个 awaitable。因此,asio::post(use_awaitable) 重载版本的操作也是协程化的。

我们解决了上面留下来的问题,co_await (post)(..., use_awaitable) 仍是 co_await awaitable{...}。万幸没有更加复杂的转换……

虽然 post() 作为协程是存在暂停现象,但是前面 launch() 函数的 pump() 操作又会将它唤醒,继续往下看展开后的操作。

暂停是因为 async_result<use_awaitable_t>::initiate() 是协程。

kick off

当尝试调用 async_result<use_awaitable_t>::initiate(),需要注意这一块会关联到前面 async_result 的行为。为了方便再贴一次代码:

  template <typename Initiation, typename... InitArgs>
  static return_type initiate(Initiation initiation,
      use_awaitable_t<Executor> u, InitArgs... args)
  {
    (void)u;

    co_await [&](auto* frame)
      {
        ASIO_HANDLER_LOCATION((u.file_name_, u.line_, u.function_name_));
        handler_type handler(frame->detach_thread());
        // 实际是在这里处理初始化函数
        std::move(initiation)(std::move(handler), std::move(args)...);
        return static_cast<handler_type*>(nullptr);
      };

    for (;;) {} // Never reached.
  }

async_result::initiate() 函数是一个协程,其内部操作有 co_await lambda 的行为。根据承诺对象的转换操作,这里对应 await_transform(f) 的重载:

  • 先暂停(await_ready() 返回 false)。
  • 暂停后执行 await_suspend() 操作,传递 this_(等待帧)到 lambda(frame = this_)
  • 构造 awaitable_handler 类型的 handler
  • 执行 initiate_post_with_executor{}.operator()(handler)

initiate_post_with_executor

现在看下最后一步,即 initiate_post_with_executor{}.operator()(handler) 的流程:

template <typename Executor>
class initiate_post_with_executor
{
public:
  typedef Executor executor_type;

  explicit initiate_post_with_executor(const Executor& ex)
    : ex_(ex)
  {
  }

  executor_type get_executor() const ASIO_NOEXCEPT
  {
    return ex_;
  }

  // 传入 awaitable_handler
  template <typename CompletionHandler>
  void operator()(ASIO_MOVE_ARG(CompletionHandler) handler,
      typename enable_if<
        execution::is_executor<
          typename conditional<true, executor_type, CompletionHandler>::type
        >::value
        &&
        // 没心思看萃取,重载版本太多,看这个实现吧,大同小异
        detail::is_work_dispatcher_required<
          typename decay<CompletionHandler>::type,
          Executor
        >::value
      >::type* = 0) const
  {
    typedef typename decay<CompletionHandler>::type handler_t;

    typedef typename associated_executor<
      handler_t, Executor>::type handler_ex_t;
    handler_ex_t handler_ex((get_associated_executor)(handler, ex_));

    typename associated_allocator<handler_t>::type alloc(
        (get_associated_allocator)(handler));

    // 简单来说,就是 ex.execute(handler);
    // 这里使得上下文切换到 ex(比如 io_context)的上下文去执行 handler()
    // 因此需要看实际的 awaitable_handler 的 operator() 要做什么
    execution::execute(
        asio::prefer(
          asio::require(ex_, execution::blocking.never),
          execution::relationship.fork,
          execution::allocator(alloc)),
        detail::work_dispatcher<handler_t, handler_ex_t>(
          ASIO_MOVE_CAST(CompletionHandler)(handler), handler_ex));
  }

  // ...省略大量 operator() 重载

private:
  Executor ex_;
};

现在可以得知,post(io_context) 操作将 awaitable_handler 投递到 io_context 上下文,投递完成后 co_await 暂停,实现了上下文的切换

那么接下来呢?接下来 co_spawn() 已经结束流程了,因为返回 launch() 上层后 pump() 也不会再次唤醒任何协程。要问为什么还得看回 co_await lambda 倒数第二步。

context switch

co_await lambda 过程需要注意 handler 的构造函数以及 detach_thread 行为。(跳回 awaitable_handler 小节

  // 为了方便再贴一次代码
  // Transfer ownership from another awaitable_thread.
  explicit awaitable_handler_base(awaitable_thread<Executor>* h)
    : awaitable_thread<Executor>(std::move(*h))
  {
  }

这里 handler 的构造使用了 std::move(),引起了“线程”所有权的转移。被偷家的“线程”pump() 过程所用到的“栈”已经置空。

  void pump()
  {
    // 唤醒?不存在的
    do top_of_stack_->resume(); while (top_of_stack_);
    if (bottom_of_stack_.valid())
    {
      awaitable<void, Executor> a(std::move(bottom_of_stack_));
      a.frame_->rethrow_exception();
    }
  }

当执行完 post() 后,co_await post() 将使得协程 co_spawn_entry_point() 暂停。此后本地上下文再也不会恢复该协程。

  // 其中某一特化 awaitable_handler::operator() 如下
  // 注意从前面的 caller 来看,这是一个异步的过程
  // 因为我们跟踪的流程是本地上下文执行的 co_spawn(io_context, a, detached)
  // 而这个是在 io_context 上下文执行的,现在只是投递一个事件,还没有实际运行
  // 等到 io_context.run() 才调用到这个 handler
  template <typename... Args>
  void operator()(Args&&... args)
  {
    this->frame()->attach_thread(this);
    this->frame()->return_values(std::forward<Args>(args)...);
    this->frame()->pop_frame();
    this->pump();
  }

而上下文将被切换到 executor 所指定的上下文。比如 io_context.run(),使得 awaitable_handler{}() 被执行,此前暂停的协程可以再次恢复,并且运行所在的上下文已经切换到 io_context

再接下来,恢复的 co_spawn_entry_point() 则执行 co_await f(),这里就是对应的 do_stuff() 用户层接口,真正执行异步函数。

总结

简单的做点总结吧:

  • Asio 通过 async_result 萃取和 use_awaitable 令牌来完成异步函数的协程化重载。
  • Asio 划分协程组为模拟线程(awaitable thread),组内协程的调用关系模拟为栈结构。
  • Asio 使用 co_spawn() 创建模拟线程和投递任务,并通过模拟线程迁移完成上下文切换。
  • Asio 很抽象,我还远远跟不上作者的思路。

图文无关

按照惯例,文末随机附有内容无关的图。

cocoa Cocoa is cute!

References

Coroutines – cppreference
C++ Coroutines: Understanding operator co_await – Asymmetric Transfer
C++20 Coroutines Support – asio C++ library

This post is licensed under CC BY 4.0 by the author.
Contents