x86-TSO
图10
x86-TSO 有下面几个特点:
那么最后只有 StoreLoad barrier 是有效的,其他屏障都是no-op。
06
6.1 关于 Store Buffer 的测试
6.1.1 测试核心内是否存在 Store Buffer
解析
现代 Intel CPU 和 AMD x86 中都有 Store Buffer 结构。
在我的电脑上使用 smp_mb、mb 或 rmb 可以使上述情况不再出现,而使用 barrier 或 wmb 问题还在;
除此之外,还可以使用高级语言的原子变量来解决。
6.1.2 测试核心间是否共享 Store Buffer
解析
核心3 的ECX = 1 和 EDX = 0 与上述同理。
总结
实际上,上述现象不允许在任何 CPU 上观察到,在我的电脑上没有出现;
如果 核心0 和 核心1 有各自的 Store Buffer;
核心0 将 x = 1 缓存在自己的 Store Buffer 中,并且根据 Store Forwarding 原则,核心0 读取 x 到 EAX 的时候会读取自己的 Store Buffer (中 x = 1),故 EAX = 1;
同理,核心1 也会缓存自己的写操作, 即缓存 y = 2 和 x = 2 到自己的 Store Buffer,因此 y = 2 这个操作不会被核心0 观察到,核心0 从共享存储中读到 y = 0 ,故 EBX = 0;
出现上述情况就说明核心存在 Store Buffer,并且有转发功能;
在我的电脑(i7)上可以出现上述现象;
6.2 测试 CPU 是否乱序执行
6.2.1 测试:StoreStore 乱序
解析
所以,如果 EAX 读到 1 的话,那么 EBX 一定不是 0。
总结
6.2.2 测试:LoadStore 乱序
解析
总结
解析
EBX 会优先读取 核心1 中的 Store Buffer ,所以 EBX 不可能等于 1 ;
总结
n5 实际上不应该在任何 CPU 上观察到。
6.3.2 测试:n4b
解析
总结
n4b 实际上不应该在任何 CPU 上观察到。
解析
总结
在乱序(out-of-order)CPU 上,写写和读写都是乱序,就不可能保证写的传递性了;
07
CAS原理
比较并交换(compare and swap, CAS),是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。
下面代码是使用 CAS 的一个例子(无锁队列 Pop 函数)
template
bool AtomQueue
::Pop(T& v) {
uint64_t tail = tail_;
if (tail == head_ || !valid_[tail])
return false;
if (!__sync_bool_compare_and_swap(&tail_, tail, (tail + 1) & mod_))
return false;
v = std::move(data_[tail]);
valid_[tail] = 0;
return true;
}
如果这块内存的值在这期间内没被修改过,则旧值 会与内存中的数据相同,这时 CAS 操作将会成功执行,使内存中的数据变为新值。
如果内存中的值在这期间内被修改过,则一般来说旧值 会与内存中的数据不同,这时 CAS 操作将会失败,新值 将不会被写入内存。
7.1 应用
在应用中 CAS 可以用于实现无锁数据结构,常见的有无锁队列(先入先出)以及无锁栈(先入后出)。对于可在任意位置插入数据的链表以及双向链表,实现无锁操作的难度较大。
7.2 ABA问题
ABA问题是无锁结构实现中常见的一种问题,可基本表述为:
图12
有一个栈(先入后出)中有 top 和 NodeA,NodeA 目前位于栈顶,top指针指向 A。现在有一个线程 P1 想要 pop 一个节点,因此按照如下无锁操作进行
pop()
{
do{
ptr = top; // ptr = top = NodeA
next_ptr = top->next; // next_ptr = NodeX
} while(CAS(top, ptr, next_ptr) != true);
return ptr;
}
图13
线程 P2 首先 pop 出 NodeA,之后又 push 了两个 NodeB 和 C,由于内存管理机制中广泛使用的内存重用机制,导致 NodeC 的地址与之前的 NodeA 一致。
这时 P1 又开始继续运行,在执行 CAS 操作时,由于 top 依旧指向的是 NodeA 的地址(实际上已经变为 NodeC ),因此将 top 的值修改为了 NodeX,这时栈结构如下:
图14
经过 CAS 操作后,top 指针错误地指向了 NodeX 而不是 NodeB。
简单的解决办法是采用 DCAS(双长度 CAS),一个 CAS长度 保存原始有效数据,另一个 CAS长度 保存累计变化的次数,第一个 CAS 可能出现 ABA 问题,但是第二个 CAS 极难出现 ABA 问题。
7.3 实现
CAS 操作基于 CPU 提供的原子操作指令实现。对于 Intel X86 处理器,可通过在汇编指令前增加 lock 前缀来锁定系统总线,使系统总线在汇编指令执行时无法访问相应的内存地址。而各个编译器根据这个特点实现了各自的原子操作函数。
08
原子操作
程序代码最终都会被翻译为 CPU 指令,一条最简单的加减法语句都会被翻译成几条指令执行;为了避免语句在 CPU 这一层级上的指令交叉带来的不可预知行为,在多线程程序设计时必须通过一些方式来进行规范,最常见的做法就是引入互斥锁,但互斥锁是操作系统这一层级的,最终映射到 CPU 上也是一堆指令,是指令就必然会带来额外的开销。
既然 CPU 指令是多线程不可再分的最小单元,那我们如果有办法将代码语句和指令对应起来,不就不需要引入互斥锁从而提高性能了吗? 而这个对应关系就是所谓的原子操作;在 C++11 的 atomic 中有两种做法:
可以通过 is_lock_free 函数,判断一个 atomic 是否是 lock-free 类型。
原子操作有三类:
使用原子操作模拟互斥锁的行为就是自旋锁,互斥锁状态是由操作系统控制的,自旋锁的状态是程序员自己控制的,常用的自旋锁模型有:
strong 版本不会有这个问题,但在某些平台上 strong 版本比 Weak 版本慢(在 x86 平台他们之间没有任何性能差距);绝大多数情况下,优先选择使用 strong 版本;
LOCK 时自旋锁是自己轮询状态,如果不引入中断机制,会有大量计算资源浪费到轮询本身上;常用的做法是使用yield切换到其他线程执行,或直接使用sleep暂停当前线程.
8.2 C++ 内存模型
C++11 原子操作的很多函数都有个 std::memory_order 参数,这个参数就是这里所说的内存模型,对应缓存一致性模型,其作用是对同一时间的读写操作进行排序,一共定义了 6 种类型如下:
8.3 C++ volatile
这个关键字仅仅保证数据只在内存中读写,直接操作它既不能保证操作是原子的,也不能通用地达到内存同步的效果;
由于 volatile 不能在多处理器的环境下确保多个线程能看到同样顺序的数据变化,在今天的通用应用程序中,不应该再看到 volatile 的出现。
09
无锁队列
下面是我采用 CAS 实现了一个多生产者多消费者无锁队列,设计参考 Disruptor ,最高可达 660万QPS(单生产者单消费者)和 160万QPS(10个生产者10个消费者)。
9.1 设计思路
1、如图15,使用 2 个环形数组,数组元素均非原子变量,一个存储 T 范型数据(一般为指针),另一个是可用性检查数组(uint8_t)。Head 是所有生产者的竞争标记,Tail 是所有消费者的竞争标记。红色区表示待生产位置,绿色区表示待消费位置。
图15
2、生产者们通过 CAS 来竞争和移动 Head,抢到 Head 的生产者,先将 Head 加1,再生产原 Head 位置的数据;同样的消费者们通过 CAS 来竞争和移动 Tail,抢到 Tail 的消费者,先将 Tail 加1,再消费原 Tail 位置的数据 。
9.2 实现细节
下面多生产者多消费者无锁队列的代码是在 x86-64(x86-TSO) 平台上编写和测试的。
Talk is cheap. Show me the code.
9.2.1 AtomQueue类模板定义
template
class AtomQueue
{
public:
AtomQueue(uint64_t size);
~AtomQueue();
bool Push(const T& v);
bool Pop(T& v);
private:
uint64_t P0[8]; //频繁变化数据, 避免伪共享, 采用Padding
uint64_t head_; //生产者标记, 表示生产到这个位置,但还没有生产该位置
uint64_t P1[8];
uint64_t tail_; //消费者标记, 表示消费到这个位置,但还没有消费该位置
uint64_t P2[8];
uint64_t size_; //数组最大容量, 必须满足2^N
int mod_; //取模 % -> & 减少2ns
T* data_; //环形数据数组
uint8_t* valid_; //环形可用数组,与数据数组大小一致
};
9.2.2 构造函数与析构函数
template
AtomQueue
::AtomQueue(uint64_t size) : size_(size << 1), head_(0), tail_(0) {
if ((size_ & (size_ - 1)))
{
printf("AtomQueue::size_ must be 2^N !!!\n");
exit(0);
}
mod_ = size_ - 1;
data_ = new T[size_];
valid_ = new uint8_t[size_];
std::memset(valid_, 0, sizeof(valid_));
}
template
AtomQueue
::~AtomQueue() {
delete[] data_;
delete[] valid_;
}
9.2.3 生产者调用的 Push 函数 和 消费者调用的 Pop 函数
template
bool AtomQueue
::Push(const T& v) {
uint64_t head = head_, tail = tail_;
if (tail <= head ? tail + size_ <= head + 1 : tail <= head + 1)
return false;
if (valid_[head])
return false;
if (!__sync_bool_compare_and_swap(&head_, head, (head + 1) & mod_))
return false;
data_[head] = v;
valid_[head] = 1;
return true;
}
template
bool AtomQueue
::Pop(T& v) {
uint64_t tail = tail_;
if (tail == head_ || !valid_[tail])
return false;
if (!__sync_bool_compare_and_swap(&tail_, tail, (tail + 1) & mod_))
return false;
v = std::move(data_[tail]);
valid_[tail] = 0;
return true;
}
解决办法是添加读写屏障(LoadStore barrier),如下表格:
通过执行反汇编命令(objdump -S a.out)得到 Push 中下面代码的汇编代码。
if (!__sync_bool_compare_and_swap(&tail_, tail, (tail + 1) & mod_))
400a61: 48 8b 45 f8 mov -0x8(%rbp),%rax
400a65: 48 8d 50 01 lea 0x1(%rax),%rdx
400a69: 48 8b 45 e8 mov -0x18(%rbp),%rax
400a6d: 8b 80 d8 00 00 00 mov 0xd8(%rax),%eax
400a73: 48 98 cltq
400a75: 48 89 d1 mov %rdx,%rcx
400a78: 48 21 c1 and %rax,%rcx
400a7b: 48 8b 45 e8 mov -0x18(%rbp),%rax
400a7f: 48 8d 90 88 00 00 00 lea 0x88(%rax),%rdx
400a86: 48 8b 45 f8 mov -0x8(%rbp),%rax
400a8a: f0 48 0f b1 0a lock cmpxchg %rcx,(%rdx)
400a8f: 0f 94 c0 sete %al
400a92: 83 f0 01 xor $0x1,%eax
400a95: 84 c0 test %al,%al
400a97: 74 07 je 400aa0 <_ZN9AtomQueueIiE3PopERi+0x8c>
return false;
400a99: b8 00 00 00 00 mov $0x0,%eax
400a9e: eb 40 jmp 400ae0 <_ZN9AtomQueueIiE3PopERi+0xcc>
发现 __sync_bool_compare_and_swap 函数对应的汇编代码为:
400a8a: f0 48 0f b1 0a lock cmpxchg %rcx,(%rdx)