前奏

ConcurrentLinkedQueue 是无锁队列的实现,可以先看看 blog 大致了解简单版本无锁队列的实现方法:

无锁队列的实现 – 酷 壳 – CoolShell

需要注意的是,这个 blog 里的无锁队列没有展示如何对出队 Node 做处理,即出队后 Node 的 next 指针依然指向队列内下一个元素。不过看看这个实现对理解后续内容有帮助。

再就是看看 ConcurrentListQueue 的论文:

Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 基本只需看完 Figure 1 的伪码实现即可。看论文需要注意的是它在进行 CAS 操作时候有特殊处理,即在 CAS 时需要在被操作变量地址之外增加计数,以避免 ABA 问题。

ABA 问题在上面 CoolShell 的 blog 里也有介绍,CAS 在更新某个指针的时候,compare 的是一个地址,可能出现 compare 成功但是该地址指向的对象其实已经变化。比如 a 开始指向对象 x,再 compare_and_swap(a, x, y) 当 a 指向 x 时将 a 替换为指向 y。但因为 compare 的是内存地址,假设 x 地址被回收再利用现在变成对象 z 在使用,导致 compare_and_swap(a, x, y) 依然执行成功但语义是错的。上面论文里解决办法就是在进行 CAS 操作时候增加计数器,通过比较计数器来保证及时对象地址被复用,因为计数器是变化了所以也不会错误的执行成功。

但上述产生 ABA 问题的原因对 Java 来说不存在。因为有 GC 的存在,我们不用自己管理内存,拿上述例子来说 a 开始指向 x,如果 x 能被回收再利用,那 a 也该被 GC 而无法使用。简单说就是一个内存地址能被复用,一定是内存里没有任何引用能指向该地址之前存放的对象。所以 Java 里实现上述论文算法时候不用考虑在 CAS 时候还加上计数器,这也是 ConcurrentListQueue 里有如下描述的原因:

/there is no possibility of ABA problems due to recycled nodes, so there is no need to use "counted pointers" or related techniques seen in versions used in non-GC'ed settings./

那既然 Java 不会有 ABA 问题,为什么会有 AtomicStampedReference 的存在?

主要是在 ConcurrentLinknedQueue 语境下,Java “不会有 ABA 问题” 仅指对象地址被重用导致的 ABA 问题,也就是 ConcurrentListQueue 论文里说的 ABA 问题。即对象 A 在地址 123,但 A 对象释放后 123 地址被另一个对象 B 重用。则 CAS 时候如果仅仅比较地址是否变化是无法发现对象从 A 变成了 B 的。这种问题在 Java 内不会存在,因为我们无法显式释放内存,内存也不会被随便重用。

但是 Java 有另一个 ABA 问题,就是两个线程都要 (并不是非要同时) 修改一个引用,比如 X 线程要将引用从地址 123 改到 456,如果只是将 A 从 123 地址改到 456,就直接使用 AtomicReference 的 compare_and_swap 即可,这个操作一定是原子的,从比较到实际修改之间不可能有别的线程插进来比如先把 123 改到 890 再改回 123。但如果 X 线程不只想让 A 从 123 改到 456,还希望保证在执行 compare_and_swap(A, 123, 456) 之前,A 一直指向 123 地址从未改变过,AtomicReference 就无能为力了。因为它的 CAS 操作仅比较 CAS 操作开始那个时刻 A 当前是否还指向 123 地址,而 CAS 开始之前是否曾经从 123 指向过别的地方再指回来就说不清了。这也是一种 ABA 问题,解决办法是 AtomicStampedReference,即增加 stamp,CAS 时候不光比较地址还比较 stamp。X 线程开始知道 A 地址为 123 且 stamp 为 100,且每次修改 A stamp 一定会增加,那 CAS 执行时不光比较地址 123 还比较 stamp,都相同时表示 X 从知道 A 指向 123 且 stamp 为 100 开始到 CAS 执行时刻,A 都未指向过其它地址。

对于无锁队列实现来说,比如 ConcurrentLinkedQueue 里有很多 casTail(A, B) 即 tail 是否还指向 A 是的话改 tail 指向 B。队列内 Node 都是有方向的,并且每次入队新元素就会重新分配内存生成新 Node,所以 tail 不可能开始指向 A 后来又改到 C 又改回 A,如果出现了意味着 A 入队后 (从 tail 入队) 又入队了 C,结果 A (不是值,是 Node) 又入队了,而这是不可能的。所以 ConcurrentLinkedQueue 不会有需要 AtomicStampedReference 解决的 ABA 问题,也不会有地址重用导致的 ABA 问题。

有个参考可以看:java - CAS and ABA issus - Stack Overflow

实现细节

成员和初始化

队列内部成员,首先是 head:

private transient volatile Node<E> head;

有几个关键的不变量是:

  1. 所有队列内元素一定能从 head 顺着链表访问到
  2. head 一定不是 null
  3. (tmp = head).next != tmp || tmp != head 一定成立。

第三个需要说明一下。首先是出队的元素都会从 head 出队,出队后会修改 Node 的 next 指针指向 Node 自己。因为 head 指针会随时发生变化,可能刚拿上 head 时候还真的是队列的 head,但下一时刻 head 就被出队了,当前线程拿到的 head 就不再是队列当前 head 而是一个已经出队的 Node。所以第三个条件的含义是,要么 head 还未出队,即 next 指针不指向自己,要么是已经出队那再次读 head 指针一定已发生变化。

不能保证的变量:

  1. head.item 可能是 null
  2. tail 指向的 Node 可能不能从 head 顺着链表被访问到。

这两个的原因是,ConcurrentLinkedQueue 中,head 和 tail 不是立即被更新的。比如入队是每入队两次,更新一次 tail 指针。head 是每出队两次,更新一次 head 指针。入队时候,只要元素入队成功即可,不一定去更新 tail,tail 还指向最新 Node 的前一个 Node,这也不影响程序正确性,并且还能减少 tail 这个 volatile 指针的更新次数。对 head 也是这样,只要出队成功就会设置 head.item 为 null,保证同一个元素不可能出队两次即可,但 head 指针可以不每次出队都更新,而 ConcurrentLinkedQueue 是每出队两次更新一次 head。

也就是说,head,tail 是为了出队入队效率高才被更新到实际队列首尾的,即使这俩不指向队列实际首尾对正确性也没有影响。

如果 tail 指向的 Node 已经被出队,则从 head 顺着 next 指针无法访问到 tail 指向的 Node。但这个对正确性无影响。

接着是 tail:

private transient volatile Node<E> tail;

不变量是:

  1. 队列实际的 tail 一定可以通过 tail 指针顺着 next 一路访问到;
  2. tail != null

不保证的变量:

  1. tail.item 可能是 null;
  2. tail 指向的 Node 可能不能从 head 顺着链表被访问到;
  3. tail.next 可能会指向 tail 自己;

有了前面的描述,这里不变量和可变量应该都比较好理解。即使入队时候会检查入队 item 是否为 null,不许 null 作为 item 被入队,但因为 tail 指向的 Node 可能被出队,所以 tail.item 可能是 null,tail.next 可能也会指向自己因为被出队,或者 tail 无法被 head 顺着 next 指针访问到。

构造函数里,跟 LinkedBlockingQueue 一样也是将 head 和 tail 初始化为指向哑元。

head = tail = new Node<E>(null);

需要注意到,ConcurrentLinkedQueue 是无限长队列,也没有 count 专门记录队列长度,每次求取队列长度 size 时,都会从头到尾遍历整个队列去数元素数量,并且队列因为随时会变得到的 size 结果也不反应队列实际精确值。

入队

final Node<E> newNode = new Node<E>(e);
// t 指向老的 tail,p 指向最新的 tail,q 指向 tail 的 next
for (Node<E> t = tail, p = t;;) {
    Node<E> q = p.next;
    if (q == null) {
        // 如果 q 是空则 p 指向的是实际队尾
        if (p.casNext(null, newNode)) {
        // CAS 的更新队尾的 next 指针,指向新入队 Node
            // 如果成功,则说明入队成功,因为 tail 是每入队两次才更新
        // 所以只有当前 tail 和 old tail 不等时候才会更新 tail
            if (p != t) // hop two nodes at a time
            // 即使更新失败也没事,说明有别的线程已经更新了 tail
                casTail(t, newNode);  // Failure is OK.
            return true;
        }
        // Lost CAS race to another thread; re-read next
    }
    else if (p == q)
    // tail 的 next 指向自己,说明 tail 已经被出队,接着读一下最新 tail
    // 看最新的 tail 和循环开始缓存在 t 的 tail 是否相等
    // 相等则说明新读的这个 tail 指针也被出队了,我们只有从 head 一步一步循环去找 tail
        // 如果不等则说明 tail 已经被其它线程更新了,我们将新的 tail 再缓存在 t 再重来一次
        p = (t != (t = tail)) ? t : head;
    else
    // 最后这个 else 下面单独说
        p = (p != t && t != (t = tail)) ? t : q;
}

else if (p == q) 简写扩展后是:

tOld = t;
t = tail;
if tOld != t:
    p = t;
else:
    p = head;

对于最后一个 else 需要单独说明。p = (p != t && t != (t = tail)) ? t : q; 扩展后是:

tOld = t;
t = tail;
if p != tOld && tOld != t:
    p = t;
else:
    p = q;

因为每入队两次才更新 tail,走到最后一个 else 时,说明循环开始时 tail 要么指向实际队尾的前一个 Node,要么指向再往前的 Node,总之没有指向实际队尾,并且 tail 还未被出队。因为是每入队两个 Node 才会更新一次 tail 指针,所以这种情况完全正常,如果 p 和 t 相等,说明 p 在循环开始后还未向后挪动过,那我们就让 p 向后挪一格,再重试。如果 p 和 t 不相等,则说明 p 已经向后移动过一次,理想情况下已经能入队了,现在依然不能入队说明有别的线程在争抢入队,并且很可能已经更新过 tail (还是因为每两次入队就会更新 tail,我们这个线程在循环开始时 tail 就没有指向实际队尾,现在往后走了一格还不能入队,则说明 tail 很可能已经被更新)。那就先读取一下最新的 tail,比较好理解的是 tail 如果发生了变化,则比较符合期待,因为别的线程更新了 tail,那当前线程就更新 t,更新 p 再重试。那如果读取完 tail 后发现 tail 没有变化呢?这说明什么呢?

说明出现了下面这种情况,队列至少有 A B C 三个 Node,tail 指向至少倒数第三个 Node,

   A ---> B ---> C ---> null
   |      |      |
tail/t    p      q

正常来说是不会有这种情况的,因为毕竟每两次入队都会更新 tail 指针,但假若一个线程入队成功后因为不知道什么原因挂掉了导致更新 tail 失败,那这里的兜底操作就有用了,让 p 向后走一格后再重试。如果没有这个兜底策略会把 p 再次设置为 tail/t 导致无穷循环。

casNextcasTail用的 UNSAFE 的 compare and swap object 好像没有特别需要说的。

出队

restartFromHead:
for (;;) {
    // 因为 head 可能未被更新,可能被出队,所以从 head 开始用循环方式开始出队 Node
    // h 指向老的 head,p 指向当前认为的 head,q 指向 p 的 next
    for (Node<E> h = head, p = h, q;;) {
        E item = p.item;
        // 如果 item 非 null 说明 head 指向的 Node 还未出队
    // CAS 的将 item 设置为 null,哪个线程成功执行 CAS 就是哪个线程成功执行出队
        if (item != null && p.casItem(item, null)) {
        // 出队成功后判断 p 是否向后移动过,没移动则不更新 head 
        // 因为每出队两个 Node 才更新一次 head
            if (p != h) // hop two nodes at a time
            // 注意看到与 LinkedBlokcingQueue 不同 这里出队的可能是 head
        // 指向的节点而不一定是 head 的 next
        // 更新 head 时候需要判断队列是否为空,不空时能将 head 直接更新到 p.next
                updateHead(h, ((q = p.next) != null) ? q : p);
            return item;
        }
    // 队列为空,则尝试更新 head 为 p 后直接返回 null
        else if ((q = p.next) == null) {
            updateHead(h, p);
            return null;
        }
        else if (p == q)
        // p 的 next 指向自己说明 head 被出队需要再读 head 从头开始
            continue restartFromHead;
        else
        // 走到这里说明出队 head 竞争失败,并且队列非空且当前 p 指向的节点还在队列上
        // 所以尝试沿着 next 往后走一格继续尝试出队
            p = q;
    }
}

其中 updateHead 如下:

// h 是当前认为 head 指向的对象,p 是期望更新 head 后的对象。
if (h != p && casHead(h, p))
    // 将 head 从指向 h 改为指向 p 后,也即 h 节点出队完成
    // 将出队的 Node 的 next 指向自己
    h.lazySetNext(h);

看到 if 条件一开始就判断 h != p,主要是避免队列是空的。当队列为空时候,h == p == head 并且 head.itme == null,可以走到 updateHead 并且 h 和 p 相等,此时不用更新 head。

lazySetNext如下:

UNSAFE.putOrderedObject(this, nextOffset, val);

这东西看了好久。牵扯一大堆东西。等有机会记录一下 Memory Barrier 相关东西。

先看一下 Node 是这样的:

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
    ...
}

当更新 next 指针的时候,有 casNext,更新 next 后会被其它线程立即可见,写入操作后会带着 write fence 或者叫 store-load barrier,以刷新 CPU 缓存的 Store Buffer。而这个 lazySetNext 是用的 store-store barrier,其只是保证如果写入 next 之前还写了别的东西,则其它线程如果能看到 next 的写入那一定能看到 next 之前的写入。事实上 X86 架构下 CPU 本身就能满足 store-store barrier,所以这个 barrier 就是写本地 CPU cache,没有附带的例如刷写 Store Buffer 的操作,所以性能会好。但带来的问题就是写入操作不会立即被其它线程看见。对于更新 head 的 next 指向自己操作来说更新的晚一点并没关系。

内部 Node 初始化时候还有个黑科技,ConcurrentLinkedQueue 对其描述是这样:

When constructing a Node (before enqueuing it) we avoid paying
for a volatile write to item by using Unsafe.putObject instead
of a normal write.  This allows the cost of enqueue to be
"one-and-a-half" CASes.

Node 的构造函数如下:

Node(E item) {
    UNSAFE.putObject(this, itemOffset, item);
}

而 Node 里的 item 是 volatile 的,本来 volatile 语义下如果执行 this.item = item 会导致写入后有个 write fence,会刷写 CPU cache 内的 Store Buffer 到主存。但 Node 每次构造出来后,会立即执行入队操作,而 Node 只有执行入队操作成功后才是有效的,才有被别的线程看到的价值。而入队成功的标志是 p.casNext(null, newNode) 这个操作成功,而 Node 内的 next 也是 volatile 的也有 volatile 语义,所以 Node 入队成功会有两个 write fence 就多了一个消耗。所以 ConcurrentLinkedQueue 里把 this.item = item 改成了上面用 putObject 的方式。putObject 就是普通的写入,没有 volatile 语义,不会在写入后增加 write fence,直到入队成功后,更新 tail 的 next 指针成功后,才会增加 write fence 让其它线程看到这个新 Node。

感觉这个地方不好描述。总之要注意到的就是,p.casNext 虽然操作的是 p 指向的节点,但因为 volatile 语义是刷新当前执行该线程 CPU 的 Store Buffer,所以当 p.casNext 成功时候,对 newNode 内 item 的写入也会被刷写到主存。

比如现在有 a b c d 四个 volatile 变量,如果无脑执行:

a = 1;
b = 2;
c = 3;
d = 4;

会在每个语句中间加上 write fence 刷四次 cache。而如果是 a b c 都是用 putObject 更新,d 用正常的 volatile 更新,则 d = 4 成功后,a b c 都会在 d 之前被刷写至主存。

删除元素

目标删除的 Object 是 o。

if (o == null) {
    return;
}
// 从头到尾遍历一遍找目标 o
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next) {
    boolean removed = false;
    E item = p.item;
    if (item != null) {
        // p 指向的 Node 有 item 且不等于 o 则查看下一个 Node
        if (!o.equals(item)) {
            next = succ(p);
            continue;
        }
        // 找到 o 则清理 p 指向的 Node 
        removed = p.casItem(item, null);
    }

    // 走到这里说明至少找到一个 item 为空的 Node,不管跟我们要找的 o
    // 有没关系,先尝试将其从链表移除
    next = succ(p);
    if (pred != null && next != null) // unlink
        pred.casNext(p, next);
    // 如果已经找到了 o 则返回
    if (removed)
        return true;
}

总体实现还是很直观的。

需要关注的实现亮点

  1. 无锁队列实现算法得关注一下
  2. unsafe 下,putObject,putObjectVolatile,putOrderedObject 有什么区别

特点总结

  1. 无界,有 OOM 风险;
  2. 每次入队都要创建 Node;
  3. 无锁,并发性能很高;
  4. 没有阻塞接口,不可能阻塞的等待元素到来;
  5. 无法获取队列长度精确值;

参考

ConcurrentLinkedQueue的几个细节问题 | throwsnew