前言
这篇文章简单描述 Asio 怎样实现一个无栈协程。
下一篇文章:从 C++20 协程,到 Asio 的协程适配
switch-case 基础
首先,switch-case 语法应该是 C 语言中的基础;其次,goto 语法也是 C 语言中的基础。goto
后面衔接的是 label
,而 switch
语法中的 case
也是一种 label
。
那意味着什么?意味着可以飞线。
使用飞线的 ThinkPad T60 主板
这段代码你可以观察不同的变量输入会产生什么结果:
#include <iostream>
using std::cin;
using std::cout;
using std::endl;
int main() {
int a;
cin >> a;
switch (a) case 0:
for(; a < 2;) {
cout << "hello" << endl;
case 2:
return 0;
case 1:
cout << "world" << endl;
}
}
// >> 0
// hello
// [return]
//
// >> 1
// world
// hello
// [return]
//
// >> 2
// [return]
总之,这里标号(case label)越过了代码块的限制,也不影响原有实现的执行:先是 case
常量表达式的匹配,然后直落(fall through)。
达夫设备
这种反常的特性有一个著名的应用就是达夫设备(Duff’s device),主要用于循环展开以优化性能。
虽然跟后续的话题关联不大,但你可以更加深刻的理解飞线能做到什么程度。
#include <iostream>
// 使用 case 来完成 duff's device
void private_memcpy(char *to, const char *from, size_t count) {
size_t n = (count + 7) / 8;
#define COPY *to = *from;
#define ADVANCE to++, from++;
switch (count % 8) {
case 0: do { COPY ADVANCE
case 7: COPY ADVANCE
case 6: COPY ADVANCE
case 5: COPY ADVANCE
case 4: COPY ADVANCE
case 3: COPY ADVANCE
case 2: COPY ADVANCE
case 1: COPY ADVANCE
} while (--n > 0);
}
#undef COPY
#undef ADVANCE
}
int main() {
const char from[] = "jintianxiaomidaobilema";
char to[sizeof from];
private_memcpy(to, from, sizeof from);
std::cout << to << std::endl;
return 0;
}
协程基础
广告:实现一个简单的协程 – Caturra’s blog。
我在之前写过一篇实现协程轮子(有栈协程)的文章,里面有一些关于协程的基本概念,各位客官对协程不太熟悉的话可以看看。
无栈协程
前面提到的有栈协程是从调用栈的角度去考虑的。
大概意思是:传统意义上的函数调用是怎样的,我协程也尝试去模仿、改造、封装。
但是无栈协程直接是抛弃 caller-callee 这些概念,从状态转移的角度去入手。
比如一个写法层面蠢到爆的例子:
int fib() {
static int state_machine {0};
switch(state_machine) {
case 0:
state_machine = 1;
return 1;
case 1:
state_machine = 2;
return 1;
case 2:
state_machine = 3;
return 2;
case 3:
state_machine = 4;
return 3;
case 4:
state_machine = 5;
return 5;
case 5:
state_machine = 6;
return 8;
case 6:
state_machine = 7;
return 13;
case 7:
state_machine = -1;
return 21;
default:;
}
return -1;
}
这是一个求斐波那契数列的协程——一个函数,可以拆分其控制流,每次返回后再次调用时,他都可以从保存的上下文中恢复。所以是名副其实的协程(虽然算到 21 后就不能再次重入了)。
我们对这些过程稍加修饰:
#include <iostream>
/// holy shit!
#define CO_BEGIN static int _state = 0; switch(_state) { case 0:
#define CO_YIELD(ret) do {_state = __LINE__; return ret; case __LINE__:;} while(0)
#define CO_END _state = -1; default:;}
int fib() {
CO_BEGIN
CO_YIELD(1);
CO_YIELD(1);
CO_YIELD(2);
CO_YIELD(3);
CO_YIELD(5);
CO_YIELD(8);
CO_YIELD(13);
CO_YIELD(21);
CO_END
return -1;
}
int main() {
for(int ret; (ret = fib()) != -1;) {
std::cout << ret << std::endl;
}
return 0;
}
进一步的,可以继续提供 CO_RETURN
语义,可以直接终止协程并不可再次重入。
#include <iostream>
#include <cassert>
/// holy shit!!
#define CO_VAR static
#define CO_BEGIN static int _state = 0; switch(_state) { case 0:
#define CO_YIELD(ret) do {_state = __LINE__; return ret; case __LINE__:;} while(0)
#define CO_RETURN(ret) do {_state = -1; return ret;} while(0)
#define CO_END _state = -1; default:;}
static constexpr int NONE = 0;
// acts like a pipe
static int g_pipe {NONE};
using std::cout;
using std::endl;
int producer() {
CO_VAR int i;
CO_BEGIN
for(i = NONE + 1; i < NONE + 5; ++i) {
g_pipe = i;
cout << "[producer] generates: " << i << endl;
CO_YIELD(i);
}
CO_END
return g_pipe = EOF;
}
void consumer() {
CO_BEGIN
for(;;) {
if(g_pipe == NONE) {
cout << "[consumer] none, yield." << endl;
CO_YIELD();
cout << "[consumer] wakeups and checks again." << endl;
}
if(g_pipe == EOF) {
cout << "[consumer] end-of-file, return." << endl;
CO_RETURN();
// unreachable
cout << "[wtf] jintianxiaomidaobila" << endl;
}
cout << "[consumer] consumes: " << g_pipe << endl;
g_pipe = 0;
}
CO_END
cout << "cannot consume" << endl;
}
int main() {
while(~g_pipe) {
int debug = producer();
assert(debug == g_pipe);
consumer();
}
cout << "==================" << endl;
for(auto _{3}; _--;) {
consumer();
}
return 0;
}
// [producer] generates: 1
// [consumer] consumes: 1
// [consumer] none, yield.
// [producer] generates: 2
// [consumer] wakeups and checks again.
// [consumer] consumes: 2
// [consumer] none, yield.
// [producer] generates: 3
// [consumer] wakeups and checks again.
// [consumer] consumes: 3
// [consumer] none, yield.
// [producer] generates: 4
// [consumer] wakeups and checks again.
// [consumer] consumes: 4
// [consumer] none, yield.
// [consumer] wakeups and checks again.
// [consumer] end-of-file, return.
// ==================
// cannot consume
// cannot consume
// cannot consume
这个生产者 - 消费者示例中双方通过管道来传递信息。如果消费者暂时不能获取到信息,则切出协程;控制流交接到主程序再继续切入到生产者去构造信息;生产者一旦完成单次生产操作,也把自身切出给主程序,并进一步调度给曾经切出的消费者从被中断的地方继续执行,直到管道失效,直接终止消费行为,不可再次重入。
你也可以在这里看出飞线的特性,这里的协程语义都可以嵌入到任何语句当中。而且这种无栈协程的实现很短,尽管看着很糟糕的写法,但它不涉及具体的汇编,因此天然有良好的跨平台特性。除此以外,运行时开销(编译器友好)和空间开销(仅一个 int
)更是远胜于有栈协程。(我认为性能是 C++20
采用无栈协程作为标准的重要依据)
它自然是有缺点的,比如对局部变量和可重入的要求会比有栈协程更加苛刻,换句话说就是应用场合很局限,你需要在合适的时机使用。
Asio 协程
无栈协程既然有这么突出的优势,且适用于严格性能要求的异步 IO 场合,因此 Asio(在 C++20
落地十年前)也实现了一版无栈协程。
当然一个良好的习惯是,看代码前先看文档:coroutine (think-async.com)。
这里假定你已经阅读过文档。我们继续探讨它的代码实现。
接口部分:
#ifndef reenter
# define reenter(c) ASIO_CORO_REENTER(c)
#endif
#ifndef yield
# define yield ASIO_CORO_YIELD
#endif
#ifndef fork
# define fork ASIO_CORO_FORK
#endif
实现部分:
class coroutine
{
public:
/// Constructs a coroutine in its initial state.
coroutine() : value_(0) {}
/// Returns true if the coroutine is the child of a fork.
bool is_child() const { return value_ < 0; }
/// Returns true if the coroutine is the parent of a fork.
bool is_parent() const { return !is_child(); }
/// Returns true if the coroutine has reached its terminal state.
bool is_complete() const { return value_ == -1; }
private:
friend class detail::coroutine_ref;
int value_;
};
namespace detail {
class coroutine_ref
{
public:
coroutine_ref(coroutine& c) : value_(c.value_), modified_(false) {}
coroutine_ref(coroutine* c) : value_(c->value_), modified_(false) {}
~coroutine_ref() { if (!modified_) value_ = -1; }
operator int() const { return value_; }
int& operator=(int v) { modified_ = true; return value_ = v; }
private:
void operator=(const coroutine_ref&);
int& value_;
bool modified_;
};
} // namespace detail
#define ASIO_CORO_REENTER(c) \
switch (::asio::detail::coroutine_ref _coro_value = c) \
case -1: if (_coro_value) \
{ \
goto terminate_coroutine; \
terminate_coroutine: \
_coro_value = -1; \
goto bail_out_of_coroutine; \
bail_out_of_coroutine: \
break; \
} \
else /* fall-through */ case 0:
#define ASIO_CORO_YIELD_IMPL(n) \
for (_coro_value = (n);;) \
if (_coro_value == 0) \
{ \
case (n): ; \
break; \
} \
else \
switch (_coro_value ? 0 : 1) \
for (;;) \
/* fall-through */ case -1: if (_coro_value) \
goto terminate_coroutine; \
else for (;;) \
/* fall-through */ case 1: if (_coro_value) \
goto bail_out_of_coroutine; \
else /* fall-through */ case 0:
#define ASIO_CORO_FORK_IMPL(n) \
for (_coro_value = -(n);; _coro_value = (n)) \
if (_coro_value == (n)) \
{ \
case -(n): ; \
break; \
} \
else
#if defined(_MSC_VER)
# define ASIO_CORO_YIELD ASIO_CORO_YIELD_IMPL(__COUNTER__ + 1)
# define ASIO_CORO_FORK ASIO_CORO_FORK_IMPL(__COUNTER__ + 1)
#else // defined(_MSC_VER)
# define ASIO_CORO_YIELD ASIO_CORO_YIELD_IMPL(__LINE__)
# define ASIO_CORO_FORK ASIO_CORO_FORK_IMPL(__LINE__)
#endif // defined(_MSC_VER)
overview
Asio 协程可分为三个部分:
coroutine
协程类。coroutine_ref
代理类。- 接口实现。
协程类 coroutine
唯一保存的是上下文状态机 value
,而它只能被代理类 coroutine_ref
所修改,状态机初始化为 0,会在每一种 yield
初次执行前被修改为特定的状态;除此以外,代理类还有局部私有的 modified
标记,用以区分协程的生命周期,这些部分会在后续接口实现上有更深入的理解。
reenter
我们首先品鉴 reenter
的实现:
#define ASIO_CORO_REENTER(c) \
switch (::asio::detail::coroutine_ref _coro_value = c) \
/* 当结束的协程再次调用时(-1 会在下面 terminate_coroutine 赋值),会进入到这里 */ \
/* 此时也应该满足 if 语句,因此 terminate 是无法重入的 */ \
case -1: if (_coro_value) \
{ \
goto terminate_coroutine; \
/* 这个 label 也会在 yield 用到 */ \
terminate_coroutine: \
/* 这个协程已经结束 */ \
_coro_value = -1; \
goto bail_out_of_coroutine; \
/* 这个 label 也会在 yield 用到 */ \
bail_out_of_coroutine: \
/* 跳出整个协程 block */ \
break; \
} \
/* 正常路径 */ \
else /* fall-through */ case 0:
reenter
描述的是一个协程的入口,整个 reenter(...) { /*code block*/ }
代码块区间都是协程的执行范围。
如果一个协程已经终止(提前 return
,yield break
,throw exception
,或者正常走到作用域的末尾),那么你没办法再进入该协程,会跳出这个协程区间,这部分会在 yield
有更详细的分析。
而一般情况,一个未曾执行过的协程则会匹配到 case 0
,或者未终止的协程则会匹配到 case __LINE__
。
这里需要结合一下代理类的实现:
class coroutine_ref
{
public:
coroutine_ref(coroutine& c) : value_(c.value_), modified_(false) {}
coroutine_ref(coroutine* c) : value_(c->value_), modified_(false) {}
// 如果是经过了 yield,那么不会触发(肯定 modifed,见下面 operator=的解释)
// 除非是这个协程内部还没有 yield 就直接在 reenter block 内 return、抛出异常或者一路走到 reenter block 尾部等行为
// 这些情况下,协程都不再是可重入的
~coroutine_ref() { if (!modified_) value_ = -1; }
operator int() const { return value_; }
// 每次拷贝 ref 时,即设为 modified
// 注意,构造时不会这样(modified = false)
// 拷贝行为发生在 yield 语义上,在##flag #1 处
int& operator=(int v) { modified_ = true; return value_ = v; }
private:
void operator=(const coroutine_ref&);
int& value_;
bool modified_;
};
代理类会在 switch
的范围结尾析构,会通过私有的 modified_
成员来区分协程本身的生命周期,当设置 value_
为 -1 时,将不可再次重入,这些部分可以通过下述 yield
来。
yield
接着是 yield
实现:
#define ASIO_CORO_YIELD_IMPL(n) \
/* cr 的状态机引用 value 被赋值为新的状态 */ \
/* ##flag #1 */ \
for (_coro_value = (n);;) \
/* 正常并不会走到这个 if */ \
if (_coro_value == 0) \
{ \
/* 对于保存过状态 n 的情况下有效,下一次 reenter 就从这里开始 */ \
/* 应该是执行 yield 下一行去了(忽略##flag #4 单行) */ \
case (n): ; \
break; \
} \
else \
/*第一次走到 yield 执行时,cr 已经赋值为 n,即 case 0,走##flag #4 */ \
switch (_coro_value ? 0 : 1) \
/* ##flag #2 */ \
for (;;) \
/* fall-through */ case -1: if (_coro_value) \
goto terminate_coroutine; \
/* ##flag #3 */ \
else for (;;) \
/* fall-through */ case 1: if (_coro_value) \
goto bail_out_of_coroutine; \
/* case 0 接上用户代码 */ \
/* ##flag #4 */ \
else /* fall-through */ case 0:
/* 如果是 yield statement */
/* 执行完用户代码后,再回到##flag #3 处的 for */
/* 从而进入 bail_out_of_coroutine,跳出 yield block,直到下次 reenter */
/* 如果是 yield return statement,那应该 block 下面都无法执行了(要给出 return 结果到 caller),
等待下一次 reenter */
/* 如果是 yield break,则跳到##flag #2,fallthrough 到 -1,终止整个协程 */
在我实现的超简陋版无栈协程中,你可以总结出一个 yield
的套路:
- 保存
state = $LINE
。 return
。case $LINE:
。
(好吧我只是复述一下 #define CO_YIELD(ret)
)
因此这里 Asio 同样也是先保存一个 state
,紧接着走到 ##flag #4
处执行用户提供的代码。
注意这里 Asio 很细心的定制了不同的 yield
行为:
yield statement
.yield return statement
.yield break
.
所以后续的 else
有复杂的嵌套就是为了处理这些行为:
- 如果是
yield statement
,将会让出,直到下次重入再恢复上下文。 - 如果是
yield return statement
,将会返回结果,直到下次重入再恢复上下文。 - 如果是
yield break
,将会让出,且不可重入。
再次说明一下代理类,这里 yield
的赋值过程表示已经是 modified
。
fork
Asio 还做了一个 fork
语义生成子协程
#define ASIO_CORO_FORK_IMPL(n) \
for (_coro_value = -(n);; _coro_value = (n)) \
if (_coro_value == (n)) \
{ \
case -(n): ; \
break; \
} \
else
其实就是一个只执行 2 次的 for
循环,用负数来表示 child
。
下一篇文章:从 C++20 协程,到 Asio 的协程适配
References
Coroutines in C
asio 无栈协程 - chnmagnus
coroutine (think-async.com)
coroutine.hpp at 1408e28 · chriskohlhoff/asio · GitHub
C++20 Coroutine: Under The Hood