无锁队列的原理和实现
无锁队列是指多线程对于队列的操作无需添加粒度较大的独占锁mutex
, 而是通过细粒度的CAS(compare and swap)
原子操作实现多线程下的同步. 其主要思想是: 使用mutex
的锁的代价太高, 那就使用更小粒度的锁, 甚至这个锁的粒度是一条赋值指令.
很多情况下, 一个完整事件的处理流程可以拆分成几个子事件. 虽然各子事件有顺序关系, 但无强烈的时间上的约束. 这样当该阶段处理完成后, 我们可以给下一个阶段传递一条消息. 下阶段接收到消息后, 再进行处理. 这里传递消息的数据结构可以是无锁队列
. 保证了高并发和分布式场景下的快速处理需求和一定的前后顺序. 这就类似于生产者-消费者
问题: 多个生产者产生多个事件, 给下一阶段的消费者处理.
本篇博客主要参考了酷壳陈皓关于无锁队列的讲解博客以及原始的论文-Implementing Lock-Free Queues和改进的论文-Simple, Fast, and Practical Non-Blocking and Blocking ConcurrentQueue Algorithms.
CAS操作
Compare And Swap (CAS)
是现代操作系统支持的原子性操作, 如X86中CMPXCHG
汇编指令. 其实现逻辑如下:
1 | bool compare_and_swap(int *addr, int oldVal, int newVal) |
addr
指向内存中的某个变量, 如果该内存值为oldVal
, 那么就将其置newVal
并返回成功; 否则直接返回失败.
有了CAS
操作, 多线程情况下我们可以保证某个共享变量值为我们期待值时才执行某些操作, 从而实现同步.
- GCC的CAS
1 | bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...) |
- cpp11的CAS
1 | template< class T > |
逻辑实现
可以通过链表或数组实现. 数组实现的话由于数组大小需要指定而队列长度不一定知道, 因此需要实现循环队列. 存在一定的局限性.
基于链表的实现
用链表实现队列的时候, 为了避免麻烦的边界判断, 我们可以添加头结点
. 初始化时申请头结点, 并且head
指针和tail
指针均指向该结点.
入队操作
设入队新建结点的指针为
p
, 则p -> next = nullptr
, 入队操作需要两步实现.- tail -> next = p
- tail = p
为了保证第一步的并发正确性. 我们持续不断获取
tail
的快照, 记为cur
. 直到CAS(cur -> next, nullptr, p)
. 即如果cur -> next = nullptr
成立, 则当前cur
一定为正确的tail
. 接着我们将cur -> next
更新成p
. 这样是正确的. 因为当某个线程执行完这一步后.tail
指针还未更新成p
, 这样其他所有线程都在轮询等待tail
指针的next
为空, 无法入队.当轮询完成第一步后, 我们退出循环. 然后执行
CAS(tail, cur, p)
. 即将尾指针置为新入队结点.- 结点定义
1
2
3
4
5
6
7
8
9
10// 使用cpp11实现, 用了atomic类对象的方法实现CAS操作
// 结点定义
struct Node {
T val;
atomic<Node*> next;
Node () : val(T()), next(atomic<Node*>(nullptr)) {}
Node (const T& _v) : val(_v), next(atomic<Node*>(nullptr)){}
Node (const T& _v, atomic<Node*> _next) : val(_v), next(_next) {}
};- 入队(VERSION-1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// VERSION-1
void enqueue(T _val) {
// p指向插入结点
Node* p = new Node(_val);
Node* cur;
//
Node* null = nullptr;
while (true) {
cur = tail.load();
// CAS(cur -> next, nullptr, p)
if (cur -> next.compare_exchange_weak(null, p) == true)
break;
}
// CAS(tail, cur, p)
tail.compare_exchange_weak(cur, p);
}以上操作的流程会出现一个问题. 因为只有入队的线程才能将
tail
指针更新, 因此如果该线程入队后还未更新tail
就结束了. 那么其他所有入队线程都将死锁. 因为他们CAS(cur -> next, nullptr, p)
时,cur -> next
一定不为空, 此时可能出现cur -> next = p
.为了解决上述可能存在的问题. 我们主要解决思路是让其他线程也可以移动
tail
指针.一个简单的方法是, 如果当前线程
CAS(cur -> next, nullptr, p)
入队失败了, 那么说明当前cur
不是真正的tail
, 那么我们尝试CAS(tail, cur, cur -> next)
. 即尝试让tail
往后走. 然后再获取tail
的快照, 按照之前的流程轮询(Retry-loop).- 入队(VERSION-2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// VERSION-2
void enqueue(T _val) {
// p指向插入结点
Node* p = new Node(_val);
Node* cur;
//
Node* null = nullptr;
while (true) {
cur = tail.load();
// CAS(cur -> next, nullptr, p)
if (cur -> next.compare_exchange_weak(null, p) == true)
break;
// CAS(tail, cur, cur -> next)
else
tail.compare_exchange_weak(cur, cur -> next)
}
// CAS(tail, cur, p)
tail.compare_exchange_weak(cur, p);
}较为复杂的方法是, 获取
tail
的快照cur
, 并记录cur -> next
为next
. 接着判断cur
是否为tail
. 如果不为就重新开始. 然后判断next
是否为空. 如果不为空, 我们就尝试更新tail
, 更新的语句是CAS(tail, cur, next)
. 如果为空. 我们就尝试CAS(tail -> next, next, p)
入队(此时next = nullptr
).- 入队(VERSION-3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25void enqueue (T _val) {
// 插入结点为p
Node* p = new Node(_val);
Node *cur, *next;
Node* null = nullptr;
while (true) {
// cur 指针为尾结点的快照
cur = tail.load();
// next 指针为为尾结点下一个结点
next = cur -> next;
// 如果尾指针被移动, 则重新获取cur和next (可能被其他线程移动了, 重新获取以减少CAS操作次数)
if (cur != tail.load())
continue;
if (next != nullptr) {
// 当next指针不为空时, 尝试移动tail指针, 防止出现死锁
tail.compare_exchange_weak(cur, next);
continue;
}
// 实现第一步操作: tail -> next = p
if (cur->next.compare_exchange_weak(null, p) == true)
break;
}
// 实现第二步操作: tail = p
tail.compare_exchange_weak(cur, p);
};出队操作
出队操作类似于入队操作的思路. 我们首先获取
head
的快照cur
. 然后判断cur -> next
是否为空, 如果为空说明此时队列为空, 返回队列为空的信号. 如果不为空, 我们CAS(head, cur, cur -> next)
来重置head
的指针. 然后返回cur -> next -> val
, 因为此时cur
指向之前的头结点,cur -> next -> val
为出队前的队头.- 出队(VERSION-1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17bool dequeue (T& _ret) {
// 返回true: 出队成功, 返回值存在 _ret参数中
Node* cur;
while (true) {
cur = head.load();
if (cur -> next == nullptr)
return false;
// CAS(head, cur, cur -> next)
if (head.compare_exchange_weak(cur, cur -> next))
break;
}
_ret = cur -> next -> val;
// 释放原头结点
delete cur;
return true;
}上述思路存在几个问题. 首先比较严重的是, 如果我们
CAS(head, cur, cur -> next)
成功了, 也就是逻辑上执行了head = head -> next
, 此时如果另外一个线程也执行了出队操作, 并且已经将队头free掉了. 那我们再访问cur -> next -> val
时明显就出现了访问非法内存. 因此一个核心问题是: 在获取到其队头元素值之前, 其他线程不能执行出队操作.为了保证队头结点不被free掉. 我们可以在
CAS
操作之前访问. 这样保证CAS
操作成功后, 我们一定已经拿到了队头. 即使此时其他线程执行出队操作. 也不影响该线程的正确性. 具体而言, 我们定义一个next
指针, 初始next = cur -> next
. 然后判断cur
是否等于head
. 如果不等于则重新获取快照(减少不必要的CAS
操作). 如果等于, 接着判断next
是否为空, 如果为空说明此时队列为空, 返回. 接着先记录next
结点的值, 然后尝试通过CAS
操作移动head
. 即CAS(head, cur, cur -> next)
.- 出队(VERSION-2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24bool dequeue (T& _ret) {
Node *cur, *next;
while (true) {
cur = head.load();
next = cur->next;
// 如果头指针被移动, 则重新获取其快照 (可能被其他线程移动了, 重新获取以减少CAS操作次数)
if (cur != head.load())
continue;
// 队列为空, 出队失败
if (next == nullptr)
return false;
// 在CAS操作前记录返回值, 如果在CAS操作之后则该结点有可能被其他线程free
_ret = next->val;
// 实现出队的操作. head = head -> next
if (head.compare_exchange_weak(cur, next))
break;
}
// 释放原头结点
delete cur;
return true;
}上述版本还存在一个问题. 如果在判断
next == nullptr
时, 一个入队操作执行完成了第一步, 还没移动tail
指针. 此时如果队列只有这个一个元素. 那么head = tail
且next = p
. 出队的话会将tail
指向被free的原头结点. 具体图示可以查看酷壳陈皓关于无锁队列的讲解博客解决的思路和入队解决死锁的思路是一样的. 即当我们发现
head == tail
且next != nullptr
时, 我们尝试更新tail
即可.- 出队(VERSION-3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30bool dequeue (T& _ret) {
Node *nowHead, *nowTail, *next;
while (true) {
nowHead = head.load(), nowTail = tail.load();
next = nowHead -> next;
// 如果头指针被移动, 则重新获取其快照 (可能被其他线程移动了, 重新获取以减少CAS操作次数)
if (nowHead != head.load())
continue;
// 队列为空, 出队失败
if (nowHead == nowTail and next == nullptr)
return false;
// tail指针未正确取值, 尝试移动tail
if (nowHead == nowTail and next != nullptr) {
tail.compare_exchange_weak(nowTail, next);
continue;
}
// 在CAS操作前记录返回值, 如果在CAS操作之后则该结点有可能被其他线程free
_ret = next->val;
// 实现出队的操作. head = head -> next
if (head.compare_exchange_weak(nowHead, next))
break;
}
// 释放原头结点
delete nowHead;
return true;
}
代码实现
首先通过上述分析, 使用cpp11
进行了代码实现. 接着对实现的代码进行正确性测试、内存检查和性能测试.
- 正确性检查思路. 检查每个物品是否只入队一次且只出队一次.
- 内存检查. CLion中使用
Valgrind
的Memcheck
工具在WSL
环境下进行检查. - 性能测试. 与使用
mutex
的版本进行比较.
- setting.h: 全局设置
1 | // |
- LockFreeQueue.h: 无锁队列的实现
1 | // |
- check.cpp: 正确性检查
1 | // |
- benchmark_lockfree.cpp: 无锁队列性能测试
1 | // |
测试结果
- 正确性测试
1 | produce 1 finish. |
- Valgrind内存检查
- 无锁队列性能测试
1 | epoch cost average time = 987 ms |
- mutex队列性能测试
1 | epoch cost average time = 378 ms |
分析总结
使用无锁队列比
mutex
更慢的原因可能有2. 其一是无锁队列中使用了结构体, 并使用new
和delete
申请和释放, 频繁调用导致比mutex
版本的int
更慢. 其二是结构体使用了atomic<Node*>
作为next
指针. 如果使用普通的Node*
并使用__sync_bool_compare_and_swap()
可能会比现在更快.实验用的机器是自己的笔记本. cpu是5800H, 八核心十六线程. 使用设置
生产线程=2, 消费线程=2, 生产个数=2e6
的时候耗时700ms
左右; 当设置生产线程=4, 消费线程=4, 生产个数=1e6
的时候耗时900ms
左右; 而设置生产线程=8, 消费线程=8, 生产个数=5e5
的时候耗时在1300ms
左右. 相同的生产个数, 随着核心数的增加, 耗时却在增加, 这可能是多核情况下必须缓存一致性协议(MESI)来保持多核缓存一致, 而且使用了atomic
, 导致耗时增加.无锁队列可能适应于分布式场景下. 利用多设备的计算资源进行业务的分阶段处理.
参考
- 酷壳陈皓关于无锁队列的讲解博客
- Implementing Lock-Free Queues
- Simple, Fast, and Practical Non-Blocking and Blocking ConcurrentQueue Algorithms
TODO
- 代码整理, 上传Github
- 解决
ABA
问题