实现细节

内部是链表,有这些基本成员:

//** The capacity bound, or Integer.MAX_VALUE if none *//
private final int capacity;

//** Current number of elements *//
private final AtomicInteger count = new AtomicInteger();

//**/
/ * Head of linked list./
/ * Invariant: head.item == null/
/ *//
transient Node<E> head;

//**/
/ * Tail of linked list./
/ * Invariant: last.next == null/
/ *//
private transient Node<E> last;
  1. capacity 用于实现 Bounded Queue,是 final 的变量所以天生线程安全。
  2. count 是 Atomic 的,从而避免在获取队列长度时候跟 Producer、Consumer 抢锁。
  3. 链表首尾有两个指针,分别被两个锁保护,从而实现了锁的分离,减少了锁争抢。

与 ArrayBlockingQueue 相似,也是有 notEmpty, notFull 两个 Condition 用于等待这两个状态。

//** Lock held by take, poll, etc *//
private final ReentrantLock takeLock = new ReentrantLock();

//** Wait queue for waiting takes *//
private final Condition notEmpty = takeLock.newCondition();

//** Lock held by put, offer, etc *//
private final ReentrantLock putLock = new ReentrantLock();

//** Wait queue for waiting puts *//
private final Condition notFull = putLock.newCondition();

初始化:

last = head = new Node<E>(null);

主要是看到 last 和 head 一开始都指向一个哑元。

阻塞入队:

int c = -1;
// 每次入队新元素都要 new 新的 Node
// 如果是非阻塞入队 offer 会在判断确认队列有空位之后 new 新 Node
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
    // 循环等待 notFull 状态
    while (count.get() == capacity) {
        notFull.await();
    }
    enqueue(node);
    // count 加一并且在确认加一后依然不满时通知 notFull 状态
    // 因为可能有多个 Producer 处在 notFull.await() 状态而 signalNotFull()
    // 一次只会唤醒一个线程,所以当前被唤醒的 Producer 在执行完入队且发现队列
    // 依然有空位时需要唤醒下一个 Producer 避免其它 Producer 永久阻塞
    c = count.getAndIncrement();
    if (c + 1 < capacity)
        // 带着 putLock 锁去做这个通知
        notFull.signal();
} finally {
    putLock.unlock();
}
// 如果入队元素前队列为空则这里通知 notEmpty 状态
// signalNotEmpty() 内会抢 takeLock,所以不在上面的 try finally 里做
if (c == 0)
    signalNotEmpty();

提一个看到这里需要想到的点,队列有两个锁以提高并发性能,但是有多个锁的时候会引入一个很常见的问题就是死锁。上面 signalNotEmpty() 调用是在释放了 putLock 之后,原因不光是性能(释放 putLock 后其他 Producer 才能工作),还为了避免死锁,不然 Producer, Consumer 加锁顺序不同一定会产生死锁。

另外可以看看 LinkedBlockQueue 内的 fullyLock 实现,在需要同时获取 putLock, takeLock 时专门将加锁,释放锁两个 method 提出来,从而保证真的要加两个锁时候一定要按顺序加锁,按顺序释放锁。

上面代码看着挺简单,有个地方得单独说一下:

while (count.get() == capacity) {
    notFull.await();
}

一般在检查 Condition 时候,需要确保被检查的条件在拿到锁之后不会变化。比如一般范式:

lock();
try {
  while (check == false) {
    condition.await();
  }
  // do stuff
} finally {
  unlock();
}

这里 while 检查的含义是说要保证在执行 do stuff 时候 check 必须是 true,如果不是则需要 await()。当 check 变成 true 后 condition 会收到通知线程被唤醒,唤醒时 check 可能是 true 的但线程被唤醒并加上锁后,可能 check 又变成 false 了,所以需要用 while 去做检查。这里有个关键点是,check 这个变量必须被锁保护起来,不能再 condition.await() 返回加上锁后再变化。不然如果 check 没有被这个锁保护,那 check 检查完发现是 true,跳出 while 可刚跳出 while 后 check 变化了导致跳出 while 后 check 是 false 就不行了。

回到 LinkedBlockingQueue 的代码里看到它的检查条件是 count.get() == capacity 但 count 是个 Atomic,随时可能发生变化,并没有被锁保护起来。但这里因为只需要保证队列不满即可,即使有消费者不断在消费导致 count 递减 count.get() == capacity 的条件也不会成立。因为有锁的存在,await() 返回后一定加上了 putLock,所以 count 只可能减少不会增加,所以 count.get() == capacity 的条件在跳出 while 后依然不会成立。

enqueue:

// 将新元素添加到 last 指针,并更新 last
last = last.next = node;

阻塞出队:

E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
    // 跟入队一样 count 没有被锁保护但 await() 出来后 count
    // 只能增加不会减少,所以 count.get() == 0 的条件不会再成立
    while (count.get() == 0) {
        notEmpty.await();
    }
    x = dequeue();
    c = count.getAndDecrement();
    if (c > 1)
        // 队列依然不空需要唤醒下一个 Consumer 线程
        notEmpty.signal();
} finally {
    takeLock.unlock();
}
// 队列不满后通知 notFull 状态唤醒 Producer 这里会抢 putLock
if (c == capacity)
    signalNotFull();

dequeue:

// head 指向哑元实际是指向上一个出队的 Node,本次出队需要将 head.next 出队
Node<E> h = head;
Node<E> first = h.next;
// 将 head 的 next 指向 head 废弃 head 指向的哑元,让 GC 将其销毁
h.next = h; // help GC
// head 指向当前出队的 Node,将其当做新的哑元
head = first;
// 将 head 指向的 item 返回,并将 head 指向的哑元的 item 设置为 null
// 从而队列不再跟已经出队的元素相关联
E x = first.item;
first.item = null;
return x

需要关注的实现亮点

  1. 链表实现值得参考看看
  2. 各种 condition 的通知值得看看

特点总结

  1. 队列最关键的入队出队指针被两个锁分别保护起来,并且 count 是 Atomic 的所以最优情况下并发性能相对 ArrayBlockingQueue 更好,最大吞吐量极限更高;
  2. 更适用于可能一般不长但能堆积很长的队列,因为不用预先按队列最大长度占用内存;
  3. 每次入队都要创建单独的 Node,会有额外 GC 开销;

在 LinkedBlockingQueue 的注释里有这么一段话:

Linked queues typically have higher throughput than array-based queues but
less predictable performance in most concurrent applications.

吞吐量更大还好说,为什么说是 less predictable performance in most concurrent applications 呢?

搜了一大圈也没有找到作者对这句话的官方解释。个人理解是这样:

  1. LinkedBlockingQueue 并发性能比 ArrayBlockingQueue 更优有个前提条件,就是 Producer, Consumer 不能相互影响。但不相互影响的前提从前面代码看,就是比如 Producer 入队时队列不能是空的,如果是空的则会触发 signalNotEmpty() 的执行,会去抢 takeLock 给 Consumer 发通知,导致跟 Consumer 争抢锁。同理 Consumer 是队列不能满,否则也会去和 Producer 抢锁。但实际大部分使用场景,队列大部分时间要么处于空的要么处于满的,很难处在一个中间状态,Producer 和 Consumer 刚好生产消费速度匹配。所以 LinkedBlockingQueue 即使有两个锁实际使用时候也会有 Producer Consumer 相互抢锁的情况,导致最优情况下吞吐量虽然高,但正常使用不一定高或者 less predicatable;
  2. LinkedBlockingQueue 因为需要创建 Node,总是相对来说多一点开销,如果队列较长可能会更容易让 Node 进入内存 Old Region 导致 GC 负担增加。特别是 LinkedBlockingQueue 本身是无界的,队列可能会非常长导致 GC 问题更大;