前奏
实现来自论文:Nonblocking Concurrent Data Structures with Condition Synchronization
SynchronousQueue
不太像是 Queue,更像是一种同步策略,一种 Producer 把 data 同步的传递给 Consumer 的策略。入队时,不光是 data 入队还带着执行入队操作的线程也入队,等待在队列上,只有 Consumer 来了消费掉一个 data 后,入队该 data 的 Producer 才能出队。同样,出队时如果队列内没有已经在排队的 Producer,则 Consumer 需要在队列上排队等待,等 Producer 来了将 data 交给一个 Consumer 后这个 Consumer 才能出队,其它 Consumer 依然等在队列上。
SynchronousQueue
内部通过 Dual-Stack 和 Dual-Queue 来分别实现 fair 和 unfair 两种模式。默认是 unfair 的用 Dual-Stack 实现,能通过构造参数来变成 fair 的。比较不一样的是,SynchronousQueue
在 fair 模式下在并发很高的时候反而可能会有相对更高的性能,但是 unfair 模式下能减少线程唤醒,且对 Cache 的 Locality 特性更友好。
先大致介绍一下实现,比如 unfair 模式下使用 Dual-Stack 连续入栈 A B C 三个元素:
C --> B --> A --> null
|
head
入栈后假设没有 Consumer,A B C 三个线程都 Park 等待 Consumer。
Consumer 来了后,从 Stack 上出栈一个元素,叫做匹配,匹配成功则唤醒入栈该元素的 Producer 线程。比如来了一个 Consumer 后 Stack 就变成:
B --> A --> null
|
head
把上面例子换成一开始是先来了 A B C 三个 Consumer 排队,再来一个 Producer 将 Consumer C 出栈和上面图是一样的。也就是说链表内排队的元素即可以是 Consumer 也可以是 Producer,这也是 Dual-Stack, Dual-Queue 的由来。即 Stack 或 Queue 有两种工作模式,Producer 等待模式或 Consumer 等待模式,根据 head 或者 tail 指向的 Node 就能判断出来它现在工作在什么模式下。比如 Producer 入栈时候,当前线程就在 Dual-Stack 内根据 head 指向的 Node 判断其是不是 Consumer,如果是则执行匹配操作,即将 Consumer 出队,如果不是则说明 Dual-Stack 内当前等待的都是 Producer,Stack 处于 Producer 等待模式,则当前线程也等待在 Stack 上。
fair 模式下是用 Dual-Queue,还是假设先有三个 Producer 入队 A B C 三个元素:
A --> B --> C --> null
| |
head tail
来了一个 Consumer 后,发现 head 是 Producer 则队列处在 Producer 等待状态,那就执行匹配操作从队首出队一个元素变成:
B --> C --> null
| |
head tail
Dual-Stack 因为是 LIFO 的,所以是 unfair 的,如果一直有线程进来排队,那最早一个排队的线程会饥饿。并且因为只有一个 head 指针,入队出队全部要抢这个指针所以并发特别高的时候竞争要相对更加激烈。而 Dual-Queue 是 FIFO 的,会将等待时间最长的那个线程出队,并且由于有 head 和 tail 两个指针存在,并发高的时候至少能减少 Producer 和 Consumer 的竞争,但牺牲的就是 CPU 的 cache locality 友好度。
实现细节
TransferStack
先说 unfair 模式。SynchronousQueue
内的 Dual-Stack 叫做 TransferStack
内部通过:
E transfer(E e, boolean timed, long nanos)
这一个 transfer
method 就能实现 offer, poll, offer with timeout, poll with timeout, put, take 操作。这也是上面提到 Dual-Stack, Dual-Queue 的原因,它是能工作在两种模式的。
直接看 TransferStack 下的 transfer
实现,它里面是个大 Loop 做三个事情,本来想自己写,但发现 SynchronousQueue 的 comment 写的已经很好很了:
- If apparently empty or already containing nodes of same mode, try to push node on stack and wait for a match, returning it, or null if cancelled.
- If apparently containing node of complementary mode, try to push a fulfilling node on to stack, match with corresponding waiting node, pop both from stack, and return matched item. The matching or unlinking might not actually be necessary because of other threads performing action 3:
- If top of stack already holds another fulfilling node, help it out by doing its match and/or pop operations, and then continue. The code for helping is essentially the same as for fulfilling, except that it doesn't return the item.
SNode s = null; // constructed/reused as needed
// e 是入队的数据,如果有数据则是入队操作,没数据则是出队操作
int mode = (e == null) ? REQUEST: DATA;
for (;;) {
SNode h = head;
// 这里的 if ... else if ... else 就是上面 comment 里说的三个事情,完全对应
// 先是判断队列是否为空,或者 head 的 mode 跟自己一样,说明当前线程得 Park 或返回空
// 需要注意的是下面会看到,在匹配 Node 时会将入栈的 Node 的 mode 标记 fulfilling,而此处
// 判断时用的是 ==,所以如果 head 是 fulfilling 的 Node,那后续无论来 Producer 还是 Consumer
// 都进入不了这个 if,都得先去帮助这个 head fulfilling
if (h == null || h.mode == mode) { // empty or same-mode
// timed 和 nanos 一起判断当前线程是否阻塞,那入队来说就是 put 还是 offer 还是 offer with timeout
// 如果只用 nanos 一个变量比如 nanos <= 表示 offer,nanos > 0 表示 offer with timeout
// 这样就没有办法去表示永久等待的 put 了,所以多了 timed 参数
if (timed && nanos <= 0) { // can't wait
// 不等待本来是可以直接返回空的,但已经读取了 head 就判断一下 head 有没有被 cancel
// 有的话就把 head 出栈。这个操作一开始看的时候很奇怪,因为正常来说 Node 只会被入队自己的那个线程 cancel
// 并且 cancel 后会立即尝试清理这个 Node 将其出栈,就是下面的 awaitFulfill 出来后悔立即执行 clean 那
// 这里为什么不能让清理 canceled Node 事情就交给那个会从 awaitFulfill 出来的线程就好呢?
// 这里一方面是保底,正常来说线程不可能跑着跑着自己突然死掉,但遇到 Error 的时候线程是会突然死掉的比如 stack overflow,oom 之类的
// 另一方面是更重要的,后续看到 clean 的时候会说,一个 Node 被 cancel 后,它为了把自己从链表上移除
// 需要能找到这个 Node 前面的 Node,但这里链表是单向的,只能从 head 完整的过滤一遍链表才能找到某个 Node
// 所以这里就实现成只要是操作 stack 的线程发现 head 被 cancel 了,那就尝试把它移除,因为对于这个线程来说
// 就是顺手做了个事情,并不会增加太多工作,还能帮助保持链表长度,加快节点 Cancel 后被移除的速度
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
// 如果需要等待在栈上,就先创建一个 Node 入栈,一会当前线程就 Park 在这个 Node 里
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 这里面会执行 Park,当返回的时候要么是 Node 被 Cancel 要么是 Node 找到了匹配的 Node
// 如果是被 Cancel 了,那返回的 Node 和当前线程操作的这个 Node 是同一个 Node
// 如果返回的不是当前线程操作的 Node,那就是拿到了当前线程操作 Node 匹配的 Node
// 比如入队操作这里如果能返回,说明 Producer 把 data 交给了 Consumer,这里返回 Consumer 的 Node
// 如果是出队操作在这里返回,说明拿到了 Producer 的 data,这里返回 Producer 的 Node
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
// 当前节点被 Cancel,所以执行 clean
clean(s);
return null;
}
// s 是当前线程绑定的节点,走到这说明 s 找到了与其匹配的 Node m
// 我们知道如果队列是空的,那线程一定 Park。只有队列不空才会执行匹配,把原来在队列 Park 的线程唤醒
// 那这个跟 s 绑定的线程被唤醒时,head 很可能就是跟 s 匹配的 Node m 且 head.next 很可能就指向 s
// 可能是这样 head/m --> s --> x --> y
// 也可能是这样 head/m --> x --> s --> y 且 x 已经被 Cancel
// 也可能是 head --> x --> y 即 m 和 s 都已经出栈
// 所以这里只是尽力判断一下看 head 的 next 是不是 s 是的话尝试将 head 移动指向 s 的 next 即
// 让 head 和 s 都出队,个人认为如果这么写也是正确的:
// if ((h = head) != null && h == m)
// casHead(h, s.next);
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
// 如果 mode 是 REQUEST 表示当前线程是 Consumer,则把与 s 匹配的节点 m 也即 Producer 的 item 返回
// 反之则当前线程是 Producer,就返回 Consumer 的 item 也即 null
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
// 如果 head Node 不空又是当前 mode 的 complementary mode 并且不正在 fulfilling,就先入栈一个新 Node,准备执行 fulfilling
else if (!isFulfilling(h.mode)) { // try to fulfill
// 如果 head 已经 cancel 则重头再来
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
// 入队新节点带着 fulfilling 标志,同一时刻队列最多只有一个 Node 是 fulfilling 状态且一定是 head
// 还有一个不变量是,队列除了 fulfilling Node 之外其它 Node 的 mode 全部相同
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
// 与 s 匹配的 Node 就是 s.next,s.next 可能为空,因为其它线程随时可能 Cancel 与自己绑定的 Node
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
// 已经没有能匹配的节点存在了,则尝试把 s 从栈上移除,这里 casHead 一定成功,因为 head 只能是 s
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
// 如果 m 还存在,则尝试让 m 和 s 匹配,匹配成功则将 head 指向 m 的 next
SNode mn = m.next;
if (m.tryMatch(s)) {
// cas 一定成功,将 head 指向 m 的 next 即将从 s 到 m 的 Node 全部出栈
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
// 匹配失败,说明 m 已经 Cancel 了,则尝试去匹配 m 的 next,因为 m 和 mn 都是同一个 mode
s.casNext(m, mn); // help unlink
}
}
}
// 如果 head 的 mode 与当前线程操作的 mode 不同,本来应该是去 fulfiling head 的但 head 已经处在 fulfilling 状态
// 说明已经有一个线程正在 fulfilling head,所以当前线程就去帮帮它尽快 fulfilling head 也是避免这个线程 fulfilling
// 时候出现什么问题没有执行 fulfilling。过程与上面是一样的,只是操作节点从 s 换成了 head
else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
awaitFulfill
实现:
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins: maxUntimedSpins) : 0);
for (;;) {
// Park 返回可能是因为有线程调用了 Unpark,也可能是被中断,也可能是不明原因的就被唤醒了
// 循环结尾是 Park,所以每次 Park 返回都需要检查线程是否是因为被中断即 Cancel 而唤醒
if (w.isInterrupted())
// 如果线程被中断了即要 Cancel,tryCancel 会将 Node 的 match 原子的更新为自己
s.tryCancel();
// 读取 Node 的 match,只要是有值要么是 s 自己要么是与 s 的匹配 Node m 总之不为空就返回
SNode m = s.match;
if (m != null)
return m;
if (timed) {
// 如果有超时时间但已经过期了就自动 Cancel。如果是无限等待的 put,其 timed 会是 false
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 各种检查都过了,准备尝试等待,但为了避免 Producer 和 Consumer 跟得非常紧,先 Spin 几次
// 重复检查一下。如果 Producer 和 Consumer 跟得特别紧,能提高性能;跟得不紧则也没什么影响
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
// 如果 Node 的 waiter 是空,则设置 waiter 为当前线程,这块很精巧,
// 得配合 tryMatch 来看,主要是看到设置完 waiter 后不是立即 Park,而是再去执行
// 一遍检查,因为和 tryMatch 有个 Race Condition,后续记录
s.waiter = w; // establish waiter so can park next iter
else
// 如果 spin 过了,又有 waiter 就开始真的 Park
if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
上面说这个 awaitFulfill
跟 tryMatch
有竞争,先看看 tryMatch
。tryMatch
是 Node 的一个方法,给一个 Node 传入与其匹配的 Node 是这样:m.tryMatch(s)
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
// 当本 Node 的 match 为空且能成功将 match 从 null 修改为 s 的线程能走到这里
// 只有一个线程能走到这里。接着判断 waiter 是否为空,不空就 Unpark 这个 waiter 并设置 waiter 为 null
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
在 awaitFulfill
里如果设置了 waiter 后立即开始 Park,没有再 Loop 一遍去检查 Node 的 match 状态,那可能出现在执行 s.waiter = w
前,这个 Node 被 Fulfill,即另外一个线程执行了 tryMatch
并发现 Fulfill 节点的 waiter 是空,则完全不去执行 Unpark。那随后与 s 节点绑定的线程 Park 后会永久等待。
那为什么 tryMatch
不能不去判断 waiter 而不管怎么样都去调用 Unpark 呢?
因为 waiter 毕竟不是 Node 创建时就生成的,waiter 是可能为空的,即使 tryMatch
不做判断在 unpark 内也会检查 waiter 是否为空而看是否去执行 Unpark。并且主动判断 waiter 是否为空一方面更清晰,另一方面是需要在执行完 Unpark 后将 Node 与线程解绑,即设置 waiter 为空。不这么解绑也不行,你不知道链表有多长也不知道 Node 什么时候能被从链表移除,只要不移除就一直有个指针指向 waiter 线程是不合理的。尽快清理指针也有利于 GC。
为什么 waiter 不能在 Node 创建时候就设置呢?这样以来就没有这个竞争了,似乎更容易理解,tryMatch
里也可以不管怎么样直接 Unpark waiter。
我理解主要是这种实现并不好,还是得尽力避免 Node 上有遗留的指针乱指,以帮助 GC 减少内存占用。特别是 Dual-Stack 实现下理论上是有线程会永久饥饿的,它会有更自己关联的 Node 的指针,Node 又会指向整条链表,链表上每个 Node 如果 waiter 都不及时清理都指着各自关联的线程总体算下来可能会占用很多空间。摘录一个 comment 是:
While garbage collection takes care of most node reclamation issues that otherwise complicate nonblocking algorithms, care is taken to "forget" references to data, other nodes, and threads that might be held on to long-term by blocked threads
我理解就是如果没有用的指针不及时释放,当这些指针被某些阻塞的线程使用时候,积少成多可能会产生很多垃圾在内存,当垃圾甚至堆积到老代后收集起来会更费劲。
awaitFulfill
返回后,如果是 Node 被 Cancel 了,还需要将 Node 清理掉,在 clean
内实现:
// 现在要从链表上移除 Node s,先尽力清理 Node s 的指针
s.item = null; // forget item
s.waiter = null; // forget thread
// 因为是单向链表,现在要清理 s 这个 Node 但无法知道这个 Node 的前一个 Node 是什么
// 所以需要遍历链表去找到 s 的前一个 Node。但 s 这个 Node 可能会被别的线程清理,也就是
// 说我们遍历一遍链表也不一定能找到 s 不一定能找到 s 的前一个 Node。所以先设置一个 sentinel
// 即 s 的下一个节点。如果我们遍历链表遍历到了 s 的下一个 Node,则说明 s 已经被移除。当然
// 如果 s 和 s 的下一个 Node 都被移除队列那就只能遍历链表了,不再设置 s 的 next 的 next
// 为 sentinel
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// 检查一下 head,如果 head 已经被 Cancel,就移除 head
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// 开始遍历链表了,从 head 开始要么遍历完整个链表,要么找到 sentinel,要么找到 s
while (p != null && p != past) {
SNode n = p.next;
// 发现被 cancel 的节点就尽力将其从链表移除,尽力维护链表让它短一点
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
TransferQueue
内部也是 transfer
method 最重要,跟 Stack 方式有很多相似之处,但也有很值得注意的区别,主要是 Queue 因为有 tail 的存在,其操作会比 Stack 简单。Stack 只有 head 指针,所以入栈、出栈都在一端,导致每次执行 fulfilling 都得先入栈一个新的特殊 Node 将 head 的 next 占住,让 head 指针暂时不要动,不要再入栈别的 Node,等 fulfilling 结束后再继续支持入栈和出栈。而对 Dual-Queue 来说因为有 head、tail 两个指针在,入队出队相互不影响,一个节点在执行 fulfilling 时候是从 head fulfilling,即使又有新的相同 mode 的 Node 来,也操作的是 tail,不会让 head 发生变化。
拿 Dual-Stack 来说,比如现在有三个 Producer A B C 在栈中:
PC --> PB --> PA -->
此时来了个 Consumer,假如不先入栈,直接像下面要说的 Dual-Queue 一样强行去匹配 PC,将 PC Node 内的 item 或者什么东西通过 CAS 操作做改变,能做成功这个 CAS 操作则表示 Consumer 和这个 Producer 匹配上了,则让 Producer 出栈。如果是这个方法,假设 Consumer 是 CX 在匹配 PC 时候不入栈新 Node 并且正在匹配的时候又有 Producer D 来了,变成:
PD --> PC --> PB --> PA -->
|
CX
即使 CX 操作成功,成功和 PC 匹配,将 PC 内的 item 设置为 null,那此时需要清理 PC,但因为 PC 没有指向 PD 的指针,只能遍历整个链表去找 PC,这个就很不合理了。Cancel 找整个链表还好说,一方面是 Cancel 操作相对较少,另一方面整个 Dual-Stack 实现中都尽全力的去减小链表长度。但这里说的在匹配 PC 时又有 PD 入栈在并发高的时候是完全可能的,链表可能会排的很长很难将 PC 出栈。
如果是 Dual-Queue 呢,还是上述场景,一开始是:
PA --> PB --> PC --> null
来了 Consumer X 要匹配 PA,并且在匹配时又来了新的 Producer D,变成:
PA --> PB --> PC --> PD -->
|
CX
不但 FIFO 的语义依然保持,在 CX 将 PA 匹配上后,只用将 head 指针往后挪一格就结束了。所以 fulfilling 的时候完全不用先入队一个 Node 去占位置。所以总的下来 Dual-Queue 的实现要简单的多。
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// 我没看懂什么情况会出现 head 或 tail 没初始化的情况,因为 TransferQueue 的构造函数内会
// 将 head tail 都指向同一个空的 Node
// comment 里有对这个行为的解释:
// The loop starts off with a null check guarding against
// seeing uninitialized head or tail values. This never
// happens in current SynchronousQueue, but could if
// callers held non-volatile/final ref to the
// transferer. The check is here anyway because it places
// null checks at top of loop, which is usually faster
// than having them implicitly interspersed.
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
// 跟 Stack 一样,检查要不要等待,要的话就创建新 Node 并入队
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
// 入队完了开始等待,等待结束说明要么 Cancel 了要么匹配到 Node 了
// 与 Stack 不同的是,这里匹配成功返回的不是匹配的节点,是匹配节点的 item 如果匹配
// 节点的 item 是空,则表示当前是入队操作,匹配节点 item 非空则当前操作是出队
// 如果匹配不成功是当前操作被 Cancel 了,则返回 s 自己
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
// 看到 Queue 实现下 clean 要把 s 和 s 前面的 t 都传进去
clean(t, s);
return null;
}
// next 指向 Node 自己就表示 Node 已经出队
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
// 出队后要做一些清理
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
// 补充一下 Node 的成员,有 isData 用于表示本 Node 是入队还是出队
// Node 还有 item 成员,就是传递的元素,也被用来当做是否匹配成功的标志
}
// 发现 head 是处在 complementary-mode 于是需要将 head 出队
// 这里主要是要看到相对于 Stack 这里并不需要入队一个专用的 fulfilling Node
// 再有新 Node 来,如果跟 head 是互补的,那就一起跟当前线程争抢匹配 head
// 如果新 Node 跟 head 是相同 mode,那就用 tail 指针去执行入队
else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
// Queue 实现下,CAS Item 成功则表示匹配成功,只有一个线程能不进入这个 if
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
// 这里我个人认为是不是尽力把 waiter 设置为 null 更好一点
// 不过 Queue 不像 Stack,不会有线程持续饥饿,晚释放那么一会可能也还行
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
awaitFulfill
和 Stack 的实现基本一样,不写了。看 clean
,clean
是在 Node 被 Cancel,Unpark 后执行。
s.waiter = null; // forget thread
// clean 执行时候,s 是要删除的目标 Node,pred 是 s 的前一个 Node
// Queue 实现需要注意要留一个哑元在队列上,不能像 Stack 那样清理光,
// 所以只有一种情况我们不能将 s 清理,就是 s 刚好是 tail,清理的话队列就完全空了
// 所以当 s 真的是 tail 的时候,不做清理而是对 prev 做标记 cleanMe,标记它
// 说你的下一个 Node 需要被清理,等到它不是 tail 的时候请把它清理掉
// 因为被标记为 cleanMe 的 Node 在队列中只可能有一个,因为只有 tail 不能被清理
// 下一次再要清理节点时候,发现有 cleanMe 标记的 Node 时,要么能将现有的 s
// 清理掉,要么能将 cleanMe 指向的 next Node 清理,总之不能被清理的元素不可能越积越多
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
// 上面提到因为 tail 如果指向 s 则 s 不能被清理,所以这里要尽可能
// 读取到 tail 的最新值,尽力避免 tail 刚好指向 s 的情况,
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
// tail 不是 s 直接做清理
if (s != t) { // If not tail, try to unsplice
QNode sn =
if (sn == s || pred.casNext(s, sn))
return;
}
// cleanMe 是个 volatile 的引用指向上一次未清理的 Node 的 prev Node
QNode dp = cleanMe;
// 如果 cleanMe 非空,并且走到这里说明 s 是 tail,那之前 cleanMe 的 next
// 一定不是 tail 了,就去清理这个 cleanMe 的 next
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn;
// 有一堆条件需要判断,注释都写了原因了
// 需要知道的是,如果 Node 的 next 指向自己,则说明这个 Node 已经被清理
if (d == null || // d is gone or
d == dp || // d is off list or
// 这个稍后说一下
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s is already saved node
}
// 如果之前的 cleanMe 是空的,那现在因为 s 刚好是 tail,所以将 prev
// 标记为 cleanMe,等以后再来清理 s 即可
else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
上面需要注意的是那个清理 cleanMe 时候判断 !d.isCancelled()
。d 是 cleanMe 的下一个 Node,cleanMe 存在说明其下一个 Node 之前一定是调用了 clean,而能调用 clean 一定说明 d 这个 Node 已经被 Cancel 了。不然不可能调用 clean
。那为什么判断 !d.isCancelled()
呢?
主要是因为可能有多个线程一起在调用 clean
,一个线程读取了 cleanMe 后,其它线程可能也读取了同一个 cleanMe 并且已经将 cleanMe 的下一个 Node 清理掉了,那当前线程再读取刚才 cleanMe Node 的 next 拿到的可能就是个正常的 Node 不是 Cancel 的了。
需要关注的实现亮点
- 这东西这么复杂,他是咋么确定自己写的是对的
- 很神奇的队列,比较特别
特点总结
- 根本不算是队列
- 使用在很特殊的场合,Producer 必须和 Consumer 同步起来,传递数据