实现细节
内部使用数组实现的 Ring Buffer 存储 item 的引用。最重要的是 takeIndex, putIndex, count 三个成员:
//** The queued items *//
final Object[] items;
//** items index for next take, poll, peek or remove *//
int takeIndex;
//** items index for next put, offer, or add *//
int putIndex;
//** Number of elements in the queue *//
int count;
为了做到线程安全,上述三个成员访问过程都需要加锁。内部通过 notFull, notEmpty 两个 Condition 来实现状态等待。这里几乎是 Condition 使用的最标准案例,还是值得一看的。因为 Condition 的存在,将使用同一个锁但等在不同状态的线程分开,从而可以精准的去唤醒目标线程。比如这里如果用 Java 原生的 wait(),notify() 的话,就可能导致队列不满时本来只用唤醒 Producer 就行了,但 notify() 一下把 Consumer 唤醒了。
还要注意到这三个变量没有 padding,在好些地方看到说 ArrayBlockingQueue 的一个问题是没有 padding 导致会 False Sharing。但个人认为这个帽子似乎扣错了。
假若队列实现是队首队尾两个锁,或者根本没有锁靠 CAS 来设置队首队尾的 index 或指针,如果没有 padding 存在假设像 ArrayBlockingQueue 里 takeIndex, putIndex, count 这样,那 putIndex、takeIndex、count 三个值可能会在 Producer,Consumer 还有别的访问 count 的线程之间跳跃,相互影响出现 False Sharing 问题。但是对于现在 ArrayBlockingQueue 的实现来说,它这三个成员都是用一个大锁已经保护起来了,锁本身就有每次访问需要重新取主存的问题,所以对于 ArrayBlockingQueue 来说就不用 padding 了。
阻塞入队:
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列满了则等待在 notFull 状态下,需要注意被唤醒后需要再检查 notFull 状态是否真的满足
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
enqueue :
// 从数组上根据 putIndex 找到队列头,塞入新元素
final Object[] items = this.items;
items[putIndex] = x;
// putIndex 到达数组末尾需要 wrap
if (++putIndex == items.length)
putIndex = 0;
count++;
// 有元素后通知 notEmpty 状态
notEmpty.signal();
这里有个可能跟直观想象不一样的地方,在调用 notEmpty.signal() 时候线程必须抱着锁调用,直观想象可能会觉得这里将等待在 notEmpty 的线程唤醒,但这个线程被唤醒后第一步是抢锁,可是这个锁又正在被调用 notEmpty.signal() 的线程占用。如果有这个问题的话得查看 AbstractQueueSynchronizer 的实现,实际是 Condition 的 signal() 只是找到等待在 Condition 上的线程,找到下一个被唤醒的线程后就结束了,得在 unlock 时候才会真的把这个线程唤醒去抢锁。
阻塞出队:
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 等待 notEmpty 状态,被唤醒后需要重新检查 notEmpty 状态
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
dequeue:
// 根据 takeIndex 找到队尾元素
final Object[] items = this.items;
// 出队的元素类型是 E,而内部数组类型是 Object 需要转换
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// 主动将引用置空
items[takeIndex] = null;
// index 需要 wrap
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// 通知 notFull 状态
notFull.signal();
return x;
获取队列长度,主要是关注到需要拿队列锁,会影响到出入队性能:
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
这里补充一个细节。JDK 各种库里经常能看到上面这种写法,先将一个当前 method 不会变化的成员赋值给一个本地变量,后续再操作或者访问这个本地变量,不再直接使用成员变量。比如上面获取队列长度的代码也可以写成:
this.lock.lock();
try {
return count;
} finally {
this.lock.unlock();
}
还能短一行。不这么写主要是因为 ArrayBlockingQueue 里的写法能再访问不变的成员变量时候更快一点。拿反编译的上面代码来看是这样,截取一部分:
Code:
0: aload_0 // 读 this
1: getfield #20 // 读取 this.lock
4: astore_1 // 存给本地变量 1
5: aload_1 // 读取本地变量 1
6: invokevirtual #22 // 调用 lock()
能看到 this.lock 至少要两步,aload_0 再 getfield 一共消耗 4 个字节,而访问本地变量就一个 aload_1 就结束了。aload_1 也更快。
不过这只是从字节码上来说,个人认为也不一定非要这么吹毛求疵,实际运行起来也不见得 JDK 的写法能快多少。按说 JVM 配合 JIT 有各种优化在,对于长期运行的程序靠这种写法不该会有太多改进。当然这块看看也好。这种写法主要出现在:在一段 method 内某成员变量被访问多次且不会发生变化的时候。
需要关注的实现亮点
- 使用 Condition 而不是 java 原生的 wait(), signal() 来实现等待一个状态,漂亮很多;
- ArrayBlockingQueue 的 Iterator 实现的很复杂,有机会可以看看
特点总结
- 内部只有一个锁,入队、出队、获取队列长度都需要争抢同一个锁,并发时性能不高;