Home 从C++20协程,到Asio的协程适配
Post
Cancel

从C++20协程,到Asio的协程适配

前言

上文已经解析了Asio早期的无栈协程实现,本文继续探讨Asio对于C++20协程的适配。

另外,我在知乎翻译了一篇文章,是关于Asio的接口演进历史以及现代网络编程,推荐阅读本文Asio协程章节前先热身:为什么C++20是最awesome的网络编程语言 [中国翻訳]

C++20协程

翻源码也是为了加深对C++20协程的理解,还请允许本弱鸡先啰嗦几句C++20协程标准的设计。

不想听老ass念经?点击跳过!

基本介绍

C++20协程并不是完全的编译器实现,也不是完全的库实现。从类设计上分类,可以分为三大块:

  • 协程描述符(coroutine handle),用于外部接口的交互。
  • 承诺类型(promise type),用于内部的库实现设计。
  • 协程帧(coroutine frame / coroutine state),由编译器处理的上下文信息。

当然除了类以外,还有函数本身。在C++标准中,协程是函数:A coroutine is a function that can suspend execution to be resumed later.

coroutine 不知用了多少遍的好图

先说协程描述符std::coroutine_handle<>,它作为一个描述符的作用就是调用接口。比如resume()destroy(),一个已暂停的协程接下来能做的事情只有恢复或销毁;还有观测用接口done()和访问关联的承诺类型promise()等等。总之是个容易理解的接口类型。

承诺类型promise_type可以定制协程的具体行为以及处理值的传递。它可以通过initial_suspend()final_suspend()定制是否要在特殊的暂停点(suspension point)执行暂停操作;也可以通过get_return_object()得到首次暂停时得到的对象;以及yield_value(v)可接收co_yield v产出(yield,表示暂停并返回一个值)的值v

至于协程帧则是由编译器控制生成的部分。我们在聊协程时之所以完全不关心所谓上下文保存,那是因为这部分交给编译器管理了,一般来说会隐式的调用动态内存分配operator new来存储上下文信息,但是也会尽可能优化而无需实际分配。

执行流程

实现一个协程hello world程序还需要了解承诺类型的get_return_object(),协程描述符的静态函数from_promise(),以及协程函数的的返回值之间的关联。先看用例吧:

// 一个懒惰的生成器,生成[0, n)序列,每调用一次生成一个值
static My_generator<int> coroutine_function(size_t n) {
    // 尽管这里看着像是无限生成序列……
    for(auto i : std::views::iota(0)) {
        co_yield i;
    }
}

int main() {
    constexpr size_t n = 7;
    auto generate = coroutine_function(n);
    // 这里使用定制的operator bool和operator ()
    while(generate) {
        std::cout << generate() << ' ';
    }
    return 0;
}

// 输出:
0 1 2 3 4 5 6

假设实现一个My_generator,这里的协程该怎么工作?可以简单认为:

  1. 编译器使用operator new分配协程帧对象。
  2. 如果有函数参数,则拷贝/移动到协程帧中,引用不需处理。
  3. 调用承诺对象promise的构造函数,其参数使用协程中的传参(在这里为n)。
  4. 调用promise.get_return_object(),后续作为返回对象给上层(在这里就是generate对象)。
  5. 协程是否暂停取决于promise.initial_suspend(),一般如果返回类型是std::suspend_always,立刻回到上层调用者。
  6. 后续调用再执行函数体。
  7. 每一次调用generate()(内部实际为coroutine_handle.resume()封装),已暂停的coroutine_function()会恢复现场,后续在co_yield处暂停并返回i
  8. 调用promise.yield_value(...),传参即为上一步返回(产出)的值。
  9. 由用户在yield_value()函数体内实现promise内部存放临时存储,回到上层generate(),由用户获取存放在promise的值。
  10. generate()返回值给上层(即main())。

NOTES:

  • 协程的返回类型your_return_type务必有typename your_return_type::promise_type,这也是第3步时编译器得知对应promise_type的原因(或者使用std::coroutine_traits)。
  • 第4步如果需要从中获取协程描述符,那就在承诺对象的成员函数内使用coruntine_handle<decltype(promise)>::from_promise(*this)
  • 当遇到暂停点时,先前获得的对象将返回,因此第4步get_return_object()得到的对象返回。
  • 第5步其实是co_await promise.initial_suspend(),具体后面再说。

一个完整的示例如下:

#include <coroutine>
#include <iostream>
#include <ranges>

template <typename T>
class My_generator {
public:
    struct promise_type;

    // ctor / dtor
    explicit constexpr
      My_generator(std::coroutine_handle<promise_type> h) noexcept
        : _handle(h) {}
    ~My_generator() { _handle ? _handle.destroy() : void(0); }

    // copy / move
    My_generator(const My_generator &) = delete;
    My_generator(My_generator &&rhs) = delete;
    My_generator& operator=(const My_generator &rhs) = delete;

    // user APIs
    T operator ()() {
        if(!resumable()) {
            throw std::runtime_error("AIEEEEE! A NINJA!?");
        }
        if(!operator bool()) {
            throw std::logic_error("WHY THERE'S A NINJA HERE!?");
        }
        _handle.resume();
        _handle.promise().count--;
        return std::move(_handle.promise().result);
    }

    explicit operator bool() const noexcept {
        return _handle.promise().count > 0;
    }

    bool resumable() const noexcept {
        return _handle && !_handle.done();
    }

private:
    std::coroutine_handle<promise_type> _handle;
};

template <typename T>
struct My_generator<T>::promise_type {
    explicit constexpr
      promise_type(size_t count) noexcept(noexcept(T())): count(count) {}
    auto initial_suspend() noexcept { return std::suspend_always{}; }
    auto final_suspend() noexcept { return std::suspend_never{}; }
    void unhandled_exception() {}

    My_generator<T> get_return_object();
    auto yield_value(auto &&value);

    size_t count;
    T result;
};

template <typename T>
My_generator<T> My_generator<T>::promise_type::get_return_object() {
    auto handle = std::coroutine_handle<promise_type>::from_promise(*this);
    return My_generator{handle};
}

template <typename T>
auto My_generator<T>::promise_type::yield_value(auto &&value) {
    result = std::forward<decltype(value)>(value);
    return std::suspend_always{};
}

// C++ coroutines are special functions
static My_generator<int> coroutine_function(size_t n) {
    for(auto i : std::views::iota(0)) {
        co_yield i;
    }
}

int main() {
    constexpr size_t n = 7;
    auto generate = coroutine_function(n);
    while(generate) {
        std::cout << generate() << ' ';
    }
    return 0;
}

Awaiter

实际上,co_yield expr等价于co_await promise.yield_value(expr)co_await是一个暂停点,也就是说等待并返回控制权给上层的操作由它来完成。

我们定义两种不同的概念用于co_await的定制:

  • 可等待体(awaitable):允许被co_await接收的表达式。
  • 等待器(awaiter):实现了await_ready() / await_suspend() / await_resume()的对象。

当可等待体awaitable被调用co_await时,编译器可以对可等待体执行转换:

  • 如果此时暂停点为initial_suspend()或者final_suspend(),则没有变动。
  • 如果此时暂停点实际使用co_yield,则没有变动。
  • 如果承诺对象promise存在函数await_transform(awaitable),那可等待体可以转换为await_transform(awaitable)

当确认可等待体后,还需要继续通过重载以确认等待器:

  • 通过成员函数awaitable.operator co_await()确认。
  • 通过非成员函数operator co_await(awaitable)确认。
  • 如果没有重载,那么可等待体即是等待器。

这些复杂的概念哪里会用到?我们一直在用。前面例子中的co_yield promise.yield_value(expr)实际展开后,(除了yield_value()的函数执行以外)就是co_await std::suspend_always{}

  // libstdc++实现
  struct suspend_always
  {
    constexpr bool await_ready() const noexcept { return false; }

    constexpr void await_suspend(coroutine_handle<>) const noexcept {}

    constexpr void await_resume() const noexcept {}
  };

  struct suspend_never
  {
    constexpr bool await_ready() const noexcept { return true; }

    constexpr void await_suspend(coroutine_handle<>) const noexcept {}

    constexpr void await_resume() const noexcept {}
  };

由于没有上述的转换和重载,可以得知std::suspend_always既是可等待体,也是等待器。

等待器的基本实现思路如下:

  • 假设协程于<suspend-point>表示实际的暂停点,于<resume-point>表示实际的恢复点。
  • bool await_ready()表示是否到达<suspend-point>
    • 返回false进入<suspend-point>
    • 返回true进入<resume-point>
  • auto await_suspend(coroutine_handle)表示已到达<suspend-point>后的执行函数:
    • 如果返回voidtrue,表示保持暂停而不进入<resume-point>
    • 如果返回false,将进入当前协程的<resume-point>
    • 如果返回某个协程描述符,将进入对应协程的<resume-point>
  • auto await_resume()表示已到达<resume-point>后的执行函数:
    • 其返回值就是co_await最终得到的返回结果。

不难看出,等待器的三个接口是基于事件处理来设计的:

  • await_ready()表示事件是否需要已完成,完成则直接收集,否则需要处理再收集。
  • await_suspend()表示处理事件的方式,可采用同步或者异步。
  • await_resume()表示收集事件的结果。

NOTES:

  • <suspend-point><resume-point>是由编译器生成的。
  • 不进入<resume-point>意味着控制权返回给上层
  • await_suspend(coroutine_handle)如果返回任意协程描述符h,等同于调用h.resume()
  • await_suspend(coroutine_handle)可以返回自身关联的coroutine_handle
  • 到达<suspend-point>后可以.resume(),因此await_suspend()返回前甚至可以.resume()自身。
  • 关于上一条NOTE需要注意协程帧与*this的生命周期差异,这份代码会教你做事。

Lewis Baker的文章给了一份很好的伪代码作为解释:

// So, assuming we have encapsulated the logic for turning the <expr> result
// into an Awaiter object into the above functions then the semantics of
// co_await <expr> can be translated (roughly) as follows:
{
  auto&& value = <expr>;
  auto&& awaitable = get_awaitable(promise, static_cast<decltype(value)>(value));
  auto&& awaiter = get_awaiter(static_cast<decltype(awaitable)>(awaitable));
  if (!awaiter.await_ready())
  {
    using handle_t = std::experimental::coroutine_handle<P>;

    using await_suspend_result_t =
      decltype(awaiter.await_suspend(handle_t::from_promise(p)));

    <suspend-coroutine>

    if constexpr (std::is_void_v<await_suspend_result_t>)
    {
      awaiter.await_suspend(handle_t::from_promise(p));
      <return-to-caller-or-resumer>
    }
    else
    {
      // 这里有些过时了,handle也是可以的,总之问题不大
      static_assert(
         std::is_same_v<await_suspend_result_t, bool>,
         "await_suspend() must return 'void' or 'bool'.");

      if (awaiter.await_suspend(handle_t::from_promise(p)))
      {
        <return-to-caller-or-resumer>
      }
    }

    <resume-point>
  }

  return awaiter.await_resume();
}

你也可以动手改造文中给的示例,来理解等待器的行为:

template <typename T>
 auto My_generator<T>::promise_type::yield_value(auto &&value) {
     result = std::forward<decltype(value)>(value);
-    return std::suspend_always{};
+    //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+    struct Awaiter {
+        bool await_ready() {
+            std::cout << "ready?" << std::endl;
+            return false;
+        }
+        auto await_suspend(std::coroutine_handle<> handle) {
+            std::cout << "suspend!" << std::endl;
+            std::cout << handle.address() << std::endl;
+            return void();
+            // return false;
+            // return true;
+            // return /*next_handle*/;
+        }
+        void await_resume() {
+            std::cout << "resume!" << std::endl;
+        }
+    };
+    return Awaiter{};
+    //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 }

返回语句

Q: 我已经明白co_yieldco_await了,那还有co_return呢?

A: 不想写了,自己品鉴吧。

When a coroutine reaches the co_return statement, it performs the following:

  • calls promise.return_void() for
    • co_return;
    • co_return expr; where expr has type void
  • or calls promise.return_value(expr) for co_return expr; where expr has non-void type
  • destroys all variables with automatic storage duration in reverse order they were created.
  • calls promise.final_suspend() and co_awaits the result.

When the coroutine state is destroyed either because it terminated via co_return or uncaught exception, or because it was destroyed via its handle, it does the following:

  • calls the destructor of the promise object.
  • calls the destructors of the function parameter copies.
  • calls operator delete to free the memory used by the coroutine state.
  • transfers execution back to the caller/resumer.

Asio协程

众所周知,Asio出于扩展性和兼容性的需求,代码是比较抽象的。本章按照一个实际调用的过程来探讨对于异步函数的C++20协程适配,非协程相关的内容会被忽略,这样可以减少阅读上的复杂度。

实际上是按照深度优先的顺序去阅读的,而不是先给设计然后解释的顺序。又因为异步的关系,往往需要边看边往上翻。

异步函数

async_read_some()为例,Asio可按如下方式使用协程:

asio::awaitable<void> do_stuff(asio::ip::tcp::socket socket) {
    char buf[1024];
    for(;;) {
        size_t n = co_await socket.async_read_some(asio::buffer(buf), asio::use_awaitable);
        // ...
    }
}

async operation

现在跟踪async_read_some()的函数签名:

  template <typename MutableBufferSequence,
      ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code,
        std::size_t)) ReadHandler
          ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  ASIO_INITFN_AUTO_RESULT_TYPE(ReadHandler,
      void (asio::error_code, std::size_t))
  async_read_some(const MutableBufferSequence& buffers,
      ASIO_MOVE_ARG(ReadHandler) handler
        ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))
  {
    return async_initiate<ReadHandler,
      void (asio::error_code, std::size_t)>(
        initiate_async_receive(this), handler,
        buffers, socket_base::message_flags(0));
  }

async_read_some()内部实现分为两步:

  • 先是构造并传入了initiate_async_receive(this)对象。
  • 然后调用async_initiate()函数。

(这里handlerasio::use_awaitable

先跟踪initiate_async_receive类型:

  class initiate_async_receive
  {
  public:
    typedef Executor executor_type;

    explicit initiate_async_receive(basic_stream_socket* self)
      : self_(self)
    {
    }

    const executor_type& get_executor() const noexcept
    {
      return self_->get_executor();
    }

    template <typename ReadHandler, typename MutableBufferSequence>
    void operator()(ReadHandler&& handler,
        const MutableBufferSequence& buffers,
        socket_base::message_flags flags) const
    {
      // If you get an error on the following line it means that your handler
      // does not meet the documented type requirements for a ReadHandler.
      ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check;

      detail::non_const_lvalue<ReadHandler> handler2(handler);
      self_->impl_.get_service().async_receive(
          self_->impl_.get_implementation(), buffers, flags,
          handler2.value, self_->impl_.get_executor());
    }

  private:
    basic_stream_socket* self_;
  };

主要操作集中在operator(),而构造函数并没有特殊的副作用,因此先不关注。

async_initiate()

接下来再分析async_initiate()函数。该函数存在2个SFINAE版本,用以区分async_result有没有成员函数。这里用版本1和版本2表示:

// async_initiate()版本1
// 调用async_result<>::initiate()
// SFINAE依据async_result_has_initiate_memfn选择
// <del>写的什么玩意,崩的要死</del>
template <typename CompletionToken,
    ASIO_COMPLETION_SIGNATURE Signature,
    typename Initiation, typename... Args>
inline typename enable_if<
    detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
    ASIO_INITFN_DEDUCED_RESULT_TYPE(CompletionToken, Signature,
      (async_result<typename decay<CompletionToken>::type,
        Signature>::initiate(declval<ASIO_MOVE_ARG(Initiation)>(),
          declval<ASIO_MOVE_ARG(CompletionToken)>(),
          declval<ASIO_MOVE_ARG(Args)>()...)))>::type
async_initiate(ASIO_MOVE_ARG(Initiation) initiation,
    ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
    ASIO_MOVE_ARG(Args)... args)
{
  return async_result<typename decay<CompletionToken>::type,
    Signature>::initiate(ASIO_MOVE_CAST(Initiation)(initiation),
      ASIO_MOVE_CAST(CompletionToken)(token),
      ASIO_MOVE_CAST(Args)(args)...);
}

版本1就是挑选async_result::initate(initiation, token, args...)

// async_initiate()版本2
// 调用initiation(), async_completion<>.result.get();
template <typename CompletionToken,
    ASIO_COMPLETION_SIGNATURE Signature,
    typename Initiation, typename... Args>
inline typename enable_if<
    !detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
    ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)>::type
async_initiate(ASIO_MOVE_ARG(Initiation) initiation,
    ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
    ASIO_MOVE_ARG(Args)... args)
{
  async_completion<CompletionToken, Signature> completion(token);

  ASIO_MOVE_CAST(Initiation)(initiation)(
      ASIO_MOVE_CAST(ASIO_HANDLER_TYPE(CompletionToken,
        Signature))(completion.completion_handler),
      ASIO_MOVE_CAST(Args)(args)...);

  return completion.result.get();
}

版本2需要引入async_completion类,主要是async_completion配合token从中推导出completion_handler

调用过程为:

  • 通过token构造async_completion,该对象提供handler
  • 执行initiation对象的operator()(handler, args...)
  • 收集result

至于版本是怎么区分的就看这一堆萃取:

template <typename CompletionToken,
    ASIO_COMPLETION_SIGNATURE... Signatures>
struct async_result_has_initiate_memfn
  // 根据sizeof(async_result_initiate_memfn_helper)大小来判断
  // > 1就有成员函数, == 1没有
  : integral_constant<bool, sizeof(async_result_initiate_memfn_helper<
      async_result<decay_t<CompletionToken>, Signatures...>
    >(0)) != 1>
{
};

// 而async_result_initiate_memfn_helper有2个版本

// 不具有实现的函数签名,对应大小为1
template <typename>
char (&async_result_initiate_memfn_helper(...))[2];

// 如果有实现,那就是16字节
// T为async_result<>
template <typename T>
char async_result_initiate_memfn_helper(
    async_result_memfns_check<
      void (async_result_memfns_base::*)(),
      &async_result_memfns_derived<T>::initiate>*);


// T为void (async_result_memfns_base::*)()
// 传入&async_result_memfns_derived<T>::initiate>*
template <typename T, T>
struct async_result_memfns_check
{
};

// 对类型{T, async_result_memfns_base}构成mixin
// 此时具有了initiate()成员函数
template <typename T>
struct async_result_memfns_derived
  : T, async_result_memfns_base
{
};

// 因此就是看是否有async_result_memfns_derived<async_result>::initiate()成员函数
// 而这个成员函数是通过async_result::initiate()的继承得到的
// 显然在这个async_read_some()流程中对应的async_result<>特化是具有initiate()成员函数的
// (为啥写这么绕呢?也许这就是Asio吧……)

async_result

async_result async_result是一个定制点:决定返回类型和具体完成令牌

目前分析得到,流程会调用到async_result<CompletionToken, Signature>::initiate()。而这里对应的完成令牌(completion token)即为asio::use_awaitable_t,依此可以找到特化模板:

template <typename Executor, typename R, typename... Args>
class async_result<use_awaitable_t<Executor>, R(Args...)>
{
public:
  typedef typename detail::awaitable_handler<
      Executor, typename decay<Args>::type...> handler_type;
  typedef typename handler_type::awaitable_type return_type;

  // 调用到的initiate()函数
  // 注意到这是一个协程,因此要关注return_type是怎样的类设计
  //
  // 另外,initiation就是原地构造的initiate_async_receive(socket)对象
  template <typename Initiation, typename... InitArgs>
  static return_type initiate(Initiation initiation,
      use_awaitable_t<Executor> u, InitArgs... args)
  {
    (void)u;

    co_await [&](auto* frame)
      {
        ASIO_HANDLER_LOCATION((u.file_name_, u.line_, u.function_name_));
        handler_type handler(frame->detach_thread());
        // 实际是在这里处理初始化函数
        std::move(initiation)(std::move(handler), std::move(args)...);
        return static_cast<handler_type*>(nullptr);
      };

    for (;;) {} // Never reached.
#if defined(_MSC_VER)
    co_return dummy_return<typename return_type::value_type>();
#endif // defined(_MSC_VER)
  }
};

async_result<use_awaitable_t>::initiate()函数使用了co_await。根据定义,这是一个协程。对于协程我们需要关注其返回类型return_type的推导。

  // return_type即handler_type::awaitable_type
  // 那么handler_type又是啥
  typedef typename handler_type::awaitable_type return_type;


  // handler_type就是asio::detail::awaitable_handler
  typedef typename detail::awaitable_handler<
      Executor, typename decay<Args>::type...> handler_type;

小结

现在先阶段性小结吧。当我们以协程的方式调用一个异步函数时:

asio::awaitable<void> do_stuff_pseudo(auto io_object) {
    auto resume_result = co_await io_object.async_operation(..., asio::use_awaitable);
}
  • 实际是以asio::awaitable作为返回类型的协程的前提下。
  • 执行co_await asio::detail::awaitable_handler::awaitable_type{}
  • 而异步函数则有initiation = initiate_do_sth{io_object_ptr}对象在内部传递。

这里假设initiate_do_sth是某个类,该类实现了operator(),通过构造函数得到initaiton对象。

需要注意的是,从do_stuff_pseudo函数开始,就已经是协程了。Asio设置的initial_suspend()选择返回suspend_always。因此本地(当前运行的)上下文的调用必然是直接暂停返回的。

作为小结,这里给出一个大概的内联展开调用关系:

// 本地上下文,直接暂停
do_stuff_pseudo()
  ...
  io_object.async_operation()
    return async_initiate(initiation = initiate_do_sth{io_object_ptr})
      // 内部存在co_await,这也是一个协程
      return async_result::initiate(initiation, token, ...)
        co_await [](frame) { /* 本该在这里展开initiation(...) */ }
        for(;;)

-------------------------------------------------------------------

// 其它上下文,醒来后发现自己跑到别的地方去了
// 熟悉Asio的同学也应该猜到是由executor给出的上下文
initiation(handler = deduced_from(async_result<token>), ...)

剩下的问题核心就在于awaitable_type别名:先剧透一下,毫无意外就是asio::awaitable

asio::awaitable

上面分析到协程的返回类型实际是asio::detail::awaitable_handler::awaitable_type

逐一解析,先从awaitable_handler看起吧。这里开始将涉及到若干协程相关数据结构:

  • awaitable_handler.
  • awaitable_handler_base.
  • awaitable_thread.
  • awaitable_frame.
  • awaitable_frame_base.
  • awaitable.

awaitable_handler

// awaitable_handler派生于awaitable_handler_base
// 核心就是多了operator()作为handler
template <typename Executor, typename... Ts>
class awaitable_handler
  : public awaitable_handler_base<Executor, std::tuple<Ts...>>
{
public:
  using awaitable_handler_base<Executor,
    std::tuple<Ts...>>::awaitable_handler_base;

  template <typename... Args>
  void operator()(Args&&... args)
  {
    this->frame()->attach_thread(this);
    this->frame()->return_values(std::forward<Args>(args)...);
    this->frame()->pop_frame();
    this->pump();
  }
};

// awaitable_handler_base派生于awaitable_thread
// 这里看到awaitable_type就是asio::awaitable
template <typename Executor, typename T>
class awaitable_handler_base
  : public awaitable_thread<Executor>
{
public:
  typedef void result_type;
  // 找到你了
  typedef awaitable<T, Executor> awaitable_type;

  // Construct from the entry point of a new thread of execution.
  awaitable_handler_base(awaitable<void, Executor> a, const Executor& ex)
    : awaitable_thread<Executor>(std::move(a), ex)
  {
  }

  // Transfer ownership from another awaitable_thread.
  explicit awaitable_handler_base(awaitable_thread<Executor>* h)
    : awaitable_thread<Executor>(std::move(*h))
  {
  }

protected:
  awaitable_frame<T, Executor>* frame() noexcept
  {
    return static_cast<awaitable_frame<T, Executor>*>(this->top_of_stack_);
  }
};

awaitable_thread

// awaitable_handler的基类,实际管理awaitable和awaitable_frame
template <typename Executor>
class awaitable_thread
{
public:
  typedef Executor executor_type;

  // Construct from the entry point of a new thread of execution.
  awaitable_thread(awaitable<void, Executor> p, const Executor& ex)
    : bottom_of_stack_(std::move(p)),
      top_of_stack_(bottom_of_stack_.frame_),
      executor_(ex)
  {
  }

  // Transfer ownership from another awaitable_thread.
  awaitable_thread(awaitable_thread&& other) noexcept
    : bottom_of_stack_(std::move(other.bottom_of_stack_)),
      top_of_stack_(std::exchange(other.top_of_stack_, nullptr)),
      executor_(std::move(other.executor_))
  {
  }

  // Clean up with a last ditch effort to ensure the thread is unwound within
  // the context of the executor.
  ~awaitable_thread()
  {
    if (bottom_of_stack_.valid())
    {
      // Coroutine "stack unwinding" must be performed through the executor.
      (post)(executor_,
          [a = std::move(bottom_of_stack_)]() mutable
          {
            awaitable<void, Executor>(std::move(a));
          });
    }
  }

  executor_type get_executor() const noexcept
  {
    return executor_;
  }

  // Launch a new thread of execution.
  void launch()
  {
    top_of_stack_->attach_thread(this);
    pump();
  }

protected:
  template <typename> friend class awaitable_frame_base;

  // Repeatedly resume the top stack frame until the stack is empty or until it
  // has been transferred to another resumable_thread object.
  void pump()
  {
    do top_of_stack_->resume(); while (top_of_stack_);
    if (bottom_of_stack_.valid())
    {
      awaitable<void, Executor> a(std::move(bottom_of_stack_));
      a.frame_->rethrow_exception();
    }
  }

  awaitable<void, Executor> bottom_of_stack_;
  awaitable_frame_base<Executor>* top_of_stack_;
  executor_type executor_;
};

awaitable

现在定位到了awaitable_handler_base::awaitable_type别名就是awaitable。看下实现:

/// The return type of a coroutine or asynchronous operation.
template <typename T, typename Executor = any_io_executor>
class awaitable
{
public:
  /// The type of the awaited value.
  typedef T value_type;

  /// The executor type that will be used for the coroutine.
  typedef Executor executor_type;

  /// Default constructor.
  constexpr awaitable() noexcept
    : frame_(nullptr)
  {
  }

  /// Move constructor.
  awaitable(awaitable&& other) noexcept
    : frame_(std::exchange(other.frame_, nullptr))
  {
  }

  /// Destructor
  ~awaitable()
  {
    if (frame_)
      frame_->destroy();
  }

  /// Checks if the awaitable refers to a future result.
  bool valid() const noexcept
  {
    return !!frame_;
  }

#if !defined(GENERATING_DOCUMENTATION)

  // Support for co_await keyword.
  bool await_ready() const noexcept
  {
    return false;
  }

  // Support for co_await keyword.
  // 不管是暂停还是恢复,都关联到awaitable_frame
  template <class U>
  void await_suspend(
      detail::coroutine_handle<detail::awaitable_frame<U, Executor>> h)
  {
    frame_->push_frame(&h.promise());
  }

  // Support for co_await keyword.
  T await_resume()
  {
    return awaitable(static_cast<awaitable&&>(*this)).frame_->get();
  }

#endif // !defined(GENERATING_DOCUMENTATION)

private:
  template <typename> friend class detail::awaitable_thread;
  template <typename, typename> friend class detail::awaitable_frame;

  // Not copy constructible or copy assignable.
  awaitable(const awaitable&) = delete;
  awaitable& operator=(const awaitable&) = delete;

  // Construct the awaitable from a coroutine's frame object.
  // 构造一个awaitable必须要传递awaitable_frame
  explicit awaitable(detail::awaitable_frame<T, Executor>* a)
    : frame_(a)
  {
  }

  detail::awaitable_frame<T, Executor>* frame_;
};

可以看出,awaitable也实现了awaiter接口:

  • await_ready().
  • await_suspend().
  • await_resume().

它的实现没有复杂的设计:

  • 构造可能传入等待帧awaitable_frame
  • 在暂停流程中,需要压帧操作。
  • 而在恢复流程中,则从帧获取结果。

awaitable_frame

显然协程的具体实现放到等待帧上,还没有梳理上面遗留的数据结构,现在又多了两个问题:这个帧长什么样子的呢?又是从哪里得到帧来构造awaitable的呢?

promise_type

首先需要知道一个awaitable_frameawaitable的关联:前者是后者的promise_type。这个关系可以在萃取中得知:

namespace std {

template <typename T, typename Executor, typename... Args>
struct coroutine_traits<asio::awaitable<T, Executor>, Args...>
{
  typedef asio::detail::awaitable_frame<T, Executor> promise_type;
};

} // namespace std

接着看实现。awaitable_frame具有多个特化,依照目前流程挑选awaitable_frame<void>

// awaitable_frame<T/void>派生于awaitable_frame_base
// 与T/void相关的事务放到awaitable_frame类实现
// 其余promise_type细节放到awaitable_frame_base类实现
template <typename Executor>
class awaitable_frame<void, Executor>
  : public awaitable_frame_base<Executor>
{
public:
  awaitable<void, Executor> get_return_object()
  {
    this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
    // 问题一的答案:promise_type传递自身给awaitable
    return awaitable<void, Executor>(this);
  };

  // 如果是awaitable_frame<T, Executor>特化
  // 那就改为实现return_value(T)
  void return_void()
  {
  }

  void get()
  {
    this->caller_ = nullptr;
    this->rethrow_exception();
  }
};

类设计就是简单的继承关系,分离T和promise_type的职责。

下面按照不同的功能对awaitable_frame_base进行代码划分。

operator new

先是内存管理的部分:

// 重载了operator new和operator delete来定制协程帧的动态内存分配
#if !defined(ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
  void* operator new(std::size_t size)
  {
    // thread_info_base的实现粗略看过,大概是优先使用线程本地的内存资源
    return asio::detail::thread_info_base::allocate(
        asio::detail::thread_info_base::awaitable_frame_tag(),
        asio::detail::thread_context::thread_call_stack::top(),
        size);
  }

  void operator delete(void* pointer, std::size_t size)
  {
    asio::detail::thread_info_base::deallocate(
        asio::detail::thread_info_base::awaitable_frame_tag(),
        asio::detail::thread_context::thread_call_stack::top(),
        pointer, size);
  }
#endif // !defined(ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)

这部分使用了Asio自己的内存池。

promise operations

然后是作为一个承诺类型应该有的基本操作:

  // The frame starts in a suspended state until the awaitable_thread object
  // pumps the stack.
  // 默认是暂停状态,需要pump()操作才能唤醒
  auto initial_suspend() noexcept
  {
    return suspend_always();
  }

  // On final suspension the frame is popped from the top of the stack.
  // 最后则弹出自身的帧
  auto final_suspend() noexcept
  {
    struct result
    {
      // 在下方返回时传递this构造this_成员
      awaitable_frame_base* this_;

      bool await_ready() const noexcept
      {
        return false;
      }

      void await_suspend(coroutine_handle<void>) noexcept
      {
        // 出帧操作
        this->this_->pop_frame();
      }

      void await_resume() const noexcept
      {
      }
    };

    return result{this};
  }

  void set_except(std::exception_ptr e) noexcept
  {
    pending_exception_ = e;
  }

  void set_error(const asio::error_code& ec)
  {
    this->set_except(std::make_exception_ptr(asio::system_error(ec)));
  }

  void unhandled_exception()
  {
    set_except(std::current_exception());
  }

  void rethrow_exception()
  {
    if (pending_exception_)
    {
      std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
      std::rethrow_exception(ex);
    }
  }

await_transform

接下来这部分比较重要,用于执行可等待体的转换操作:


  // 众多的await_transform
  // - 对于awaitable,原样返回
  // - 对于executor,返回关联executor类型(好神奇的用法)
  // - 对于函数f,这就稍微复杂了,看下面注释

  template <typename T>
  auto await_transform(awaitable<T, Executor> a) const
  {
    return a;
  }

  // This await transformation obtains the associated executor of the thread of
  // execution.
  auto await_transform(this_coro::executor_t) noexcept
  {
    struct result
    {
      awaitable_frame_base* this_;

      bool await_ready() const noexcept
      {
        return true;
      }

      void await_suspend(coroutine_handle<void>) noexcept
      {
      }

      auto await_resume() const noexcept
      {
        return this_->attached_thread_->get_executor();
      }
    };

    return result{this};
  }

  // This await transformation is used to run an async operation's initiation
  // function object after the coroutine has been suspended. This ensures that
  // immediate resumption of the coroutine in another thread does not cause a
  // race condition.
  // 注释提到,这就是一个异步函数实现转换的重载版本
  // 在这里实现跨线程并不会引起竞争
  template <typename Function>
  auto await_transform(Function f,
      // 既然注释都说了在这里转换,我就不看这一坨萃取了……
      typename enable_if<
        is_convertible<
          typename result_of<Function(awaitable_frame_base*)>::type,
          awaitable_thread<Executor>*
        >::value
      >::type* = 0)
  {
    struct result
    {
      Function function_;
      awaitable_frame_base* this_;

      bool await_ready() const noexcept
      {
        return false;
      }

      void await_suspend(coroutine_handle<void>) noexcept
      {
        // 异步函数与模拟栈实现关联
        // 并且在这里开始执行
        // 划重点,总算动起来了
        function_(this_);
      }

      void await_resume() const noexcept
      {
      }
    };

    return result{std::move(f), this};
  }

帧管理

后续是frame和thread的管理:

  void attach_thread(awaitable_thread<Executor>* handler) noexcept
  {
    attached_thread_ = handler;
  }

  awaitable_thread<Executor>* detach_thread() noexcept
  {
    return std::exchange(attached_thread_, nullptr);
  }

  // 从作者注释(下面)画的关系图,可以得知模拟的等待帧是在一个thread内管理的
  // 一个“thread”内的帧都使用同一executor,并且循环出栈就能跑起来
  void push_frame(awaitable_frame_base<Executor>* caller) noexcept
  {
    caller_ = caller;
    attached_thread_ = caller_->attached_thread_;
    attached_thread_->top_of_stack_ = this;
    caller_->attached_thread_ = nullptr;
  }

  void pop_frame() noexcept
  {
    if (caller_)
      caller_->attached_thread_ = attached_thread_;
    attached_thread_->top_of_stack_ = caller_;
    attached_thread_ = nullptr;
    caller_ = nullptr;
  }

  void resume()
  {
    coro_.resume();
  }

  void destroy()
  {
    coro_.destroy();
  }

protected:
  coroutine_handle<void> coro_ = nullptr;
  awaitable_thread<Executor>* attached_thread_ = nullptr;
  awaitable_frame_base<Executor>* caller_ = nullptr;
  std::exception_ptr pending_exception_ = nullptr;

整体设计

在类的注释中,Asio作者给出一张图来说明:

// An awaitable_thread represents a thread-of-execution that is composed of one
// or more "stack frames", with each frame represented by an awaitable_frame.
// All execution occurs in the context of the awaitable_thread's executor. An
// awaitable_thread continues to "pump" the stack frames by repeatedly resuming
// the top stack frame until the stack is empty, or until ownership of the
// stack is transferred to another awaitable_thread object.
//
//                +------------------------------------+
//                | top_of_stack_                      |
//                |                                    V
// +--------------+---+                            +-----------------+
// |                  |                            |                 |
// | awaitable_thread |<---------------------------+ awaitable_frame |
// |                  |           attached_thread_ |                 |
// +--------------+---+           (Set only when   +---+-------------+
//                |               frames are being     |
//                |               actively pumped      | caller_
//                |               by a thread, and     |
//                |               then only for        V
//                |               the top frame.)  +-----------------+
//                |                                |                 |
//                |                                | awaitable_frame |
//                |                                |                 |
//                |                                +---+-------------+
//                |                                    |
//                |                                    | caller_
//                |                                    :
//                |                                    :
//                |                                    |
//                |                                    V
//                |                                +-----------------+
//                | bottom_of_stack_               |                 |
//                +------------------------------->| awaitable_frame |
//                                                 |                 |
//                                                 +-----------------+

这里描述了awaitable_threadawaitable_frame的关联。我的理解是作者模拟了线程和栈。awaitable_thread“线程”管理一组协程,而协程都以“栈”的形式压到同一“线程”内。可以想象得到,用户调用co_await时则会入栈,而整个“线程”的运作机制就是不断出栈,当然具体运行的上下文仍是executor对应的context,这就与Asio原有的机制挂上钩了。

NOTE: 我认为这里需要“线程”的概念是因为asio::co_spawn()模拟了fork()/execve()原语。

回到await_frame的实现上,作为一个承诺类型,可以看到它的initial_suspend()是暂停的,需要由管理线程唤醒。final_suspend()则维护栈结构的出帧操作。

另外一个关键点是await_transform转换。有三种重载:

  • 如果是awaitable,原地返回。
  • 如果executor_t,将(不暂停并)直接恢复得到关联的executor
  • 如果是异步函数f,则在暂停后执行f(this_await_frame),传递帧指针后执行异步函数

co_spawn()

说到执行,前面提到的异步函数本身是协程(结论见async_result),而且do_stuff()也是协程,其流程都是“懒惰”而尚未实际执行的。我们需要一个kick off的契机,但目前还缺少executor信息。一般来说,上下文运行于asio::io_context.run()对应的线程,而要让协程跑起来还需要asio::co_spawn(executor, awaitable{}, completion_token),可以说这是一个协程开始孵化的地方。现在从头来梳理一遍调用过程。按照文中给的例子,这里executor传入io_context对象(实际是context而非executor,没关系,Asio帮你搞定),awaitable{}传入do_stuff协程对应的可等待体do_stuff(socket)completion_token设为空置的asio::detached表示不关心完成结果。

现在来看co_spawn()实现。

// 这里是实现部分,最后一个参数是默认`=0`(接口分离在另一个文件了,所以函数签名信息不全)
template <typename Executor, typename AwaitableExecutor,
    ASIO_COMPLETION_TOKEN_FOR(
      void(std::exception_ptr)) CompletionToken>
inline ASIO_INITFN_AUTO_RESULT_TYPE(
    CompletionToken, void(std::exception_ptr))
co_spawn(const Executor& ex,
    awaitable<void, AwaitableExecutor> a, CompletionToken&& token,
    typename enable_if<
      (is_executor<Executor>::value || execution::is_executor<Executor>::value)
        && is_convertible<Executor, AwaitableExecutor>::value
    >::type*)
{
  return async_initiate<CompletionToken, void(std::exception_ptr)>(
      detail::initiate_co_spawn<AwaitableExecutor>(AwaitableExecutor(ex)),
      token, detail::awaitable_as_function<
        void, AwaitableExecutor>(std::move(a)));
}

co_spawn()的实现再次用到async_initiate()。除了传入initiate_co_spawn对象以外,还使用awaitable_as_functionawaitable封装为函数。

async_result

前面提到,async_initiate()具有2个重载版本,使用哪个版本取决于async_result是否有initiate()函数。我们需要找tokenasio::detached_t的特化版本。

template <typename Signature>
struct async_result<detached_t, Signature>
{
  // detached_handler内部全部是空实现,表示不关心处理结果,仅作为一个handler适配器
  typedef asio::detail::detached_handler completion_handler_type;

  // 返回void
  typedef void return_type;

  explicit async_result(completion_handler_type&)
  {
  }

  void get()
  {
  }

#if defined(ASIO_HAS_VARIADIC_TEMPLATES)

  template <typename Initiation, typename RawCompletionToken, typename... Args>
  static return_type initiate(
      ASIO_MOVE_ARG(Initiation) initiation, // 传入initiate_co_spawn{ex}
      ASIO_MOVE_ARG(RawCompletionToken), // 传入asio::detached
      ASIO_MOVE_ARG(Args)... args) // 传入awaitable_as_function{a}
  {
    ASIO_MOVE_CAST(Initiation)(initiation)(
        detail::detached_handler(detached_t()),
        ASIO_MOVE_CAST(Args)(args)...);
  }

#else
  // ...略
#endif
};

由于具有initate()函数,因此依然是版本1。将执行async_result::initate(),展开为initiate_co_spawn{ex}.operator()(detached, awaitable_as_function{a})

注意和use_awaitable版本的异步函数对应的initiate()有所不同,它并非协程:跳回上面看看

awaitable_as_function

顺便补充一下awaitable_as_function,是一个将可等待体伪造成仿函数的类。

template <typename T, typename Executor>
class awaitable_as_function
{
public:
  explicit awaitable_as_function(awaitable<T, Executor>&& a)
    : awaitable_(std::move(a))
  {
  }

  // 执行()就能还原回awaitable
  awaitable<T, Executor> operator()()
  {
    return std::move(awaitable_);
  }

private:
  awaitable<T, Executor> awaitable_;
};

initiate_co_spawn

template <typename Executor>
class initiate_co_spawn
{
public:
  typedef Executor executor_type;

  template <typename OtherExecutor>
  explicit initiate_co_spawn(const OtherExecutor& ex)
    : ex_(ex)
  {
  }

  executor_type get_executor() const ASIO_NOEXCEPT
  {
    return ex_;
  }

  template <typename Handler, typename F>
  void operator()(Handler&& handler, F&& f) const
  {
    typedef typename result_of<F()>::type awaitable_type;

    auto a = (co_spawn_entry_point)(static_cast<awaitable_type*>(nullptr),
        ex_, std::forward<F>(f), std::forward<Handler>(handler));
    awaitable_handler<executor_type, void>(std::move(a), ex_).launch();
  }

private:
  Executor ex_;
};

如上述initiate_co_spawn代码,operator()(detached, awaitable_as_function{a2})展开后:

  • 先执行a = co_spawn_entry_point(nullptr, awaitable_as_function{a2}, detached)得到awaitable a
  • 再执行awaitable_handler(a, ex).launch()

NOTE: 这里a2指的是co_spawn()传递的用户协程生成的可等待体。

co_spawn_entry_point()

template <typename Executor, typename F, typename Handler>
awaitable<void, Executor> co_spawn_entry_point(
    awaitable<void, Executor>*, Executor ex, F f, Handler handler)
{
  // 构造work guard
  // make_co_spawn_work_guard()这部分代码和unified executors关联性较大
  // 为ex尝试添加execution::outstanding_work_t::tracked_t的property
  // 表示将来会有任务提交,避免此时executor(比如io context)关闭
  auto spawn_work = make_co_spawn_work_guard(ex);
  auto handler_work = make_co_spawn_work_guard(
      asio::get_associated_executor(handler, ex));

  // asio::post(ex, token)是execution接口
  // 差不多就是添加一些property然后execute
  // 总之,用目前的示例就是转发到io_context.execute(ex, use_awaitable)
  //
  // 那么问题来了,这里调用asio::post()将返回什么类型?co_await ???
  // 我简单剧透,还是awaitable
  (void) co_await (post)(spawn_work.get_executor(),
      use_awaitable_t<Executor>{__FILE__, __LINE__, "co_spawn_entry_point"});

  std::exception_ptr e = nullptr;
  try
  {
    // 实际执行点
    co_await f();
  }
  catch (...)
  {
    e = std::current_exception();
  }

  (dispatch)(handler_work.get_executor(),
      [handler = std::move(handler), e]() mutable
      {
        handler(e);
      });
}

co_spawn_entry_point()是运行时第一个调用到的协程,它的实现有点复杂:

  • 先是co_await post(ex, use_awaitable)
  • 然后是co_await f()
  • 最后是work guard handler收尾。

好吧,虽然我们在聊co_spawn(),但严格点说do_stuff()函数才是第一个调用到的协程。

由于initial_suspend()的存在,co_spawn_entry_point()并没有执行,而是直接暂停而返回给上层initiate_co_spawn::operator()

launch()

回到上层,接下来就执行awaitable_handler(a, ex).launch()

launch()前,可以先回顾一下awaitable_handler的构造。它是经由awaitable_handler -> awaitable_handler_base -> awaitable_thread的委托构造而得到的对象。此时a作为bottom_of_stack_成员而被awaitable_handler持有;top_of_stack_成员指向a.frame_

这里有个细节是awaitable.frame_是怎么得到的问题,这玩意默认构造可是空指针啊。解答见promise_type小节。作为承诺类型的awaitable_frame在协程调用前即被隐式operator new分配;当你在一个协程返回awaitable前,awaitable_frame调用get_return_object()保证传递thisawaitable。而awaitable的实际对象a是一个已暂停(协程返回)过的可等待体,因此必然有等待帧。

  // Launch a new thread of execution.
  // awaitable_handler执行
  void launch()
  {
    top_of_stack_->attach_thread(this);
    pump();
  }

  // awaitable_thread<Executor>* top_of_stack_执行
  void attach_thread(awaitable_thread<Executor>* handler) noexcept
  {
    attached_thread_ = handler;
  }

  // Repeatedly resume the top stack frame until the stack is empty or until it
  // has been transferred to another resumable_thread object.
  // awaitable_handler执行
  void pump()
  {
    // 唤醒
    do top_of_stack_->resume(); while (top_of_stack_);
    if (bottom_of_stack_.valid())
    {
      awaitable<void, Executor> a(std::move(bottom_of_stack_));
      a.frame_->rethrow_exception();
    }
  }

显然,这是整体设计里提到的出栈处理。在上一节,co_spawn_entry_point()因被暂停而压帧(push_frame())。现在位于top_of_stack_,因此通过resume()唤醒,恢复执行。

恢复后的co_spawn_entry_point()先构造了spawn_workhandler_work,然后实际执行co_await post(use_awaitable)表达式。

post()

co_await post(ex, use_awaitable)先执行了post()操作,然后对其返回值进行co_await操作。

// 这是post接口
template <typename Executor,
    ASIO_COMPLETION_TOKEN_FOR(void()) CompletionToken>
ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void()) post(
    const Executor& ex, ASIO_MOVE_ARG(CompletionToken) token,
    typename enable_if<
      execution::is_executor<Executor>::value || is_executor<Executor>::value
    >::type*)
{
  // 再一次,async_initiate
  // token传入use_awaitable
  // 从前面章节的async_result<use_awaitable_t>可知,这里返回一个awaitable
  return async_initiate<CompletionToken, void()>(
      detail::initiate_post_with_executor<Executor>(ex), token);
}

再一次看到了async_initiate,而token则传入use_awaitable,从前面的async_result章节可知,将返回一个awaitable。因此,asio::post(use_awaitable)重载版本的操作也是协程化的。

我们解决了上面留下来的问题,co_await (post)(..., use_awaitable)仍是co_await awaitable{...}。万幸没有更加复杂的转换……

虽然post()作为协程是存在暂停现象,但是前面launch()函数的pump()操作又会将它唤醒,继续往下看展开后的操作。

暂停是因为async_result<use_awaitable_t>::initiate()是协程。

kick off

当尝试调用async_result<use_awaitable_t>::initiate(),需要注意这一块会关联到前面async_result的行为。为了方便再贴一次代码:

  template <typename Initiation, typename... InitArgs>
  static return_type initiate(Initiation initiation,
      use_awaitable_t<Executor> u, InitArgs... args)
  {
    (void)u;

    co_await [&](auto* frame)
      {
        ASIO_HANDLER_LOCATION((u.file_name_, u.line_, u.function_name_));
        handler_type handler(frame->detach_thread());
        // 实际是在这里处理初始化函数
        std::move(initiation)(std::move(handler), std::move(args)...);
        return static_cast<handler_type*>(nullptr);
      };

    for (;;) {} // Never reached.
  }

async_result::initiate()函数是一个协程,其内部操作有co_await lambda的行为。根据承诺对象的转换操作,这里对应await_transform(f)的重载:

  • 先暂停(await_ready()返回false)。
  • 暂停后执行await_suspend()操作,传递this_(等待帧)到lambda(frame = this_)
  • 构造awaitable_handler类型的handler
  • 执行initiate_post_with_executor{}.operator()(handler)

initiate_post_with_executor

现在看下最后一步,即initiate_post_with_executor{}.operator()(handler)的流程:

template <typename Executor>
class initiate_post_with_executor
{
public:
  typedef Executor executor_type;

  explicit initiate_post_with_executor(const Executor& ex)
    : ex_(ex)
  {
  }

  executor_type get_executor() const ASIO_NOEXCEPT
  {
    return ex_;
  }

  // 传入awaitable_handler
  template <typename CompletionHandler>
  void operator()(ASIO_MOVE_ARG(CompletionHandler) handler,
      typename enable_if<
        execution::is_executor<
          typename conditional<true, executor_type, CompletionHandler>::type
        >::value
        &&
        // 没心思看萃取,重载版本太多,看这个实现吧,大同小异
        detail::is_work_dispatcher_required<
          typename decay<CompletionHandler>::type,
          Executor
        >::value
      >::type* = 0) const
  {
    typedef typename decay<CompletionHandler>::type handler_t;

    typedef typename associated_executor<
      handler_t, Executor>::type handler_ex_t;
    handler_ex_t handler_ex((get_associated_executor)(handler, ex_));

    typename associated_allocator<handler_t>::type alloc(
        (get_associated_allocator)(handler));

    // 简单来说,就是ex.execute(handler);
    // 这里使得上下文切换到ex(比如io_context)的上下文去执行handler()
    // 因此需要看实际的awaitable_handler的operator()要做什么
    execution::execute(
        asio::prefer(
          asio::require(ex_, execution::blocking.never),
          execution::relationship.fork,
          execution::allocator(alloc)),
        detail::work_dispatcher<handler_t, handler_ex_t>(
          ASIO_MOVE_CAST(CompletionHandler)(handler), handler_ex));
  }

  // ...省略大量operator()重载

private:
  Executor ex_;
};

现在可以得知,post(io_context)操作将awaitable_handler投递到io_context上下文,投递完成后co_await暂停,实现了上下文的切换

那么接下来呢?接下来co_spawn()已经结束流程了,因为返回launch()上层后pump()也不会再次唤醒任何协程。要问为什么还得看回co_await lambda倒数第二步。

context switch

co_await lambda过程需要注意handler的构造函数以及detach_thread行为。(跳回awaitable_handler小节

  // 为了方便再贴一次代码
  // Transfer ownership from another awaitable_thread.
  explicit awaitable_handler_base(awaitable_thread<Executor>* h)
    : awaitable_thread<Executor>(std::move(*h))
  {
  }

这里handler的构造使用了std::move(),引起了“线程”所有权的转移。被偷家的“线程”pump()过程所用到的“栈”已经置空。

  void pump()
  {
    // 唤醒?不存在的
    do top_of_stack_->resume(); while (top_of_stack_);
    if (bottom_of_stack_.valid())
    {
      awaitable<void, Executor> a(std::move(bottom_of_stack_));
      a.frame_->rethrow_exception();
    }
  }

当执行完post()后,co_await post()将使得协程co_spawn_entry_point()暂停。此后本地上下文再也不会恢复该协程。

  // 其中某一特化awaitable_handler::operator()如下
  // 注意从前面的caller来看,这是一个异步的过程
  // 因为我们跟踪的流程是本地上下文执行的co_spawn(io_context, a, detached)
  // 而这个是在io_context上下文执行的,现在只是投递一个事件,还没有实际运行
  // 等到io_context.run()才调用到这个handler
  template <typename... Args>
  void operator()(Args&&... args)
  {
    this->frame()->attach_thread(this);
    this->frame()->return_values(std::forward<Args>(args)...);
    this->frame()->pop_frame();
    this->pump();
  }

而上下文将被切换到executor所指定的上下文。比如io_context.run(),使得awaitable_handler{}()被执行,此前暂停的协程可以再次恢复,并且运行所在的上下文已经切换到io_context

再接下来,恢复的co_spawn_entry_point()则执行co_await f(),这里就是对应的do_stuff()用户层接口,真正执行异步函数。

总结

简单的做点总结吧:

  • Asio通过async_result萃取和use_awaitable令牌来完成异步函数的协程化重载。
  • Asio划分协程组为模拟线程(awaitable thread),组内协程的调用关系模拟为栈结构。
  • Asio使用co_spawn()创建模拟线程和投递任务,并通过模拟线程迁移完成上下文切换。
  • Asio很抽象,我还远远跟不上作者的思路。

图文无关

按照惯例,文末随机附有内容无关的图。

cocoa Cocoa is cute!

References

Coroutines – cppreference

C++ Coroutines: Understanding operator co_await – Asymmetric Transfer

C++20 Coroutines Support – asio C++ library

This post is licensed under CC BY 4.0 by the author.
Contents