神奇的高性能无锁队列JCTools
# 前言
最近在看Netty源码的时候,关注到了JCTools。JCTools是什么?是一个高性能的无锁并发工具包。早在96年就有论文提出了无锁队列的概念,再到后来 Disruptor,高性能已得到生产的验证。此处介绍的 Jctools 中的高性能队列,其性能丝毫不输于 Disruptor。那Disruptor与JCTools都是高性能队列的处理有什么区别了?
# 什么是JCTools
JCTools (Java Concurrency Tools) 提供了一系列非阻塞并发数据结构(标准 Java 中缺失的),当存在线程争抢的时候,非阻塞并发数据结构比阻塞并发数据结构能提供更好的性能。
CTools 是适用于 JVM 并发开发的工具,主要提供了一些 JDK 确实的并发数据结构,例如非阻塞 Map、非阻塞 Queue 等。其中非阻塞队列可以分为四种类型,可以根据不同的场景选择使用。
JCTools 是一个开源工具包,在 Apache License 2.0 下发布,并在 Netty、Rxjava 等诸多框架中被广泛使用。
# 非阻塞 Map
- ConcurrentAutoTable
- 后面几个map/set结构的基础
- NonBlockingHashMap :
- 是对 ConcurrentHashMap 的增强,对多 CPU 的支持以及高并发更新提供更好的性能。
- NonBlockingHashMapLong
- 是 key 为 Long 型的 NonBlockingHashMap。
- NonBlockingHashSet
- 是对 NonBlockingHashMap 的简单包装以支持 set 的接口。
- NonBlockingIdentityHashMap
- 是从 NonBlockingHashMap 改造来的,使用 System.identityHashCode() 来计算哈希。
- NonBlockingSetInt
- 是一个使用 CAS 的简单的 bit-vector。
# 非阻塞 Queue
JCTools 提供的非阻塞队列分为 四种类型,可以根据不同的应用场景选择使用:
- Spsc:单生产者单消费者(有界和无界)
- Mpsc:多生产者单消费者(有界和无界)
- Spmc:单生产者多消费者(有界)
- Mpmc:多生产者多消费者(有界)
“生产者”和“消费者”是指“生产线程”和“消费线程”。
# 基于实现数据结构分类
- 基于数组的队列:
- MpscArrayQueue
- MpmcArrayQueue
- SpscArrayQueue
- SpmcArrayQueue
- 基于链表的队列:
- MpscLinkedQueue
- SpscLinkedQueue
- 基于LinkedArray的队列
- SpscChunkedArrayQueue
- SpscGrowableArrayQueue
- SpscUnboundedArrayQueue
- MpscGrowableArrayQueue
- MpscChunkedArrayQueue
- MpscUnboundedArrayQueue(Netty中使用)
- XaddQueue:
- MpscUnboundedXaddArrayQueue
- MpmcUnboundedXaddArrayQueue
- 其它类型队列:
- MpscCompoundQueue
# 源码解析
首先我们先从简单的 MpscArrayQueue 队列的源码开始分析:
# 入队offer()
跟进源码之前,首先回顾下 MpscArrayQueue 的重要属性
// ConcurrentCircularArrayQueue
/**
* Q:这个值有什么作用?
* A:数组长度对应的掩码 - 方便用&运算代替较慢的%运算 子类需要继续缓存行填充,以避免mask 和 buffer 产生伪共享
* eg. capacity=16 0000 0000 0001 0000
* mask: capacity-1=15 0000 0000 0000 1111
* h & (length-1) = h % length
*/
protected final long mask;
// 真正存放队列数据的数组
protected final E[] buffer;
// MpmcArrayQueueProducerIndexField
private volatile long producerIndex; // 生产者的索引
// MpscArrayQueueProducerLimitField
/**
* 在重新读取消费者索引之前,第一个不可用的生产者索引。
* <p>
* Q: 这个值有什么用,直接读取consumerIndex计算不行吗?
* A: {@code consumerIndex}是一个变化较为频繁的值,因此它所在的缓存行极易失效,从而影响读性能。
* 我们拷贝一个副本(并在副本无效的时候更新),这样可以减少生产者与消费者之间产生的伪共享,从而提高读效率.
* <p>
* Q: 该值为什么进行缓存行填充,为什么与producerIndex分离?
* A: 因为是多生产模式,因此producerIndex上将产生高度竞争,因此其所在的缓存行极易失效,
* 将该值与producerIndex分开,我们期望该值大部分时间位于用于共享(且很少失效)的缓存行中。
* PS: 该值的更新频率远低于producerIndex。
*/
private volatile long producerLimit;
// MpscArrayQueueConsumerIndexField
/**
* 消费者索引(当前消费进度).
* 这是一个滞后值,消费者先消费可用槽位数据,再更新消费进度;
*/
protected long consumerIndex;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
跟进 offer() 方法的源码:
public boolean offer(final E e)
{
if (null == e)
{
throw new NullPointerException();
}
//producerLimit基于consumerIndex计算的一个缓存值,用户减少对consumerIndex的读取(减少缓存行miss),在循环中可能更新
final long mask = this.mask;
long producerLimit = lvProducerLimit();
long pIndex;
do
{
//获取生产者索引
pIndex = lvProducerIndex();
if (pIndex >= producerLimit)
{
// 生产者索引大于等于缓存的上限,表示根据缓存值认为队列已满。
// 此时,分两种情况:1. 队列真的满了。 2.缓存过期了。
// 因此需要读取最新的消费者索引,计算新的上限,判断队列是否是真的满了(以满足Queue对offer的语义要求)
// 获取消费者索引
final long cIndex = lvConsumerIndex();
//设计的很巧妙,仔细分析下。就是生产速度最大快于消费者一个容量的大小
//buffer 的大小=mask+1
producerLimit = cIndex + mask + 1;
//生产速度是否快于消费速度
if (pIndex >= producerLimit)
{
// 最新的消费者索引显式队列确实已满
// 只有当producerLimit大于producerIndex时更新才有意义,因此不更新producerLimit。
return false; // FULL :(
}
else
{
// 更新producerLimit为下一个我们必须重新检查消费者索引的值
// 因为是多生产者模式,因此更新缓存会产生竞争。
// Q: 为什么竞争是良性的?
// A: 因为producerLimit永远不会超过下一次的计算值,而producerLimit小于实际值并不会带来错误。
// update producer limit to the next index that we must recheck the consumer index
// this is racy, but the race is benign
soProducerLimit(producerLimit);
}
}
}
while (!casProducerIndex(pIndex, pIndex + 1));
/*
* NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
* the index visibility to poll() we would need to handle the case where the element is not visible.
*/
// CAS 竞争成功,可以进行填充
// 提示:新的生产者索引值先于数组中的元素对其它线程可见。如果依赖于索引的可见性执行poll,我们将需要处理元素可能不可见的情况。
// 前面的CAS已经保证了对象的正确构造(安全发布),这里使用Ordered模式是保证尽快的可见性(volatile是立即的可见性)。
// 计算生产者索引在数组中下标
final long offset = calcCircularRefElementOffset(pIndex, mask);
// 向数组中放入数据
soRefElement(buffer, offset, e);
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
在初始化状态,producerLimit 与队列的容量是相等的,producerLimit = capacity = 4,而 producerIndex = consumerIndex = 0。接下来 Thread1 和 Thread2 并发向 MpscArrayQueue 中存放数据,如下图所示。
两个线程此时拿到的 producerIndex 都是 0,是小于 producerLimit 的。此时两个线程都会尝试使用 CAS 操作更新 producerIndex,其中必然有一个是成功的,另外一个是失败的。
- 假设 Thread-1 执行 CAS 操作成功,那么 Thread-2 失败后就会重新更新 producerIndex。
- Thread-1 更新后 producerIndex 的值为 1,由于 producerIndex 是 volatile 修饰的,更新后立刻对 Thread-2 可见。
- 这里有一点需要注意的是,当前线程更新后的值是被其他线程使用,当 Thread-1 和 Thread-2 都通过 CAS 抢占成功后,它们拿到的 pIndex 分别是 0 和 1。接下来就是根据 pIndex 进行位运算计算得到数组对应的下标,然后通过 UNSAFE.putOrderedObject() 方法将数据写入到数组中,源码如下所示。
public static <E> void soRefElement(E[] buffer, long offset, E e){
UNSAFE.putOrderedObject(buffer, offset, e);
}
2
3
putOrderedObject() 使用了 StoreStore Barrier,对于 Store1,StoreStore,Store2 这样的操作序列,在 Store2 进行写入之前,会保证 Store1 的写操作对其他处理器可见。保证写入都是最新的。
跟进源码 calcCircularRefElementOffset()
/**
* 计算环形数组的指定(逻辑)索引对应的偏移量 - index为逻辑索引,需要转换为真实索引。
* <p>
* 环形数组(环形缓冲区)的空间是重复利用的,因此逻辑上的index需要转换为真正的index然后再计算。
* 为了高效运算,假定了环形数组的长度都为2的整次幂,因此mask应该为数组长度减1,这样可以使用 '&' 快速计算。
* eg. 数组大小为16 index=17 mask=15 index & mask =1
* REF_ARRAY_BASE 数组中第一个元素的偏移地址, REF_ELEMENT_SHIFT:数组中一个元素占用的大小
* Note: circular arrays are assumed a power of 2 in length and the `mask` is (length - 1).
*
* @param index desirable element index
* @param mask (length - 1)
* @return the offset in bytes within the circular array for a given index
*/
public static long calcCircularRefElementOffset(long index, long mask)
{
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 出队Poll
跟进poll()方法。方法作用移除队列的首个元素并返回,如果队列为空则返回 NULL
public E poll()
{
// 直接返回消费者索引 consumerIndex
final long cIndex = lpConsumerIndex();
// 计算数组对应的偏移量
final long offset = calcCircularRefElementOffset(cIndex, mask);
// 读取为本地变量,避免在接下来的volatile读之后重新读取
// Copy field to avoid re-reading after volatile load
final E[] buffer = this.buffer;
// 注意:生产者先更新索引,再填充元素,因此这里必须处理时序问题
// 如果元素不为null,那么可以安全的消费,因为生产者索引一定可见,但是如果元素为null,那么则必须等待其不为null。
// Q: 校验element而不是生产者索引,有什么好处?
// A: 可以减少对生产者索引的读!如果元素可见,那么不必读取生产者索引,可以减少缓存行miss问题。
// If we can't see the next available element we can't poll
E e = lvRefElement(buffer, offset);
if (null == e)
{
// null == e 有以下可能:
// 1. 队列为空
// 2. 生产者已经CAS更新了生产者索引,但是尚未填充元素,或填充的元素尚不可见 - 此时需要等待生产者完成填充,因为队列的状态表示当前并不为空!
// 提示:如果生产者在CAS更新生产者索引之后填充元素之前被中断,在这种情况下,队列并不是真正的为空。其它生产者会在该元素之后继续填充队列。
/*
* NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
* winning the CAS on offer but before storing the element in the queue. Other producers may go on
* to fill up the queue after this element.
*/
if (cIndex != lvProducerIndex())
{
// 队列不为空,需要自旋等待直到元素可见 - 这也是比relaxedPool开销大的原因
do
{
e = lvRefElement(buffer, offset);
}
while (e == null);
}
else
{
// 消费者索引和生产者索引相同,证明队列确实为空
return null;
}
}
// 先消费元素,再更新消费者进度(因为生产者会先校验consumerIndex,因此可确保生产者不会覆盖数据)。
// 这里可以使用Plain模式赋值为null,因为生产者一定会在索引可见之后才填充元素,consumerIndex的发布可以保证这里也正确发布。
spRefElement(buffer, offset, null);
soConsumerIndex(cIndex + 1);
return e;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
因为只有一个消费者线程,所以整个 poll() 的过程没有 CAS 操作。poll() 方法核心思路:
- 是获取消费者索引 consumerIndex
- 根据 consumerIndex 计算得出数组对应的偏移量
- 将数组对应位置的元素取出并返回
- 队列为null,直接返回
- 队列不为空,自旋直到获取到元素
- 将 consumerIndex 移动到环形数组下一个位置。
获取消费者索引以及计算数组对应的偏移量的逻辑与 offer() 类似,在这里就不赘述了。下面直接看下如何取出数组中 offset 对应的元素,跟进 lvElement() 方法的源码。
public static <E> E lvRefElement(E[] buffer, long offset)
{
return (E) UNSAFE.getObjectVolatile(buffer, offset);
}
2
3
4
获取数组元素的时候同样使用了 UNSAFE 系列方法,getObjectVolatile() 方法则使用的是 LoadLoad Barrier,对于 Load1,LoadLoad,Load2 操作序列,在 Load2 以及后续读取操作之前,会保证 Load1 的读取操作执行完毕,所以 getObjectVolatile() 方法可以保证每次读取数据都可以从内存中拿到最新值。
与 offer() 相反,poll() 比较关注队列为空的情况。当调用 lvElement() 方法获取到的元素为 NULL 时,有两种可能的情况:
- 如果消费者索引 consumerIndex 等于生产者 producerIndex,说明队列为空。
- 只要两者不相等,消费者需要等待生产者填充数据完毕。(在 offer() 中先更新索引,然后填充数据)
# 总结
对 MpscArrayQueue 的知识点做一个简单的总结。
- 通过大量填充 long 类型变量解决伪共享问题。
- 环形数组的容量设置为 2 的次幂,可以通过位运算快速定位到数组对应下标。
- 入队 offer() 操作中 producerLimit 的巧妙设计,大幅度减少了主动获取消费者索引 consumerIndex 的次数,性能提升显著。
- 入队和出队操作中都大量使用了 UNSAFE 系列方法,针对生产者和消费者的场景不同,使用的 UNSAFE 方法也是不一样的。
# 说明
相关测试实践已经上传github repo (opens new window)