现代CPU通常通过cache来加速内存的访问效果,例如arm 的MESI协议,但是我们不能轻易的忽视cacheline导致的代码问题。本文介绍一下cacheline导致的代码问题。
arm系列cpu的cacheline通常是64字节,如果结构体的数据跨cacheline,MESI协议只能够有效的从Poc和Pou视角保证变量的一致性。跨cacheline的那个数据成员其实并没办法很好的保证其值在每时每刻的正确性。参考:《cache的策略》,但是这种跨cacheline的行为(跨CPU)还是有可能出现数据和理论情况不一致的情况。
为了构造上面的问题,我需要构造一个60字节+8字节的结构体,如下
#pragma pack(1) struct data { int32_t pad[15]; int64_t v; }; static struct data value __attribute__((aligned(64)));
这种情况下,pad默认填充了cacheline的前60字节,而v变量正好处于cacheline的前4字节和后4字节
为了构造cacheline导致的数据异常的案例,我还需要将其放在线程里面运行,这样data结构体的实例化必须是全局的。
注意:这里 pack(1) 是强制按照1字节对齐,通过aligned(64)保证了value结构体一直在cacheline的开头
到这里基本上我的用意很明显了,我需要有一个线程运行函数,如下
static void worker(int *cnt) { for (int64_t i = 0; i < loop_count; ++i) { const int64_t t = value.v; *cnt += 1; value.v = ~t; __asm__ volatile("" ::: "memory"); } }
值得注意的是,为了避免编译器给我指令重排,我需要为其添加内存栅栏
__asm__ volatile("" ::: "memory");
这里的输入部和输出部都是空,所以此汇编只是简单的声明内存栅栏
测试程序需要两个输入参数,一个是线程的个数,为了让这个任务在多个CPU上运行,另一个是线程将value.v反转的次数。
int64_t n = min(atol(argv[1]), N_THREADS); loop_count = atol(argv[2]);
根据worker函数是一个非常简单的逻辑,就是简单的将value.v不断反转。这个代码运行理论上的值应该是
FFFFFFFFFFFFFFFF 0000000000000000
那实际上呢? 我们运行看看
# ./cacheline_bug 8 10000 iteration: 80000 data size: 68 final: FFFFFFFFFFFFFFFF # ./cacheline_bug 8 10000 iteration: 80000 data size: 68 final: 0000000000000000 # ./cacheline_bug 8 10000 iteration: 80000 data size: 68 final: 00000000FFFFFFFF # ./cacheline_bug 8 10000 iteration: 80000 data size: 68 final: FFFFFFFF00000000
可以看到,v的值有四种情况,这里
00000000FFFFFFFF FFFFFFFF00000000
就是cache带来的隐藏问题。
在开发高性能的程序的时候,一方面需要利用好cache的特性,另一方面在存在数据竞争的场景下也需要考虑周全,否则就容易出现cache带来的隐藏bug。
下面分享一下测试的源码,本文分享基于arm V8 上运行
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #ifndef min #define min(x, y) (x) < (y) ? (x) : (y) #endif #define N_THREADS 128 static int64_t loop_count = 0; #pragma pack(1) struct data { int32_t pad[15]; int64_t v; }; static struct data value __attribute__((aligned(64))); static int64_t counter[N_THREADS]; static void worker(int *cnt) { for (int64_t i = 0; i < loop_count; ++i) { const int64_t t = value.v; *cnt += 1; value.v = ~t; /* creates a compiler level memory barrier * forcing optimizer to not re-order memory * accesses across the barrier. */ __asm__ volatile("" ::: "memory"); } } int main(int argc, char *argv[]) { pthread_t threads[N_THREADS]; if (argc != 3) { fprintf(stderr, "USAGE: %s <threads> <loopcount>\n", argv[0]); return 1; } int64_t n = min(atol(argv[1]), N_THREADS); loop_count = atol(argv[2]); for (int64_t i = 0L; i < n; ++i) pthread_create(&threads[i], NULL, (void *(*) (void *) ) worker, &counter[i]); int64_t count = 0L; for (int64_t i = 0L; i < n; ++i) { pthread_join(threads[i], NULL); count += counter[i]; } printf("iteration: %lu\n", count); printf("data size: %lu\n", sizeof(value)); printf("final: %016lX\n", value.v); return 0; }