之前 CPU Cache 基础 和 内存屏障及其在 JVM 下的应用 这个文章介绍了很多 CPU Cache、缓存一致性上面的东西,理论多实践少,而 Disruptor 作为一个被广泛使用的高性能内存队列,它最主要的就是在内存、缓存、减小 CPU 竞争方面做文章来提高性能,所以实现时有大量的跟这些主题相关的优化,刚好能作为实践示例供我们学习参考,看这些理论的东西是怎么与实际结合起来的。
实现高性能队列要考虑的问题
队列要有界还是无界
即队列要不要设置一个最大长度限制。如果队列可以无限增长,则容易带来一些不稳定因素。例如请求延迟时间可能会有长尾现象,即有的请求处理延迟非常的高;再例如入队对象因为长时间未被消费处理而被晋升到老代,增多老年代的 GC 次数;再例如队列堆积请求过多导致内存被占满而 OOM 等。
一般追求稳定和低延迟的话会倾向于使用有界队列。
要不要使用锁
锁的开销主要因为线程需要做 Context Switch 陷入内核态后被挂起,等待锁被释放。锁释放后又要经历 Context Switch 来将线程唤醒。每次 Context Switch 都会导致大量 Cache 失效,CPU 内执行的指令被打断,影响性能。以 ArrayBlockingQueue 为例,入队出队都抢同一把锁,Producer、Consumer 会不断竞争,并发性能不高。
一般来说有个很好的减小竞争的办法就是减小锁粒度,以 LinkedBlockingQueue 为例,入队出队由两把锁分别控制,最优情况下 Producer 只抢入队的锁,Consumer 只抢出队的锁,这样至少 Producer 和 Consumer 之间不会产生竞争,减少了竞争也就能提升并发性能。但实际使用中,为了能达到最优性能,Producer 和 Consumer 得完美配合,让队列持续工作在既不空,也不满的状态,才能达到最优性能。而事实上队列绝大多数时候要么工作在空的状态,要么工作在满的状态,很少能维持在不空又不满的状态。而工作在空或者满的状态下,Producer 为了通知 Consumer 有新数据到来除了要抢入队锁外还得抢出队的锁,同理 Consumer 为了通知 Producer 有新的空位除了要抢出队的锁外还要抢入队的锁。所以实际上想通过减小锁粒度来提升并发性能并不容易。从实际效果上来说,LinkedBlockingQueue 并发性能并不见得比 ArrayBlockingQueue 好。
要不要使用 CAS 做乐观锁
如果能不使用锁就能完全避免锁的开销,一般来说能极大的提高并发性能。CAS 操作经常被用来实现各种无锁的数据结构,队列也有专门的无锁队列实现算法。例如 ConcurrentLinkedQueue 就是基于 CAS 的无锁队列(但它是无界的无锁队列,有界的无锁队列实现会更加困难)。
CAS 操作虽然比锁开销小很多但总归会有些开销。CAS 操作开销一般来自这么几个方面:
- CAS 操作一般会有内存屏障去保证 CAS 的写入操作能立即被别的线程看见,而内存屏障是有开销的
- CAS 操作为了保证原子性,它会影响 CPU 执行指令的效率。
- 从使用角度来说,CAS 经常被用来当做乐观锁来使用,当竞争小的时候 CAS 操作性能很好,但当竞争特别激烈时乐观锁的重试操作开销甚至能超过普通悲观锁加锁的开销。
CAS 乐观锁去实现的队列除了需要考虑 CAS 的开销外,还要考虑:
- 只使用 CAS 操作的无锁算法一般都比较难实现,难度大于使用锁的实现,正确性也就不容易保证
- 用 CAS 乐观锁实现无界队列的算法比较成熟和常见,实现有界队列会更困难
内存屏障的使用
无锁队列除了用 CAS 能实现外,在一定使用场景下甚至能仅靠内存屏障来实现无锁队列。Disruptor 实际就能做到这一点,后续会详细说明。只使用内存屏障的话性能要在 CAS 操作实现的队列之上。
但使用内存屏障时也要注意,内存屏障使用的多了会导致性能下降,使用的少了可能导致延迟升高,甚至对算法正确性有影响。例如 Producer 明明写了数据但 Consumer 没看见等。
GC 的开销
一般使用队列时,入队的元素都是从 Heap 上 new 出来的新对象,出队后做完处理就被丢弃。当吞吐量比较大的时候大量创建又消亡的对象会给 GC 带去压力。如果对象消费的比较快,一般影响会局限在年轻代上,但如果队列积累的数据比较多,处理的慢,队列内的对象还有晋升到老年代的可能,给 GC 带去的影响就更大了。
需要考虑对 Cache Line 的利用
例如如果是类似访问数组这种场景,访问的数据有比较清晰的模式,CPU 在从内存获取数组数据时就可能一次性多取一些数据,从而能在遍历数组时尽力走 CPU Cache。
高性能队列实现时如果能照顾到 Cache Line 机制,利用好 CPU Cache,能让性能更进一步。
要考虑到队列主要工作在空或满的状态
这个前面提过了,就是队列绝大多数时候都工作在空或者满的状态,既不空又不满的一般都是短暂的中间状态。队列这种特殊性就导致即使入队出队使用不同的锁或者 CAS 变量,Producer 和 Consumer 之间也会引入很多竞争。除了锁或者 CAS 变量的竞争之外,再一个对性能影响比较大的就是 Cache Coherence 的影响。例如 Producer 刚入队一个元素,操作了队首对象,紧接着 Consumer 线程就去消费这个对象,也要操作 Producer 刚刚操作过的指针或者对象,不同线程修改同一块内存就容易引入 Cache 同步的开销。即同一个 Cache Line 需要在两个 CPU 之间来回搬运。
要考虑到如何组成级联结构
因为处理数据时候经常是需要将数据处理过程实现为 Pipeline,Pipeline 内分为多个 Stage,每个 Stage 做一个单一的事情,每个 Stage 可能都由独立的线程去完成。
如果真要支持这种级联结构,只是简单的将多个队列串起来,这种 Producer、Consumer 之间竞争的开销就会被成倍的放大。数据每出入一个 Stage 就是重新出入队一次,带来一堆开销。
Disruptor 设计要点
主要内容来自 https://lmax-exchange.github.io/disruptor/disruptor.html
Disruptor 就是要解决上面这些问题,在各种矛盾之间找一个平衡点。
基本概念
为了说清楚设计要点得先介绍几个概念。
- Ring Buffer。Disruptor 采用环形数组存放放入队列的 Element,这个环形数组就称为 Ring Buffer。
- Sequence。可以理解为 Ring Buffer 上的 Index。Ring Buffer 都得通过 Sequence 来进行访问。Producer 要 Produce 数据时首先得计算出来该在哪个 Sequence 存放要入队的数据,Consumer 要消费数据时也得先计算出来待消费的 Sequence,再从 Ring Buffer 上找到这个 Sequence 对应的元素做消费;
Ring Buffer
Disruptor 采用环形数组称为 Ring Buffer 来存放放入队列的 Element,在 Disruptor 内也称为 Event。这一点和 ArrayBlockingQueue 一样。使用环形数组的原因是:
- 天然有界,方便实现有界队列。前面提到无界队列的坏处,Disruptor 一大设计要点是低延迟,无界队列可能导致的 GC 和队列爆炸都可能让延迟大幅度增加,降低稳定性,所以 Disruptor 选用了有界队列;
- 有更好的 Cache Line 利用率。以 Consumer 为例,每次从数组读取数据消费有清晰的模式,读完一个数据接下来就会读下一个,两次读取的数据大概率在同一个 Cache Line 上,所以数据访问有更好的本地性,能更好的利用 Cache
我们知道 Java 上创建数组后只是在内存上创建出了对象的引用,实际每个引用都指向空地址:
// 定义一个 Element 类
public class Element {
public Object data;
}
// 只创建出能容纳 10 个 Element Object 引用的数组,初始情况下每个引用都指向空地址
Element[] array = new Element[10];
需要主动的将引用指向一个对象,才能完成数组和对象的绑定,即相当于把一个对象「放入」了数组中。
// 假设我们有个对象要放入数组,入队对象的 data 值假设就为 enqueueData
Element A = new Element();
A.data = enqueueData;
// 需要主动将数组内的引用和待放入数组对象的引用
array[0] = A;
如果我们把上图的数组想象成队列,Element A 就是入队的元素,出队时就是把 array[0]
重新指向 null
将 Element A 移走,之后 Element A 会被拿去做处理,最终被 GC。可以看到这个实现下,入队时要 new Element()
,出队后 Element 就被 GC。虽然一般这种对象生命周期很短,Young GC 就能处理掉但毕竟会增加 GC 的负担。
除此之外,上图里如果又入队了 Element B,变成了下面图的样子 :
Element B = new Element();
B.data = enqueueData;
array[1] = B;
此时 Element A 和 B 都是存活在堆内存上, 但 A B 俩对象在内存上并不一定并排着挨在一起,很大可能他们不在会在同一个 Cache Line 上。所以虽然用了 Array 来实现环形数组但依然无法提高 Cache 利用率。所以 Disruptor 为了减少 GC 和提高 Cache 利用率,它在初始化的时候除了创建数组引用之外,还将数组引用的未来会入队出队的元素也一口气全部创建出来,如下图:
因为 TLAB 的关系,初始化时同一个线程连续创建出来的 A B C D E F 对象有很大的可能是紧密连续分配在内存上的,所以能提高访问数据的本地性,从而提高 Cache 利用率。
因为入队出队的 Element 已经全部分配好,所以 Disruptor 上入队的元素不是修改 Ring Buffer 的引用指向新入队对象,而是修改引用指向的对象来完成入队。
// 例如我们有个环形数组
Element[] ringBuffer = new Element[10];
// Disruptor 会在初始化时候就为所有引用分配入队对象
for (int i = 0; i <= ringBuffer.length; i++) {
ringBuffer[i] = new Element();
}
// 例如是 0 号位需要入队,就直接取 0 号位的 Element 对象来修改
ringBuffer[0].data = enqueueData
这种方式下如果 Element 内成员变量全是基本类型的话,入队出队甚至能做到完全不用新分配内存。
拆分竞争
这个是 Disruptor 性能好,竞争开销低的核心原因,简单说就是将队列这个模型进行拆解,拆解成数据存储、Producer 和 Consumer 三个部分。从 Producer 入队数据就能看出来这三部分是如何交互的。生产数据会分为三步:
// 1. 首先要取一个可用的 Sequence 去存储入队数据
long sequence = ringBuffer.next();
try {
// 2. 取到 Sequence 后访问 Ring Buffer 获取待修改的
// Element 对象,对 Element 做修改完成入队
Element event = ringBuffer.get(sequence);
event.setData(enqueueData);
} finally {
// 3. 将之前取到的 Sequence publish,让 Consumer 能看到
ringBuffer.publish(sequence);
}
数据存储部分作为 Producer 和 Consumer 都会竞争访问的共享部分,它的访问不做任何线程安全的保护,Producer 和 Consumer 线程都能无锁的访问。在 Producer 一侧如果有多个线程都有元素要入队,需要通过 Producer Barrier 做竞争,每个 Producer 线程会分配一个 Sequence,分配好 Sequence 后 Producer 线程带着 Sequence 去 Ring Buffer 写数据。Consumer 一侧只有一个线程,它通过监听 Producer Publish 的 Sequence 号来决定该消费到哪个 Sequence。
这里拆分竞争,如果拿 ArrayBlockingQueue 的实现做比,就好似将数据存储的数组,和 Producer 要竞争的头结点,Consumer 要竞争的尾结点解耦。Producer 线程只会通过 Producer Barrier 竞争下一个 Produce 数据的 Sequence,竞争 Sequence 的过程不用操作 Ring Buffer,竞争完有了 Sequence 后,因为每个 Producer 只会获取到唯一的 Sequence,所以 Producer 线程不需要任何线程同步机制就能去 Ring Buffer 入队数据,数据入队后只是通知 Consumer 当前 Produce 的 Sequence 是什么就完成全部入队操作。Producer 只会在 Producer 线程之间通过 Producer Barrier 竞争可用的 Sequence。Producer 和 Consumer 之间没有任何竞争。Producer 竞争完 Sequence 后去 Ring Buffer 完成实际入队的过程也没有任何竞争。
可能会看的晕,但看完后面内容回来再看这里应该会觉得清晰很多。
同样,Consumer 只有一个线程,要消费哪个元素也是由维护在自己内部的 Sequence 决定,这个 Sequence 只有 Consumer 自己会操作,Producer 无法操作。Consumer 只是在消费完数据后将更新后的 Sequence 告知给 Producer。于是 Consumer 就完全避免了和 Producer 竞争去操作某个共享变量的开销。
反观其它队列的实现,都是将竞争入队位置,执行入队操作,通知 Consumer 入队完成这三个事情绑定在一起。因为数据存储部分 Producer 和 Consumer 都会操作,没有特殊规则约束则无法避免 Producer 和 Consumer 线程之间的竞争,而 Disruptor 在做了精细拆分后,读写 Ring Buffer 的操作数据存储这一步就能完全的变成 Producer 线程或 Consumer 线程的本地操作,没有竞争也不需要任何同步机制去保证正确性。
那 Disruptor 这里如何保证 Producer 和 Consumer 之间不会访问相同的 Sequence 呢?
Disruptor 的 Sequence 是个只增不减的 Long 型整数。Producer 或 Consumer 在访问 Ring Buffer 时候都会对 Sequence 进行取模,找到这个 Sequence 在 Ring Buffer 上实际对应的 Index 进行操作。
对 Consumer 来说,它会读取 Producer 最近 Publish 的 Sequence 是多少,发现 Consumer 的 Sequence 小于 Producer 的 Sequence 就表示 Consumer 有数据能去消费。Consumer 虽然会读 Producer 的 Sequence 但只有弱的可见性要求,并不会去写 Producer 的 Sequence,所以不需要额外的同步机制。弱的可见性要求意思是,Producer 更新 Sequence 后即使没能很快被 Consumer 看到,也只是会延迟了 Consumer 消费 Producer Sequence 对应数据的时间,而对正确性没有影响。
对 Producer 来说,它也会读取 Consumer 最近消费的 Sequence 是多少,需要保证 Producer Sequence 对 Ring Buffer 大小取模后的值不能大于 Consumer Sequence 同样方法取模后的值。因为 Producer 生产数据不能将 Consumer 还未消费的数据覆盖了。这里 Producer 读 Consumer 的 Sequence 也只有弱可见性要求,即使 Producer 读到的是 Consumer 很久之前就处理过的一个 Sequence 值,影响的最多是 Producer 不能继续生产数据,给生产数据的实际带去延迟,但对正确性没有影响。
自适应批处理
前面提到过,Consumer 是单线程的,内部会维护一个 Sequence 表示自己消费到哪里了。Consumer 会读取 Producer 最近 publish 的 Sequence,如果有多个 Producer 就是读这一组 Producer 最近 Publish 的 Sequence 中最小的那个 Sequence。Consumer 消费数据的 Sequence 不能超过 Producer Publish 的 Sequence。那 Consumer 消费时自己的 Sequence 和 Producer Publish Sequence 之间的差值就是 Consumer 可以安全消费的数据数量,于是 Consumer 就能一口气把他们全消费完,做批量处理以提高性能了。
这种批处理是自适应的是因为 Producer 生产的越快,Consumer 每次处理完消息读取到自己的 Sequence 和 Producer Sequence 之间的差值越大,Consumer 能批量处理的数据就越多。
实现细节
截取一些关键的或者比较有意思的实现细节记录一下。代码来自 Disruptor 3.4.2。
避免 False Sharing
False Sharing 之前文章介绍过原理,它就属于不一定会出现但出现后又会极大影响性能的东西。所以能避免就得避免。Disruptor 上很多地方都有使用数据填充的方式去避免 False Sharing。例如 Ring Buffer 、 Sequence、Producer 等。最神的就是 Ring Buffer 里避免 False Sharing 的方法。后续会讲到。
// 标准的避免 False Sharing 的方法。看到是
// 在被保护数据前后都得通过继承的方式做填充
abstract class RingBufferPad {
protected long p1, p2, p3, p4, p5, p6, p7;
}
// 被保护的数据都在这里
abstract class RingBufferFields<E> extends RingBufferPad {
// 数组以及成员变量声明在这里
}
// 被保护数据的后部还得再来继承一个类添加填充
public final class RingBuffer<E> extends RingBufferFields<E> ... {
protected long p1, p2, p3, p4, p5, p6, p7;
}
数组访问
Ring Buffer 是个数组,Producer 和 Consumer 都会大量访问。Producer 和 Consumer 之间没有数据竞争的问题,但是因为 Ring Buffer 上的数据会被 Producer 线程写,被 Consumer 线程读,会有 Cache Coherence 开销。即 Cache Line 在不同 CPU 之间颠簸。Disruptor 需要减小这种颠簸的影响,一个关键就是减小 False Sharing 的影响。
除了 False Sharing 外,Disruptor 还有个优化。Java 的数组默认会有一些基本的边界检查逻辑,Disruptor 为了性能用 UNSAFE 类强行自己实现了数组的访问,去掉了边界检查。
下面为精简代码,省掉了一些不重要的东西。
static {
// 为了判断数组指针是 4 字节的还是 8 字节的
final int scale = UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale) {
REF_ELEMENT_SHIFT = 2;
}
// 计算 Buffer 大小。Disruptor 会在 Ring Buffer 数组前后再填充 128 字节的 Buffer
// 以保证数组的读写操作和数组 Size 的读取操作不会有 False Sharing 问题
// 说明在这里:https://github.com/LMAX-Exchange/disruptor/issues/211
// 这是我觉得一个比较神的点
BUFFER_PAD = 128 / scale;
// 计算数组里实际数据相对对象头的偏移
// 数组对象在内存有这么几部分数据 对象头 + 单个填充 Buffer + 实际数组数据
// 这里计算出 对象头 + 单个填充 Buffer 的大小,这样后续访问数组数据可以直接
// 通过计算偏移位置来进行,例如要读取 index 为 3 的数据,这个数据所在地址
// 相对对象起始地址偏移就是 对象头 + 单个填充 Buffer + 3 * 引用指针大小
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
}
// bufferSize 是 2 的整数次幂,indexMask 用来对 Sequence 取余计算出 Sequence
// 在数组内对应的 index
// 举例说明 Ring Buffer 大小为 8,Sequence 是 10,对应的 index = 10 % 8 也
// 等于 10 & 7 而 7 就是 indexMask
private final long indexMask;
// 实际数组
private final Object[] entries;
protected final int bufferSize;
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
this.bufferSize = sequencer.getBufferSize();
this.indexMask = bufferSize - 1;
// 看到数组实际大小是传入的 Buffer Size 加上填充 Buffer 数量
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
// 这里会去使用 EventFactory 构造 Ring Buffer 内的 Event,填充满 Ring Buffer
fill(eventFactory);
}
protected final E elementAt(long sequence){
// 为了避开数组边界检查,访问数组采用最原始的方式,类似于指针加偏移地址
// 访问数组内元素
// 注意 REF_ARRAY_BASE 已经包含了一个 BUFFER_PAD 的大小,所以不用再重复加
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
Consumer 消费数据
做了一些截取之后,Consumer 消费数据部分最核心的代码如下,在 BatchEventProcessor 内:
T event = null;
// sequence 就是当前 Consumer 消费到的 sequence 号,初值为 -1
// nextSequence 就是 Consumer 期望下一个消费的 Sequence 号。当 Producer 告诉
// Consumer 生产出来的 Sequence 大于等于 nextSequence 时,Consumer 就能开始
// 消费数据。
// sequence.get() 是一次 volatile 读,但开销很小
long nextSequence = sequence.get() + 1L;
while (true) {
try {
// sequenceBarrier 是 Producer 提供的,相当于在这里 Consumer 向
// Producer 注册说我要 Producer 生产到 nextSequence 后通知我。
// waitFor 返回后还会告知 Consumer 最大的可消费的 Sequence 是多少
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// 一条一条的消费 Event,直到 nextSequence 和 availableSequence 相等。
// 看到 onEvent 的第三个参数 batchEnd 是如何判断的
// nextSequence 和 availableSequence 之间的差就是自适应的能批量处理的 Sequence 范围
while (nextSequence <= availableSequence) {
// data provider 就是 Ring Buffer
// 这是一次最普通的 get,没有 volatile 语义
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 一批消费完,更新一次 Sequence。Sequence 类似 AtomicLong,set 对应
// AtomicLong 的 lazySet 采用 StoreStore Barrier 不要求 Producer
// 立即看到以提高性能
sequence.set(availableSequence);
} catch (final Throwable ex) {
// 处理 Event 抛异常了则跳过这条 Event
exceptionHandler.handleEventException(ex, nextSequence, event);
// 也是用的 StoreStore Barrier
sequence.set(nextSequence);
nextSequence++;
}
}
有两个比较有意思的点。一个是 Ring Buffer 读取的时候用的最普通的 get。之前提到过 Producer 和 Consumer 都会在没有任何同步机制保护的情况下去读写 Ring Buffer。那 Consumer 这里是以最普通的 get 方式去读取 Ring Buffer 的数据,Disruptor 是怎么保证 Consumer 读取数据的时候读到的一定是 Producer 刚写的数据呢?从前面使用举例上能看到:
Element event = ringBuffer.get(sequence);
event.data = enqueueData
Producer 写入数据时候是先从 Ring Buffer 上读取入队对象,之后直接操作入队对象,之后再也没碰过 Ring Buffer。所以 Producer 写入数据不是 volatile 写入,也没有任何 Barrier 保护,Disruptor 对 Elemnet 的声明也没有任何要求,那看上去 Consumer 似乎是不能读取到最新 Produce 的数据的。后面会慢慢揭开谜底。
再有是 Consumer 最后是 StoreStore Barrier 的使用。Producer 读取 Consumer 的 Sequence 是 volatile 读取,所以 Producer 所在 CPU 在读 Consumer Sequence 时候会立即做 Cache Coherence,拉取 Sequence 最新值。这里 StoreStore Barrier 的意义在于,如果 Consumer 能连续消费数据,Producer 没读取 Consumer 的 Sequence,那连续消费数据期间都不需要主动的去刷 Cache 效率是最高的。后续还能在 Producer 那边看到,Producer 也有机制去保证少读取 Consumer 的 Sequence。
Single Producer 发送数据
Producer 这边分 Multi Producer 和 Single Producer 两种。我们先看 Single Producer,即使用方保证只有一个线程 Produce Event 到队列。
之前介绍过,Produce Event 的过程分三步,先执行 next 获取 Sequence,之后修改获取到的 Sequence 对应的 Event,最后再 Publish 这个修改好的 Event。之后 Consumer 就能消费这个 Event 了。即对 Producer 来说它内部会有两个 Sequence,一个是已知的已经被 Producer 占用的 Sequence,变量名为 nextValue。这部分 Sequence 可能已经被 Publish 也可能还在修改还未被 Publish。另一个是已知的已经 Publish 的 Sequence,变量名称为 cursor。占用的 Sequence 一定在 Publish 的 Sequence 之后,即占用的 Sequence 会大于 Publish 的 Sequence。注意因为 Single Producer 是只有一个线程会 Produce 数据,不会有多个线程去抢占 Sequence 的情况,所以用来记录抢到的 Sequence 的 nextValue 变量是个本地变量,不需要暴露给别的线程知道。
而对于 cursor,从 Producer 内生成的给 Consumer 使用的 SequenceBarrier 就是读 cursor 这个 Sequence,即 Producer 已经 Publish 的 Sequence 号。从而让 Consumer 知道 Producer 现在 Produce 到哪个 Sequence 了,它到底能不能继续消费。所以 cursor 得是 volatile 的变量。
这三步中最重要的是 next()
方法,也即占用 Sequence 的步骤。其实现精简之后如下:
// nextValue 是当前 Producer 最近 Publish 的 Sequence
// nextValue 接下来会访问多次,所以用本地变量缓存
long nextValue = this.nextValue;
// n 是想要获取多少个 Sequence。nextSequence 是期望占用的 Sequence 值,
// next 返回后当前 Producer 就占有从 nextValue 到 nextSequence 之间所有
// 的 Sequence,都能拿去安全操作
long nextSequence = nextValue + n;
// 计算 nextSequence 的折叠点,主要用于避免 Producer 生产太快覆盖
// Consumer 还未消费的 Sequence
long wrapPoint = nextSequence - bufferSize;
// 缓存的已知的 Consumer 消费过的 Sequence
long cachedGatingSequence = this.cachedValue;
// 判断 Consumer 是不是消费的太慢,Producer 是否会追上 Consumer
// cachedGatingSequence > nextValue 这个判断可以忽略不管,是 Disruptor
// 在 LMAX 里特殊用途导致。参看 https://github.com/LMAX-Exchange/disruptor/issues/76
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
// 这个 setVolatile 是 StoreLoad Barrier 也是因为 Disruptor
// 的特殊用途导致,也可以忽略
// https://github.com/LMAX-Exchange/disruptor/issues/291
cursor.setVolatile(nextValue);
// gatingSequences 就是 Consumer 的 Sequence,这里是
// Busy Loop 判断队列是不是持续处于满的状态
long minSequence;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
LockSupport.parkNanos(1L);
}
// 队列不满了更新最新的缓存的 Consumer Sequence
this.cachedValue = minSequence;
}
// 队列已经不满了,能保证 nextValue 到
// nextSequence 之间的 Event 都安全可用
// 于是更新 nextValue 到 nextSequence 并返回 nextSequence
this.nextValue = nextSequence;
return nextSequence;
再往下继续之前有几个点需要强调一下:
- 看到存在 cachedGatingSequence,存的是之前读到的 Consumer 消费到的 Sequence。之所以要缓存这个 Sequence 就是为了能减少对 Consumer Sequence 的读取。因为 Consumer 那边写 Consumer Sequence 是 StoreStore Barrier。Producer 这里是 Volatile 读,每次读取会因为读 Barrier 的存在而强迫 Consumer 线程所在 CPU 刷更改的缓存。而 Producer 这里实际并不要求每次 Consumer 的 Sequence 更新了都得看到。
- 等队列满了以后进入了 Busy Loop 能看到 Producer 每次都会读 Consumer 的 Sequence,以期能今早知道 Consumer 消费完数据,赶紧再去生产更多数据。
- 队列满的时候 Producer 的 Busy Loop 很凶险,会立即让 CPU 占用率到 100%。如果业务上 Consumer 消费的比较慢,队列又有可能被占满的话,Disruptor 可能会导致严重的性能问题,在队列满的时候会大量占用 CPU,带来隐患。
Disruptor 在占好 nextSequence 后,使用方就能从 Ring Buffer 内拿到这个 nextSequence 对应的 Event 做修改。修改好后执行 Publish。Publish 实现如下:
public void publish(long sequence){
// 更新 cursor 也即已经 Publish 的 Sequence,用 StoreStore Barrier 以提高性能
cursor.set(sequence);
// Consumer 会通过 SequenceBarrier 的 waitFor
// 来等待 Producer Publish 新的 Sequence
// waitFor 有可能是阻塞的,所以这里需要去尝
// 试唤醒 Consumer
waitStrategy.signalAllWhenBlocking();
}
这里最主要是看到 Publish Cursor Sequence 的时候可以用 StoreStore Barrier。这个 StoreStore Barrier 有两个作用:
- 提高性能。即使 Consumer 没立即看到 Produce 的 Sequence 也没事,Consumer 最多是晚一点点去执行消费,不会有错误。
- 保证 publish 写入的 cursor 被别的线程看到时,publish 之前的写入操作一定能被看到。这个非常关键。回顾之前提的问题,Producer 和 Consumer 都是无任何同步机制保护的去访问 Ring Buffer。之前 Element 入队那个例子分析过,在 Producer 线程修改了 Ring Buffer 上的 Element 完成入队后,没其他机制保证的话 Consumer 不一定能读到 Producer 入队的数据,可能读到的是曾经入队的数据。而这里 Publish 写 cursor 加入的 StoreStore Buffer 就保证了当 Consumer 看到 cursor 被更新到比如 100 时,Producer 线程在 Ring Buffer 上对 100 这个 Sequence 对应的数据所做的所有修改 Consumer 一定能看到。
Multi Producer 发送数据
Multi Producer 支持多个 Producer 一起产生数据。如前所述,Producer 需要先占一个 Sequence 再 Publish 占用的 Sequence。之前 Single Producer 只有一个线程会生产 Event,所以占用的 Sequence 可以用普通成员变量来存,只有 Publish 的 Sequence 因为要让 Consumer 线程看到,所以用了 volatile 变量。而 Multi Producer 因为有多个线程去产生 Event,假如我们还是采用 Single Producer 那种方式,Cursor Sequence 标记的是 Publish 的 Sequence,每个 Producer 线程先去抢 Cursor 之后的 Sequence 抢到以后做 Produce Event 操作,最后再更新 Cursor 即如下图:
Cursor 标记的是已经 Publish 的 Sequence。Thread 1 2 3 是三个 Producer 线程,他们通过某种方式抢占到了 Cursor 之后的连续的三个 Sequence。但每个 Thread 执行 Produce Event 的速度是不一样的,比如 Thread 3 先准备好了 Event 准备做 Publish 操作,而 Thread 1 2 还未做好 Publish Event 的准备,那 Thread 3 无法更新 Cursor 到 Thread 3 指向的 Sequence,不然 Consumer 看到 Cursor Sequence 更新后,就会消费 Thread 1 2 还未准备好的 Event 了。
另外这种方式下还要有个机制去保证每个 Producer 线程能安全抢占 Sequence,可能就再需要一个 Atomic 的变量通过 CAS 操作去做这个抢占。即这种方式比较冗余性能也不好。
另外一个方式是与 Single Producer 不同。Cursor Sequence 不再用于标记 Publish 的 Sequence 而是标记已经被某个 Producer 线程占用的 Sequence 中最大的那个 Sequence。每个 Producer 线程通过 CAS 操作去抢 Cursor Sequence,去分配 Sequence。大概是这样:
假设 Ring Buffer 的大小是 8。Published 表示已经 Publish 过的 Sequence,看到 0、2、4、5、6、7 是 Published 状态,1、3 还有 Producer 线程正在 Produce Event。Sequenc 是个永远递增的 Long 值,Cursor 这个 Sequence 在对 Ring Buffer Size 取模后指向 Ring Buffer 内 index 为 5 的位置,这个位置是一个已经 Published Sequence。
举例说明,假设 Cursor Sequence 值为 21,下次再有 Producer 线程来的时候会先 get 一次 Cursor 当前的 Sequence 值 21,再用 CAS 去抢 22 这个 Sequence,能成功将 Cursor Sequence 从 21 更新到 22 则说明这个 Producer 线程抢到了 22 这个 Sequence。
看到 Cursor 后面 6 7 两个位置也是 Published 状态,因为是上一轮生产的 Event 已经被 Publish 过。例如 Cursor Sequence 现在是 21,那 6 7 两个位置当前对应的 Sequence 就是 14 和 15。
这个方式下因为 Cursor 被拿去表示被 Producer 抢占到的 Sequence,Consumer 看到 Cursor 更新后,并不表示 Cursor 之前的 Sequence 都处在 Published 状态,正如上图看到 1 和 3 这两个位置还并没有 Publish。所以需要 Minmum 指针去记录哪个区间里存在未 Published 的 Sequence。Consumer 最多只能消费到 Minmum 指向的 Sequence。但这个方案下,Minimum 的更新就需要多种同步。一个是 Producer 线程之间,更新 Minimum 指针的线程在更新 Minimum 时需要知道 Minimum 的下一个 Sequence 是否是 Published 状态,而下一个 Sequence 本来是其它 Producer 线程负责的。另一个是 Producer 和 Consumer 之间,更新 Minimum 后需要 Conusmer 也知道,这也有线程之间数据同步。
这里可能会有个疑问,如果是这个方案,看上去 Cursor 并不需要 Consumer 知道,Consumer 只要知道 Minimum 就好了。Consumer 不需要读 Cursor。但问题在于 Consumer 以及 Consumer 使用的 SequenceBarrier 要支持 Single Producer 和 Multi Producer。Single Producer 输出出来的是 Cursor。如果 Multi Producer 输出的不是 Cursor,设计实现上就略不统一,就比较割裂。当然这个问题并不是不可克服,只是 Single 和 Multi Producer 实现越接近实现起来也会越优雅和简洁。
最终来到了 Disruptor 的方案。多引入了一个 Available Buffer 数组,是个长度与 Ring Buffer 相同的 int 数组。下图依然假设 Ring Buffer Size 为 8,Cursor Sequence 是 21。Ring Buffer 内的数字是数组的 Index。但是 Availabe Buffer 内的值是 Round 数,即 Sequence 除以 RIng Buffer Size 得到的值,也即 Ring Buffer 上数据被反复写的次数。下面详细说。
Availabe Buffer 正如其名字,用来表示 Ring Buffer 上对应位置的 Event 是否处于 Published 状态。数组内的 int 值初始为 -1。当一个 Sequence Published 后,用 Sequence 的值除以 Ring Buffer 的 Size,将结果值存入 Available Buffer。拿 Cursor 当前指向的 21 这个 Sequence 为例,21 这个 Sequence 被 Publish 后,在 21 % 8 = 5 这个 Index 对应的 Available Buffer 上的 int 槽存 21 / 8 = 2。即 21 这个 Sequence 做 Published 后,已经是 Avaialbe Buffer 下 index 为 5 的这个槽第 3 次被 Publish 了,所以说 Available Buffer 内存的是 Round。由此我们就能知道图上 Available Buffer 值的含义了,6 7 两个位置是 1,是因为这两个位置 Published 的是上一轮 Event,本轮 Cursor 还没指到他们。已经被本轮 Cursor 指向过的 Index 里,1 3 这两个位置的 Event 对应的 Sequence 是 17 19 还未被 Publish,所以 Available Buffer 内的值是 1。
对于任意一个 Sequence 只要读一下 Available Buffer 就能知道这个 Sequence 是否处于 Publish 状态。例如 Thread 1 拿到的 Sequence 是 17,对应的轮数是 17 / 8 = 2,但 Available Buffer 上的值是 1,所以 17 这个 Sequence 还没有 Publish。Sequence 18 对应的轮数是 18 / 8 = 2,18 在 Available Buffer 上的值是 2,所以 18 这个 Sequence 被 Publish。
有了这个 Available Buffer,每个 Producer 线程更新 Cursor 占好 Sequence 后,更新 Sequence 对应的 Event。执行 Publish 时,就是计算这个被占的 Sequence 在 Available Buffer 上的 Index 并将 Sequence 对应的 Round 数存进去。以上过程 Producer 线程之间没有任何数据需要同步,每个 Producer 线程修改自己 Sequence 在 Ring Buffer 对应的 Event,以及在 Available Buffer 对应的 Round。
在 Consumer 线程这边,Consumer 线程当前消费的 Sequence 一定在 Cursor Sequence 之前。如果 Cursor Sequence 是 21,Consumer Sequence 是 13。(Consumer Sequence 不可能小于 13,不然 Ring Buffer 就被占满了,Producer 不可能生产数据)。那 Consumer 就需要检查 14 ~ 21 这 8 个 Sequence,看他们在 Available Buffer 上的值和 Seqence 对应的轮数是否相等,相等表示已经 Publish 不等就是没 Publish。从而在 14 ~ 21 内找到 Sequence 最小的那个处于 Published 状态的 Sequence。以上图为例就是 17,Consumer 就能安全的消费 14 ~ 17 这 4 个 Sequence 对应的 Event 。
这种方式跟之前比起来,Producer 线程之间没有任何数据同步。Producer 和 Consumer 之间和原来一样,一个是需要同步 Cursor Sequence,再有是同步 Available Buffer 内的值。同步的数据少了,并且比维护 Minimum 指针实现起来要简单优雅多了。以上图为例,假如 Minimum 指向 Thread 1 对应的 Sequence 17。Thread 1 在 Publish 后,它得有办法知道 18 这个 Sequence 已经被 Publish 了。还是得有地方去记录这个事情。实现起来远不如现在 Available Buffer 实现方式来的容易和直观。
有了这些铺垫之后,Multi Producer 的代码就清晰了。还是先看 Producer 实现占用 Sequence 的 next()
方法:
long current;
long next;
do {
// 先获取当前 Cursor,计算下一个 Cursor 的值
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
// 判断队列满了没,cachedGatingSequence > current 可以不看,
// 在 Single Producer 那里说过
// 为什么要有 cachedGatingSequence 在 Single Producer 那也说过
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
// 读 Consumer 当前消费的 Sequence 是多少,队列满了就 Busy Loop
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence) {
LockSupport.parkNanos(1);
continue;
}
// 更新缓存的 Consumer Sequence,用的 StoreStore Barrier
// 因为读的时候并不要求读到最新值,读到老值后会通过
// Util.getMinimumSequence(gatingSequences, current);
// 读取到 Consumer 最新 Sequence
gatingSequenceCache.set(gatingSequence);
// 抢占从 current 到 next 之间的 Sequence,没成功就继续抢,成功了就返回 next
} else if (cursor.compareAndSet(current, next)) {
break;
}
}
while (true);
return next;
处理好 Event 后要执行 Publish:
public void publish(final long sequence) {
// 更新 Sequence 对应的 Available Buffer 内的值
setAvailable(sequence);
// 唤醒 Consumer
waitStrategy.signalAllWhenBlocking();
}
Cursor 因为在 next()
里 Producer 在抢 Sequence 时候就会更新,所以和 Single Producer 不同,Multi Producer 这里 Publish 时候就不更新 Cursor 了。
更新 Available Buffer 方式如下:
private void setAvailable(final long sequence) {
// calculateIndex(sequence) 就是 sequence % ring buffer size
// 得到 sequence 在 Available Buffer 内的 index
// calculateAvailabilityFlag(sequence) 就是 sequence / ring buffer size 计算
// sequence 的 Round 数
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private void setAvailableBufferValue(int index, int flag) {
// 和更新 Ring Buffer 数组方式一样,直接用地址更新
// 某 index 的值,不做任何数组越界检查
long bufferAddress = (index * SCALE) + BASE;
// 用 StoreStore Barrier 来存 Available Buffer 内的值,
// 原因和 Single Producer 那一样
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
这里看到 Available Buffer 写入也是用的 StoreStore Barrier,一方面是提高性能,连续多次 Produce 时并不需要刷 Cache。再有是保证 Consumer 看到 Sequence 在 Available Buffer 上 Available 后,Publish 前对 Ring Buffer 的更改也能被 Consumer 看到。
判断一个 Sequence 是否 Available 如下:
public boolean isAvailable(long sequence) {
int index = calculateIndex(sequence);
int flag = calculateAvailabilityFlag(sequence);
long bufferAddress = (index * SCALE) + BASE;
// 以 volatile 方式读 Sequence 在 Available Buffer 内的值是
// 否等于 Sequence 的 Round 数。就好像 Single Producer 下
// Consumer 用 volatile 方式读 Producer 的 Cursor 一样,
// 为了真的能立即看到 Producer 的写入
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
看到这里不知道是否会有这么个疑问,为什么 Producer 那里为了减少对 Consumer Sequence 的读取,做了一个 gatingSequenceCache,而 Single Producer 对 Cursor 的写入以及 Multi Producer 对 Available Buffer 的写入都也是使用的 StoreStore Barrier,在 Consumer 一侧就没有个类似的 sequence cache 去减少 Consumer 对 Producer Cursor 或 Available Buffer 的访问?
这主要是 Consumer 和 Producer 功能不同导致的。Producer 一侧在 Produce 数据的时候是一定有数据已经产生要放在队列里,所以如果 Producer 通过缓存的 Consumer Sequence 发现队列可能满了,就再以 volatile 方式读取一次 Consumer 实际的 Sequence 值,再做一次队列是否满了的判断就行了,绝大多数时候 Producer 并不关心 Consumer 的 Sequence 是什么。而 Consumer 一侧为了完成批量的消息处理,它每一个循环都希望能及时知道 Producer 最新生产的 Sequence,从而能将 Consumer Sequence 到 Producer 最新 Produce Sequence 之间的数据一口气全部处理掉。所以 Consumer 这边需要及时知道 Producer 的 Sequence。
小结
从上面分析能看到 Disruptor 实现有很多精巧的设计。几乎每个共享变量的写入操作都值得推敲,仔细思考原因。能做这么细致也不奇怪为什么 Disruptor 性能会好了。
再有个人觉得在使用上一个关键的点就是一定不要让 Disruptor 队列满了,不然 Producer 进入 Busy Loop 开销会很大。