背景
本文简单补充一下 C++20 协程介绍 当中略过的细节。此前在 awaiter 章节提到等待器存在竞态条件,刚好看到 Raymond Chen 在 C++/WinRT 介绍 中也提到这个问题,还给了几种方案来修复问题,顺过来当笔记了。
Raymond 为此水了四篇文章,我只水一篇不过分吧。
问题
auto resume_foreground(DispatcherQueue const& dispatcher)
{
struct awaitable
{
DispatcherQueue m_dispatcher;
bool m_queued = false;
bool await_ready()
{
return false;
}
bool await_suspend(coroutine_handle<> handle)
{
m_queued = m_dispatcher.TryEnqueue([handle]
{
handle();
});
return m_queued;
}
bool await_resume()
{
return m_queued;
}
};
return awaitable{ dispatcher };
}
这是 Raymond 给的示例,比我之前文章写的好懂就直接 🇰🇷 来说明了。
这份代码当中的 TryEnqueue
指的是将某个任务放入 C++/WinRT 框架中的队列,并且立刻返回布尔值表示是否已入队;如果入队成功,那么调度后会在未来执行该任务。具体到代码中,这个任务就是 handle()
,注意 coroutine_handle::operator()
和 resume()
是一个意思。
Note that the coroutine is fully suspended before entering awaiter.await_suspend(). Its handle can be shared with another thread and resumed before the await_suspend() function returns.
来源:cppreference
问题是什么?如标题所言,潜在竞态条件:await_resume 先行于 await_suspend 完成前而完成。具体的状态转换细节见此前的 awaiter 章节。
因为存在时序:
- 用户调用
co_await resume_foreground(dispatcher)
。 - 等待器执行
await_ready()
到await_supend()
。 await_supend()
当中,TryEnqueue()
入队成功。- 此时并行存在两条异步链路:
链路一 | 链路二 |
---|---|
await_suspend() 剩余部分
| handle() :await_resume() 且恢复控制权
|
假设链路二完全先行于链路一完成。await_resume 完成后意味着 struct awaitable
已析构,然后用户还企图访问其成员 m_queued
,所以喜提 use-after-free 错误。
下面讨论解决方案。
方案一
struct slim_event
{
void signal() { /* ... */ }
void wait() { /* ... */ }
};
观察到链路一和链路二不具有同步关系,那让它们强行同步:必须先完整执行 await_suspend()
再做任务回调。因此一个直接做法就是提供一个轻量 signal/wait 同步组件,插在存在竞态条件的控制流当中作为栅栏使用。
但是问题没那么简单,你的同步组件生命周期怎么办。可以想到在 await_suspend()
和任务之间使用 std::shared_ptr
,但这会引入多余的动态内存分配。
struct tracked_slim_event
{
tracked_slim_event(slim_event*& p)
: tracker(p) { tracker = &value; }
tracked_slim_event(tracked_slim_event&& other)
: tracker(other.tracker) { tracker = &value; }
slim_event*& tracker;
slim_event value;
};
bool await_suspend(coroutine_handle<> handle)
{
slim_event* finder;
bool result = m_dispatcher.TryEnqueue(
[handle, tracker = tracked_slim_event(finder)] mutable
{
tracker.value.wait();
handle();
});
m_queued = result;
// 这里 finder 肯定是非空的
finder->value.signal();
return result;
}
Raymond 给了一个 tracked_
封装技巧如上所示,比较怪,但是免去了动态内存分配。总之,该示例可以确保程序的正确性,await_suspend 必然先行于任务执行前完成,也就是没有竞态条件了。
方案二
auto resume_foreground(DispatcherQueue const& dispatcher)
{
struct awaitable
{
DispatcherQueue m_dispatcher;
bool m_queued = false;
// 👇
slim_event ready;
bool await_suspend(coroutine_handle<> handle)
{
bool result = m_dispatcher.TryEnqueue([this, handle]
{
ready.wait();
handle();
});
m_queued = result;
ready.signal();
return result;
}
// 其他部分相同,略。
};
return awaitable{ dispatcher };
}
其实可以直接将同步组件作为数据成员,免去上述方案的反直觉代码。
当然,如果你的等待器存在短路设计(await_ready 直通 await_resume),那就不合适了,因为这会额外引入 slim_event
的构造开销,不管是否需要 await_suspend。也需要注意,只有类似同步组件的设计才适合放入到数据成员中,毕竟协程帧和等待器的生命周期是有差异的,只是同步组件在这里恰好能通过同步等待的方式保证安全。
但是,这两套方案是无论如何也免去不了 wait/signal 的 memory order 同步开销,因为在 await_suspend 当中不管入队成功还是失败都要至少走一遍 atomic_thread_fence;除此以外还有 WakeByAddressAll 和 WaitOnAddress 的开销。
方案三
bool await_suspend(coroutine_handle<> handle)
{
return
m_dispatcher.TryEnqueue([this, handle]
{
// 👇
m_queued = true;
handle();
});
}
方案三实际非常巧妙,(C++20 协程的)竞态条件的处理并不需要额外引入同步机制。有两点事实:一是任务内部有天然的先行关系;二是不管是用户还是等待器,此时都只会在 await_resume 实际需要数据成员。
所以只需:
- 在 coroutine_handle::operator() 前更新依赖的数据成员,且
- 在 await_suspend 产生新的异步链路后不再使用数据成员
确保上述两点后,竞态条件就不会存在。
这里还有一个有意思的点。m_queued 的描述并不符合事实:在 TryEnqueue 成功后,该变量依然是 false。但是这并不影响程序的正确性。
总结
- C++20 协程要注意「await_resume 先行于 await_suspend 完成前而完成」的竞态条件。
- 竞态条件可以通过同步的手段去解决。
- C++20 协程的竞态条件不必通过同步的手段去解决。