背景

本文简单补充一下 C++20 协程介绍 当中略过的细节。此前在 awaiter 章节提到等待器存在竞态条件,刚好看到 Raymond Chen 在 C++/WinRT 介绍 中也提到这个问题,还给了几种方案来修复问题,顺过来当笔记了。

Raymond 为此水了四篇文章,我只水一篇不过分吧。

问题

auto resume_foreground(DispatcherQueue const& dispatcher)
{
  struct awaitable
  {
    DispatcherQueue m_dispatcher;
    bool m_queued = false;

    bool await_ready()
    {
      return false;
    }

    bool await_suspend(coroutine_handle<> handle)
    {
      m_queued = m_dispatcher.TryEnqueue([handle]
        {
          handle();
        });
      return m_queued;
    }

    bool await_resume()
    {
      return m_queued;
    }
  };
  return awaitable{ dispatcher };
}

这是 Raymond 给的示例,比我之前文章写的好懂就直接 🇰🇷 来说明了。

这份代码当中的 TryEnqueue 指的是将某个任务放入 C++/WinRT 框架中的队列,并且立刻返回布尔值表示是否已入队;如果入队成功,那么调度后会在未来执行该任务。具体到代码中,这个任务就是 handle(),注意 coroutine_handle::operator()resume() 是一个意思。

Note that the coroutine is fully suspended before entering awaiter.await_suspend(). Its handle can be shared with another thread and resumed before the await_suspend() function returns.
来源:cppreference

问题是什么?如标题所言,潜在竞态条件:await_resume 先行于 await_suspend 完成前而完成。具体的状态转换细节见此前的 awaiter 章节。

因为存在时序:

  1. 用户调用 co_await resume_foreground(dispatcher)
  2. 等待器执行 await_ready()await_supend()
  3. await_supend() 当中,TryEnqueue() 入队成功。
  4. 此时并行存在两条异步链路:
链路一 链路二
await_suspend() 剩余部分 handle()await_resume() 且恢复控制权

假设链路二完全先行于链路一完成。await_resume 完成后意味着 struct awaitable 已析构,然后用户还企图访问其成员 m_queued,所以喜提 use-after-free 错误。

下面讨论解决方案。

方案一

struct slim_event
{
  void signal() { /* ... */ }
  void wait() { /* ... */ }
};

观察到链路一和链路二不具有同步关系,那让它们强行同步:必须先完整执行 await_suspend() 再做任务回调。因此一个直接做法就是提供一个轻量 signal/wait 同步组件,插在存在竞态条件的控制流当中作为栅栏使用。

但是问题没那么简单,你的同步组件生命周期怎么办。可以想到在 await_suspend() 和任务之间使用 std::shared_ptr,但这会引入多余的动态内存分配。

struct tracked_slim_event
{
 tracked_slim_event(slim_event*& p)
    : tracker(p) { tracker = &value; }
 tracked_slim_event(tracked_slim_event&& other)
    : tracker(other.tracker) { tracker = &value; }

 slim_event*& tracker;
 slim_event value;
};

bool await_suspend(coroutine_handle<> handle)
{
  slim_event* finder;
  bool result = m_dispatcher.TryEnqueue(
    [handle, tracker = tracked_slim_event(finder)] mutable
    {
      tracker.value.wait();
      handle();
    });

  m_queued = result;

  // 这里 finder 肯定是非空的
  finder->value.signal();
  return result;
}

Raymond 给了一个 tracked_ 封装技巧如上所示,比较怪,但是免去了动态内存分配。总之,该示例可以确保程序的正确性,await_suspend 必然先行于任务执行前完成,也就是没有竞态条件了。

方案二

auto resume_foreground(DispatcherQueue const& dispatcher)
{
  struct awaitable
  {
    DispatcherQueue m_dispatcher;
    bool m_queued = false;
    // 👇
    slim_event ready;

    bool await_suspend(coroutine_handle<> handle)
    {
      bool result = m_dispatcher.TryEnqueue([this, handle]
        {
          ready.wait();
          handle();
        });
      m_queued = result;
      ready.signal();
      return result;
    }

    // 其他部分相同,略。

  };
  return awaitable{ dispatcher };
}

其实可以直接将同步组件作为数据成员,免去上述方案的反直觉代码。

当然,如果你的等待器存在短路设计(await_ready 直通 await_resume),那就不合适了,因为这会额外引入 slim_event 的构造开销,不管是否需要 await_suspend。也需要注意,只有类似同步组件的设计才适合放入到数据成员中,毕竟协程帧和等待器的生命周期是有差异的,只是同步组件在这里恰好能通过同步等待的方式保证安全。

此前的同步组件实现
// fence 免不了
struct slim_event
{
  slim_event() = default;

  // Not copyable
  slim_event(slim_event const&) = delete;
  void operator=(slim_event const&) = delete;

  bool signaled = false;

  void signal()
  {
    signaled = true; // 作者应该写漏了这个赋值
    std::atomic_thread_fence(std::memory_order_release);
    WakeByAddressAll(&signaled);
  }

  void wait()
  {
    // Wait for "signaled" to be "not false" (i.e., true)
    bool False = false;
    while (!signaled) {
      WaitOnAddress(&signaled, &False, sizeof(False), INFINITE);
    }
    std::atomic_thread_fence(std::memory_order_acquire);
  }
};

但是,这两套方案是无论如何也免去不了 wait/signal 的 memory order 同步开销,因为在 await_suspend 当中不管入队成功还是失败都要至少走一遍 atomic_thread_fence;除此以外还有 WakeByAddressAll 和 WaitOnAddress 的开销。

方案三

bool await_suspend(coroutine_handle<> handle)
{
  return
    m_dispatcher.TryEnqueue([this, handle]
    {
      // 👇
      m_queued = true;
      handle();
    });
}

方案三实际非常巧妙,(C++20 协程的)竞态条件的处理并不需要额外引入同步机制。有两点事实:一是任务内部有天然的先行关系;二是不管是用户还是等待器,此时都只会在 await_resume 实际需要数据成员。

所以只需:

  • 在 coroutine_handle::operator() 前更新依赖的数据成员,且
  • 在 await_suspend 产生新的异步链路后不再使用数据成员

确保上述两点后,竞态条件就不会存在。

这里还有一个有意思的点。m_queued 的描述并不符合事实:在 TryEnqueue 成功后,该变量依然是 false。但是这并不影响程序的正确性。

总结

  • C++20 协程要注意「await_resume 先行于 await_suspend 完成前而完成」的竞态条件。
  • 竞态条件可以通过同步的手段去解决。
  • C++20 协程的竞态条件不必通过同步的手段去解决。