背景
这是 perfbook 笔记 关于形式化验证的补充说明,鉴于本人也不会这些(因菜废学!),就只聊工具。本文并非教程,GenMC 的使用很简单,只交代我为什么会推荐这个工具。
内存模型
内存模型很难是明确的事实。对于并非每天接触并行编程的人来说,给定一个 acquire-release 的关系,应该能判断出正确性;但要是来一个 IRIW 示例(独立读-独立写),不一定会意识到 seq-cst 的必要性;再来一大块的数据结构?很难说了。
那么怎么快速确认一个并发程序是正确的呢?
- 方法一是纸面证明(perfbook 12.1.4.3),但是难以避免工程上 memory order 用错的可能性。
- 方法二是 TSAN。它确实性价比很高,但是只能触发运行时特定场景,无法遍历所有场景(状态);另外 TSAN 实现的算法并没有完全覆盖语言标准,超出支持范围会随机上报 data race。
- 方法三是跑测试。但是众所周知实际体系结构和语言抽象机器有差距,并且有编译器优化恰好让你能跑通的例子。这并非实践不可取,事实上测好 x86 和 ARM 已经覆盖正常人的需求。
- 方法四就是使用模型检验工具(model checker),GenMC 是其中一款试过觉得不错的。
其实方法四的工作量是很低的。内存模型文件是既定的(最难的部分已经帮你写好了),市面上的工具又有多种选择,只需挑一个适合自己的就 OK。
工具对比
perfbook 推荐使用 herd7/litmus 测试,它更多关注于硬件体系结构的内存模型和 LKMM(Linux 内核的内存模型)。虽然 C/C++ 内存模型也是支持的(使用 cpp11.cfg),但是不能直接告诉你是否存在 data race(只告诉你手动标记的 exists/forall 是否满足),并且语法上存在一定的限制:比如支持非常有限的 loop-while
(文档提到默认 unroll 两遍),以及本身难以表达复杂的数据结构。
最常见的 C++ 内存模型检验工具是 CppMem,语法和上述类似。不过问题在于 CAS 支持似乎并不完善,我仿写了一个 cppreference 上的正确示例,它报出 data race(求指教?)。另外一个问题是搜索遍历状态极慢,一个测试用例跑了半小时,再加两行代码,我永远等不到结果了。
还有对于应用程序开发更加友好的 CDSChecker。基本支持原生 C++11 语法是一大亮点,这意味着要验证正确性,只需复制粘贴稍作修改即可。很可惜连官方提供的 rwlock 示例都无法遍历完整,因为也使用了复杂的循环。虽然作者已经跑路了(git 服务器连不上,只能下载快照),但还是可以作为备选工具。
GenMC 同样支持 C/C++ 语法,由于前端基于 Clang-16(目前不能使用更高版本),甚至支持 C++20 标准。另一个亮点是极快,文档提到对于循环它是能做足够聪明的转换,因此我的 spinlock 测试不到一秒就能得出结果。注意两点细节:虽说语言标准是支持到 C++20,但是标准库可用率不高,可能原因是工具本身覆盖了标准库的部分代码;另外 GenMC 实际检验的是 RC11 内存模型,与应用程序的实际内存模型有细微差距,比如 relaxed ordering 中的代码无法复现(r1 == r2 == 42),文档第 3.1 节也特意提到这种现象。不过对于一个工具来说,有参考结果就很足够了。
测试用例
// spinlock.cpp
#include <atomic>
#include <stdlib.h>
#include <pthread.h>
struct Spinlock {
enum State {LOCKED, UNLOCKED};
std::atomic<State> _state {UNLOCKED};
void lock() {
if(_state.exchange(LOCKED, std::memory_order_acquire) == LOCKED) {
while(_state.exchange(LOCKED, std::memory_order_relaxed) == LOCKED);
// 移除下面这一行会有不同的结果
std::atomic_thread_fence(std::memory_order_acquire);
}
}
void unlock() { _state.store(UNLOCKED, std::memory_order_release); }
};
static int x;
static Spinlock spinlock;
void* test(void*) {
spinlock.lock();
// 提供 data race 检测的机会
x++;
spinlock.unlock();
return nullptr;
}
// GenMC 不支持 std::thread 和 std::jthread
auto make_thread(auto func) {
pthread_t tid;
pthread_create(&tid, {}, func, {}) && (abort(), 1);
return tid;
}
int main() {
auto t1 = make_thread(test);
auto t2 = make_thread(test);
// ...
}
参数:./genmc -disable-sr -- -std=c++20 spinlock.cpp
有些 warning 是面向内存模型自身的实现,对于应用程序开发可以忽略(运行过程会提示关闭方法)。我们只需关注是否有 hard error 即可。(No errors were detected.)
附录
另一个 SPSC 示例
#include <array>
#include <atomic>
#include <bit>
#include <utility>
#include <stdlib.h>
#include <pthread.h>
// Linux 内核的 kfifo 移植实现
template <typename T, std::size_t SIZE /* Must be a power of 2. */>
requires requires { requires std::has_single_bit(SIZE); }
struct Kfifo {
std::array<T, SIZE> _buffer {};
alignas(64) std::atomic<std::size_t> _in {};
alignas(64) std::atomic<std::size_t> _out {};
constexpr auto mod(auto value) {
return value & (SIZE - 1);
}
bool push(T elem) {
// Only the producer can modify `_in`.
std::size_t in = _in.load(std::memory_order_relaxed);
std::size_t next_in = mod(in+1);
if(next_in == _out.load(std::memory_order_acquire)) {
return false;
}
_buffer[in] = std::move(elem);
// Let the consumer know your change.
_in.store(next_in, std::memory_order_release);
return true;
}
bool pop(T &val) {
std::size_t out = _out.load(std::memory_order_relaxed);
std::size_t in = _in.load(std::memory_order_acquire);
if(in == out) {
return false;
}
val = std::move(_buffer[out]);
_out.store(mod(out+1), std::memory_order_release);
return true;
}
};
static std::array data {998, 244, 353};
static Kfifo<int, 64> fifo;
void* producer(void*) {
for(auto i : data) {
fifo.push(i);
}
return nullptr;
}
void* consumer(void*) {
int i {};
fifo.pop(i);
return nullptr;
}
// GenMC 不支持 std::thread 和 std::jthread
auto make_thread(auto func) {
pthread_t tid;
pthread_create(&tid, {}, func, {}) && (abort(), 1);
return tid;
}
int main() {
// 单生产者-单消费者的场合
// 只存在 t1 和 t2 是合法的
auto t1 = make_thread(producer);
auto t2 = make_thread(consumer);
// 添加 t3 使得示例成为错误的 MPSC
// 添加 t4 使得示例进一步成为错误的 MPMC
// auto t3 = make_thread(producer);
// auto t4 = make_thread(consumer);
// 要想保留 t3 和 t4 并且保持程序合法
// 可以使用上面的 spinlock 实现
// 为 t1/t3 和 t2/t4 单独上锁
// ...
}
RC11 禁止的 relaxed ordering 行为
// relaxed.cpp
#include <atomic>
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
// 复现 cppreference 的示例代码
constexpr auto link [[maybe_unused]] =
"https://en.cppreference.com/w/cpp/atomic/memory_order#Relaxed_ordering";
////////////////////////////////////////////////// pthread wrappers
auto make_thread(auto func) {
pthread_t tid;
pthread_create(&tid, {}, func, {}) && (abort(), 1);
return tid;
}
auto join_threads(auto ...threads) {
(pthread_join(threads, {}), ...);
}
auto pthread_routine(auto f) /* requires lambda */
requires requires { f.operator()(); } {
return +[](void*) -> void* {
return decltype(f)()(), nullptr;
};
}
////////////////////////////////////////////////// pthread wrappers (END)
std::atomic<int> x {}, y {};
int r1 {}, r2 {};
auto thread1 = pthread_routine([] {
r1 = y.load(std::memory_order_relaxed); // A
x.store(r1, std::memory_order_relaxed); // B
});
auto thread2 = pthread_routine([] {
r2 = x.load(std::memory_order_relaxed); // C
y.store(42, std::memory_order_relaxed); // D
});
int main() {
auto t1 = make_thread(thread1);
auto t2 = make_thread(thread2);
join_threads(t1, t2);
// 执行 ./genmc -- -std=c++20 relaxed.cpp | grep -E "^\(.*\)$"
// 没有 (42 42) 的可能性
printf("(%d %d)\n", r1, r2);
}