神奇的高性能无锁队列Disruptor
# Disruptor为什么那么快?
- 预分配 ringbuffer
- 无锁(CAS)以及减小锁竞争
- 缓存行填充,消除伪共享
- 位运算的使用
# 生产者/消费者模型
# 核心类关系
由上图可以看出,所有的操作都是对 RingBuffer 进行的操作。
# 核心概念
- Ring Buffer(环形缓冲区): 环形缓冲区通常被认为是Disruptor的主要方面,但从3.0开始,环形缓冲区仅负责存储和更新通过Disruptor的数据(事件)。对于一些高级用例,可以完全由用户替换。
- Sequence: Disruptor使用Sequences作为识别特定组件所在位置的方法。每个消费者(EventProcessor)都像Disruptor本身一样维护一个Sequence。大多数并发代码依赖于这些Sequence值的移动,因此Sequence支持AtomicLong的许多当前功能。事实上,两者之间唯一真正的区别是序列包含额外的功能,以防止序列和其他值之间的错误共享。
- Sequencer: Sequencer是Disruptor的真正核心。该接口的两个实现(单生成器,多生产者)实现了所有并发算法,用于在生产者和消费者之间快速,正确地传递数据。
- Sequence Barrier: 序列屏障由序列发生器产生,包含对序列发生器中主要发布的序列和任何依赖性消费者的序列的引用。它包含确定是否有任何可供消费者处理的事件的逻辑。
- Wait Strategy: 等待策略确定消费者如何等待生产者将事件放入Disruptor。有关可选锁定的部分中提供了更多详细信息。
- Event: 从生产者传递给消费者的数据单位。事件没有特定的代码表示,因为它完全由用户定义。
- EventProcessor: 用于处理来自Disruptor的事件的主事件循环,并具有消费者序列的所有权。有一个名为 BatchEventProcessor的表示,它包含事件循环的有效实现,并将回调到使用的提供的EventHandler接口实现。
- EventHandler: 由用户实现并代表Disruptor的使用者的接口。
- Producer: 这是调用Disruptor以将事件排入队列的用户代码。这个概念在代码中也没有表示。
# 生产者写入数据
写入数据的步骤包括:
1.占位
2.移动游标并填充数据
3.发布事件
需要考虑的问题:
1.如何避免生产者的生产速度过快而造成的新消息覆盖了未被消费的旧消息的问题
答:生产者再获取占位之前需要查看当前最慢的消费者位置,如果当前要发布的位置比消费者大,就等待;
2.如何解决多个生产者抢占生产位的问题
答:多个生产者通过CAS获取生产位;
# 消费者消费数据
1.一个消费者一个线程;
2.每个消费者都有一个游标表示已经消费到哪了(Sequence);
3.消息者会等待(waitFor)新数据,直到生产者通知(signal);
需要考虑的问题:
如何防止读取的时候,读到还未写的元素?
答:WaitStrategy(等待策略)
# 走进源码
# 生产者
笔者做实践 Disruptor 的时候,对生产者做了一次封装。这里可以通过 add()方法熟悉 disruptor的写入过程。
public class DisruptorQueue<T> {
private Disruptor<ObjectEvent<T>> disruptor;
private RingBuffer<ObjectEvent<T>> ringBuffer;
public DisruptorQueue(Disruptor<ObjectEvent<T>> disruptor) {
this.disruptor = disruptor;
this.ringBuffer = disruptor.getRingBuffer();
this.disruptor.start();
}
public void add(T t) {
if (t != null) {
//1.获取ringbuffer下一个可写的序列
long sequence = this.ringBuffer.next();
try {
//2.获取对象事件,并进行数据填充
ObjectEvent<T> event = this.ringBuffer.get(sequence);
event.setObj(t);
} finally {
//3.发布事件
this.ringBuffer.publish(sequence);
}
}
}
public void addAll(List<T> ts) {
if (ts != null) {
for (T t : ts) {
if (t != null) {
this.add(t);
}
}
}
}
//省略。。。
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
从笔者封装可以看出 Disruptor 写入主要干了三件事:
- 获取写一个可用的序列 sequence
- 通过 sequence 定位,ringBuffer 中的槽位。并填充槽位
- 发布事件
基于上面三个步骤我们分别核心代码跟进
1.跟进代码:com.lmax.disruptor.SingleProducerSequencer#next(int)
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue;
//在一个写入序列
long nextSequence = nextValue + n;
//后退一圈,仔细分析 wrapPoint就是consumer的最小消费位点。
long wrapPoint = nextSequence - bufferSize;
// 获取上一次的最小消费位置
long cachedGatingSequence = this.cachedValue;
//生产者速度快于消费者且超生产一个rangbuffer大小,则没有剩余空间
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
//StoreLoad 更新游标
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
//重新计算所有消费者里面的最小值位置
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
//防止死循环,让出CPu 暂停1纳秒,继续下一次计算
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
//下一个序列
this.nextValue = nextSequence;
return nextSequence;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2.跟进代码:com.lmax.disruptor.RingBufferFields#elementAt
protected final E elementAt(long sequence)
{ //通过 UNSAFE 方法获取指定序列的元素
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
2
3
4
3.跟进代码:
public void publish(long sequence)
{
//更新游标
cursor.set(sequence);
//如果阻塞则唤醒
waitStrategy.signalAllWhenBlocking();
}
2
3
4
5
6
7
由于有些策略是阻塞性的,必须需要手动唤醒才能进行消费。具体策略感兴趣的朋友可以去深扒,代码不是很难。
# 消费者
从封装的 DisruptorQueue 代码中可以看出,构造方法中 this.disruptor.start(); 方法。就是开启消费者线程。
跟进 com.lmax.disruptor.dsl.Disruptor#start
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
// 开启消费线程,如果多个就开启多个线程消费
consumerInfo.start(executor);
}
return ringBuffer;
}
2
3
4
5
6
7
8
9
10
11
继续跟进源代码到:com.lmax.disruptor.BatchEventProcessor#run
public void run()
{
if (running.compareAndSet(IDLE, RUNNING))
{
sequenceBarrier.clearAlert();
notifyStart();
try
{
if (running.get() == RUNNING)
{
//真正处理事件的地方
processEvents();
}
}
finally
{
notifyShutdown();
running.set(IDLE);
}
}
else
{
// This is a little bit of guess work. The running state could of changed to HALTED by
// this point. However, Java does not have compareAndExchange which is the only way
// to get it exactly correct.
if (running.get() == RUNNING)
{
throw new IllegalStateException("Thread is already running");
}
else
{
earlyExit();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
核心代码:com.lmax.disruptor.BatchEventProcessor#processEvents
private void processEvents()
{
T event = null;
long nextSequence = sequence.get() + 1L;
while (true)
{
try
{
//获取可用的序列,不同的等待策略有不同的效果
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
//事件消费,直到ringbuffer队列为null
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
//用户实现消费的入口
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (running.get() != RUNNING)
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
到这里对于Disruptor的生产流程,与消费流程就分析完了。有没有醍醐灌顶的感觉。。。
# 使用场景
- Log4j2
# 说明
笔者相关实践代码已经上传github repo (opens new window)