LiFengMing LiFengMing
首页
云原生
中间件
工具导航
资源导航
  • 分类
  • 标签
  • 归档
关于作者
GitHub (opens new window)

LiFengMing

IT届李哥
首页
云原生
中间件
工具导航
资源导航
  • 分类
  • 标签
  • 归档
关于作者
GitHub (opens new window)
  • 编程语言

    • Go

    • Java

      • 高性能无锁队列简介
      • 漫谈内存伪共享
      • 漫谈JVM内存屏障
      • 漫谈JDK原生并发队列
      • 神奇的高性能无锁队列Disruptor
      • 神奇的高性能无锁队列JCTools
        • 前言
        • 什么是JCTools
          • 非阻塞 Map
          • 非阻塞 Queue
          • 基于实现数据结构分类
          • 源码解析
          • 入队offer()
          • 出队Poll
          • 总结
          • 说明
        • 参考文档
      • Linux内核之kfifo
  • 问题排查手册

  • 容器编排技术

  • 云原生
  • 编程语言
  • Java
LiFengMing
2020-09-20
目录

神奇的高性能无锁队列JCTools

# 前言

最近在看Netty源码的时候,关注到了JCTools。JCTools是什么?是一个高性能的无锁并发工具包。早在96年就有论文提出了无锁队列的概念,再到后来 Disruptor,高性能已得到生产的验证。此处介绍的 Jctools 中的高性能队列,其性能丝毫不输于 Disruptor。那Disruptor与JCTools都是高性能队列的处理有什么区别了?

# 什么是JCTools

JCTools (Java Concurrency Tools) 提供了一系列非阻塞并发数据结构(标准 Java 中缺失的),当存在线程争抢的时候,非阻塞并发数据结构比阻塞并发数据结构能提供更好的性能。

CTools 是适用于 JVM 并发开发的工具,主要提供了一些 JDK 确实的并发数据结构,例如非阻塞 Map、非阻塞 Queue 等。其中非阻塞队列可以分为四种类型,可以根据不同的场景选择使用。

JCTools 是一个开源工具包,在 Apache License 2.0 下发布,并在 Netty、Rxjava 等诸多框架中被广泛使用。

# 非阻塞 Map

  • ConcurrentAutoTable
    • 后面几个map/set结构的基础
  • NonBlockingHashMap :
    • 是对 ConcurrentHashMap 的增强,对多 CPU 的支持以及高并发更新提供更好的性能。
  • NonBlockingHashMapLong
    • 是 key 为 Long 型的 NonBlockingHashMap。
  • NonBlockingHashSet
    • 是对 NonBlockingHashMap 的简单包装以支持 set 的接口。
  • NonBlockingIdentityHashMap
    • 是从 NonBlockingHashMap 改造来的,使用 System.identityHashCode() 来计算哈希。
  • NonBlockingSetInt
    • 是一个使用 CAS 的简单的 bit-vector。

# 非阻塞 Queue

JCTools 提供的非阻塞队列分为 四种类型,可以根据不同的应用场景选择使用:

  • Spsc:单生产者单消费者(有界和无界)
  • Mpsc:多生产者单消费者(有界和无界)
  • Spmc:单生产者多消费者(有界)
  • Mpmc:多生产者多消费者(有界)

“生产者”和“消费者”是指“生产线程”和“消费线程”。

# 基于实现数据结构分类

  • 基于数组的队列:
    • MpscArrayQueue
    • MpmcArrayQueue
    • SpscArrayQueue
    • SpmcArrayQueue
  • 基于链表的队列:
    • MpscLinkedQueue
    • SpscLinkedQueue
  • 基于LinkedArray的队列
    • SpscChunkedArrayQueue
    • SpscGrowableArrayQueue
    • SpscUnboundedArrayQueue
    • MpscGrowableArrayQueue
    • MpscChunkedArrayQueue
    • MpscUnboundedArrayQueue(Netty中使用)
  • XaddQueue:
    • MpscUnboundedXaddArrayQueue
    • MpmcUnboundedXaddArrayQueue
  • 其它类型队列:
    • MpscCompoundQueue

# 源码解析

首先我们先从简单的 MpscArrayQueue 队列的源码开始分析:

# 入队offer()

跟进源码之前,首先回顾下 MpscArrayQueue 的重要属性

// ConcurrentCircularArrayQueue
   /**
    * Q:这个值有什么作用?
    * A:数组长度对应的掩码 - 方便用&运算代替较慢的%运算 子类需要继续缓存行填充,以避免mask 和 buffer 产生伪共享
    * eg.        capacity=16     0000 0000 0001 0000
    *      mask: capacity-1=15   0000 0000 0000 1111
    *  h & (length-1) = h % length
    */
protected final long mask; 

// 真正存放队列数据的数组
protected final E[] buffer; 


// MpmcArrayQueueProducerIndexField
private volatile long producerIndex; // 生产者的索引

// MpscArrayQueueProducerLimitField
    /**
     * 在重新读取消费者索引之前,第一个不可用的生产者索引。
     * <p>
     * Q: 这个值有什么用,直接读取consumerIndex计算不行吗?
     * A: {@code consumerIndex}是一个变化较为频繁的值,因此它所在的缓存行极易失效,从而影响读性能。
     * 我们拷贝一个副本(并在副本无效的时候更新),这样可以减少生产者与消费者之间产生的伪共享,从而提高读效率.
     * <p>
     * Q: 该值为什么进行缓存行填充,为什么与producerIndex分离?
     * A: 因为是多生产模式,因此producerIndex上将产生高度竞争,因此其所在的缓存行极易失效,
     * 将该值与producerIndex分开,我们期望该值大部分时间位于用于共享(且很少失效)的缓存行中。
     * PS: 该值的更新频率远低于producerIndex。
     */
private volatile long producerLimit; 

// MpscArrayQueueConsumerIndexField
    /**
     * 消费者索引(当前消费进度).
     * 这是一个滞后值,消费者先消费可用槽位数据,再更新消费进度;
     */
protected long consumerIndex; 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

跟进 offer() 方法的源码:

 public boolean offer(final E e)
    {
        if (null == e)
        {
            throw new NullPointerException();
        }

       //producerLimit基于consumerIndex计算的一个缓存值,用户减少对consumerIndex的读取(减少缓存行miss),在循环中可能更新
        final long mask = this.mask;
        long producerLimit = lvProducerLimit();
        long pIndex;
        do
        {
           //获取生产者索引
            pIndex = lvProducerIndex();
            if (pIndex >= producerLimit)
            {
                // 生产者索引大于等于缓存的上限,表示根据缓存值认为队列已满。
                // 此时,分两种情况:1. 队列真的满了。 2.缓存过期了。
                // 因此需要读取最新的消费者索引,计算新的上限,判断队列是否是真的满了(以满足Queue对offer的语义要求)
                
                // 获取消费者索引
                final long cIndex = lvConsumerIndex();
                //设计的很巧妙,仔细分析下。就是生产速度最大快于消费者一个容量的大小
                //buffer 的大小=mask+1
                producerLimit = cIndex + mask + 1;
                
                //生产速度是否快于消费速度
                if (pIndex >= producerLimit)
                {
                    // 最新的消费者索引显式队列确实已满
                    // 只有当producerLimit大于producerIndex时更新才有意义,因此不更新producerLimit。
                    return false; // FULL :(
                }
                else
                {
                    // 更新producerLimit为下一个我们必须重新检查消费者索引的值
                    // 因为是多生产者模式,因此更新缓存会产生竞争。
                    // Q: 为什么竞争是良性的?
                    // A: 因为producerLimit永远不会超过下一次的计算值,而producerLimit小于实际值并不会带来错误。

                    // update producer limit to the next index that we must recheck the consumer index
                    // this is racy, but the race is benign
                    soProducerLimit(producerLimit);
                }
            }
        }
        while (!casProducerIndex(pIndex, pIndex + 1));
        /*
         * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
         * the index visibility to poll() we would need to handle the case where the element is not visible.
         */
        // CAS 竞争成功,可以进行填充
        // 提示:新的生产者索引值先于数组中的元素对其它线程可见。如果依赖于索引的可见性执行poll,我们将需要处理元素可能不可见的情况。

        // 前面的CAS已经保证了对象的正确构造(安全发布),这里使用Ordered模式是保证尽快的可见性(volatile是立即的可见性)。
        // 计算生产者索引在数组中下标
        final long offset = calcCircularRefElementOffset(pIndex, mask);
        // 向数组中放入数据
        soRefElement(buffer, offset, e);
        return true; 
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

在初始化状态,producerLimit 与队列的容量是相等的,producerLimit = capacity = 4,而 producerIndex = consumerIndex = 0。接下来 Thread1 和 Thread2 并发向 MpscArrayQueue 中存放数据,如下图所示。

image-20210826200206920

两个线程此时拿到的 producerIndex 都是 0,是小于 producerLimit 的。此时两个线程都会尝试使用 CAS 操作更新 producerIndex,其中必然有一个是成功的,另外一个是失败的。

  1. 假设 Thread-1 执行 CAS 操作成功,那么 Thread-2 失败后就会重新更新 producerIndex。
  2. Thread-1 更新后 producerIndex 的值为 1,由于 producerIndex 是 volatile 修饰的,更新后立刻对 Thread-2 可见。
  3. 这里有一点需要注意的是,当前线程更新后的值是被其他线程使用,当 Thread-1 和 Thread-2 都通过 CAS 抢占成功后,它们拿到的 pIndex 分别是 0 和 1。接下来就是根据 pIndex 进行位运算计算得到数组对应的下标,然后通过 UNSAFE.putOrderedObject() 方法将数据写入到数组中,源码如下所示。
    public static <E> void soRefElement(E[] buffer, long offset, E e){
        UNSAFE.putOrderedObject(buffer, offset, e);
    }
1
2
3

putOrderedObject() 使用了 StoreStore Barrier,对于 Store1,StoreStore,Store2 这样的操作序列,在 Store2 进行写入之前,会保证 Store1 的写操作对其他处理器可见。保证写入都是最新的。

跟进源码 calcCircularRefElementOffset()

   /**
     * 计算环形数组的指定(逻辑)索引对应的偏移量 - index为逻辑索引,需要转换为真实索引。
     * <p>
     * 环形数组(环形缓冲区)的空间是重复利用的,因此逻辑上的index需要转换为真正的index然后再计算。
     * 为了高效运算,假定了环形数组的长度都为2的整次幂,因此mask应该为数组长度减1,这样可以使用 '&' 快速计算。
     * eg. 数组大小为16 index=17 mask=15 index & mask =1 
     * REF_ARRAY_BASE 数组中第一个元素的偏移地址, REF_ELEMENT_SHIFT:数组中一个元素占用的大小
     * Note: circular arrays are assumed a power of 2 in length and the `mask` is (length - 1).
     *
     * @param index desirable element index
     * @param mask (length - 1)
     * @return the offset in bytes within the circular array for a given index
     */
public static long calcCircularRefElementOffset(long index, long mask)
    {
        return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 出队Poll

跟进poll()方法。方法作用移除队列的首个元素并返回,如果队列为空则返回 NULL

public E poll()
    {
        // 直接返回消费者索引 consumerIndex
        final long cIndex = lpConsumerIndex();
        // 计算数组对应的偏移量
        final long offset = calcCircularRefElementOffset(cIndex, mask);

        // 读取为本地变量,避免在接下来的volatile读之后重新读取
        // Copy field to avoid re-reading after volatile load
        final E[] buffer = this.buffer;

        // 注意:生产者先更新索引,再填充元素,因此这里必须处理时序问题
        // 如果元素不为null,那么可以安全的消费,因为生产者索引一定可见,但是如果元素为null,那么则必须等待其不为null。
        // Q: 校验element而不是生产者索引,有什么好处?
        // A: 可以减少对生产者索引的读!如果元素可见,那么不必读取生产者索引,可以减少缓存行miss问题。

        // If we can't see the next available element we can't poll
        E e = lvRefElement(buffer, offset);
        if (null == e)
        {
            // null == e 有以下可能:
            // 1. 队列为空
            // 2. 生产者已经CAS更新了生产者索引,但是尚未填充元素,或填充的元素尚不可见 - 此时需要等待生产者完成填充,因为队列的状态表示当前并不为空!

            // 提示:如果生产者在CAS更新生产者索引之后填充元素之前被中断,在这种情况下,队列并不是真正的为空。其它生产者会在该元素之后继续填充队列。

            /*
             * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
             * winning the CAS on offer but before storing the element in the queue. Other producers may go on
             * to fill up the queue after this element.
             */
            if (cIndex != lvProducerIndex())
            {
                // 队列不为空,需要自旋等待直到元素可见 - 这也是比relaxedPool开销大的原因
                do
                {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);
            }
            else
            {
                // 消费者索引和生产者索引相同,证明队列确实为空
                return null;
            }
        }

        // 先消费元素,再更新消费者进度(因为生产者会先校验consumerIndex,因此可确保生产者不会覆盖数据)。
        // 这里可以使用Plain模式赋值为null,因为生产者一定会在索引可见之后才填充元素,consumerIndex的发布可以保证这里也正确发布。
        spRefElement(buffer, offset, null);
        soConsumerIndex(cIndex + 1);
        return e;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

因为只有一个消费者线程,所以整个 poll() 的过程没有 CAS 操作。poll() 方法核心思路:

  1. 是获取消费者索引 consumerIndex
  2. 根据 consumerIndex 计算得出数组对应的偏移量
  3. 将数组对应位置的元素取出并返回
    1. 队列为null,直接返回
    2. 队列不为空,自旋直到获取到元素
  4. 将 consumerIndex 移动到环形数组下一个位置。

获取消费者索引以及计算数组对应的偏移量的逻辑与 offer() 类似,在这里就不赘述了。下面直接看下如何取出数组中 offset 对应的元素,跟进 lvElement() 方法的源码。

 public static <E> E lvRefElement(E[] buffer, long offset)
    {
        return (E) UNSAFE.getObjectVolatile(buffer, offset);
    }
1
2
3
4

获取数组元素的时候同样使用了 UNSAFE 系列方法,getObjectVolatile() 方法则使用的是 LoadLoad Barrier,对于 Load1,LoadLoad,Load2 操作序列,在 Load2 以及后续读取操作之前,会保证 Load1 的读取操作执行完毕,所以 getObjectVolatile() 方法可以保证每次读取数据都可以从内存中拿到最新值。

与 offer() 相反,poll() 比较关注队列为空的情况。当调用 lvElement() 方法获取到的元素为 NULL 时,有两种可能的情况:

  1. 如果消费者索引 consumerIndex 等于生产者 producerIndex,说明队列为空。
  2. 只要两者不相等,消费者需要等待生产者填充数据完毕。(在 offer() 中先更新索引,然后填充数据)

# 总结

对 MpscArrayQueue 的知识点做一个简单的总结。

  • 通过大量填充 long 类型变量解决伪共享问题。
  • 环形数组的容量设置为 2 的次幂,可以通过位运算快速定位到数组对应下标。
  • 入队 offer() 操作中 producerLimit 的巧妙设计,大幅度减少了主动获取消费者索引 consumerIndex 的次数,性能提升显著。
  • 入队和出队操作中都大量使用了 UNSAFE 系列方法,针对生产者和消费者的场景不同,使用的 UNSAFE 方法也是不一样的。

# 说明

相关测试实践已经上传github repo (opens new window)

# 参考文档

  1. Github wiki (opens new window)
  2. Java魔法类:Unsafe应用解析 (opens new window)
  3. 内存屏障 (opens new window)
  4. Github repo (opens new window)
编辑 (opens new window)
#Queue
上次更新: 2025/01/19, 23:15:59
神奇的高性能无锁队列Disruptor
Linux内核之kfifo

← 神奇的高性能无锁队列Disruptor Linux内核之kfifo→

最近更新
01
云原生资源
05-25
02
快速搭建Spring项目
03-27
03
kafka版本迭代说明
03-11
更多文章>
Theme by Vdoing | Copyright © 2018-2025 LiFengMing | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式