28-阻塞队列LinkedBlockingQueue
April 3, 2025About 4 min
| 版本 | 内容 | 时间 | 
|---|---|---|
| V1 | 新建 | 2022年12月04日16:19:38 | 
LinkedBlockingQueue概述
LinkedBlockingQueue 是基于链表的无界阻塞队列,FIFO。
LinkedBlockingQueue 和 ArrayBlockingQueue 一样都是基于 ReentrantLock 做线程同步的,它们的区别是:
- ArrayBlockingQueue 内部只有一个锁对象,这个锁对象同时控制队列的入队和出队;
- LinkedBlockingQueue 有两个锁对象,分别控制着入队和出队;
节点对象
LinkedBlockingQueue 是基于链表实现的,Node 类就一个元素对象和指针。
// 节点对象
static class Node<E> {
    E item;
    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;
    Node(E x) { item = x; }
}LinkedBlockingQueue 成员属性
// 队列的容量
private final int capacity;
/** Current number of elements */
// 元素个数
private final AtomicInteger count = new AtomicInteger();
/**
 * Head of linked list.
 * Invariant: head.item == null
 */
// 头节点指针 head.item == null
transient Node<E> head;
/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
// 尾结点指针 last.next == null
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
// 队列空时,出队线程在该条件队列等待
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
// 队列满时,入队线程在该条件队列等待
private final Condition notFull = putLock.newCondition();| 属性 | 说明 | 
|---|---|
| int capacity | 队列的容量,默认是 Integer.MAX_VALUE | 
| AtomicInteger count | 队列内元素的个数,使用原子类来存储,因为LinkedBlockingQueue 里面入队何处对是不同的锁对象控制。 | 
| Node<E> head | 指向队列的头指针,不存储数据, head.item == null | 
| Node<E> last | 指向队列尾结点的指针, last.next == null | 
| ReentrantLock takeLock | 出队的锁对象 | 
| Condition notEmpty  | 队列空时,出队线程在该条件队列等待 | 
| ReentrantLock putLock | 入队的锁对象 | 
| Condition notFull | 队列满时,入队线程在该条件队列等待 | 
LinkedBlockingQueue 构造函数
可以看到,LinkedBlockingQueue 默认的队列的容量是 Integer.MAX_VALUE。
初始情况下,新建一个 item 为 null 的 Node 节点,head 节点和 last 节点都指向这个节点。
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 创建头尾节点的指针
    last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}核心方法
阻塞入队 put
流程:
- 创建 Node 对象封装元素;
- 尝试获取锁对象;
- 当队列满了的时候,当前线程就在 notFull 挂起等待。否则调用 LinkedBlockingQueue#enqueue 方法入队;
- 增加计数,假如增加计数后队列还未满,则需要唤醒可能在 notFull 上阻塞的线程(入队线程);
- 最后假如 c == 0,说明插入该节点前队列是空的,需要尝试唤醒一个在 notEmpty 上阻塞的线程(出队线程);
// 入队尾
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获取锁,响应中断
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // 等待消费线程唤醒
            notFull.await();
        }
        enqueue(node);
        // 增加计数
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}阻塞出队 take
流程:
- 尝试获取出队的锁对象;
- 假如队列是空的,那么就需要当前线程在 notEmpty 上面等待;
- 假如队列不是空的,那么就调用 LinkedBlockingQueue#dequeue 方法进行出队操作;
- 减少计数;
- 假如 c > 1,说明移除元素后队列中还有元素,需要唤醒因为没有元素而等待的线程;
- 最后,假如c == capacity,说明出队之前队列是满的,此次将队列由满变为不满了,需要尝试唤醒阻塞的入队线程;
/**
 * 获取队首元素,无限等待
 */
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 获取锁,响应中断
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            // 进入这里说明队列没有元素,需要等待元素入队,无限等待
            notEmpty.await();
        }
        // 出队
        x = dequeue();
        // 计数减少
        c = count.getAndDecrement();
        // c > 1 说明当前移除元素不是队列中的最后一个元素,需要唤醒因为没有元素而等待的线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // c == capacity 说明出队之前队列是满的,此次将队列由满变为不满了,
    if (c == capacity)
        signalNotFull();
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}小结
- LinkedBlockingQueue 是基于链表的阻塞队列;
- LinkedBlockingQueue 类似无界队列,默认容量是Integer.MAX_VALUE;
- LinkedBlockingQueue 使用两个独立的 ReentrantLock 对象,分别保证出队和入队操作线程安全,这样就可以同时入队和出队了;
- LinkedBlockingQueue 的 ReentrantLock 都是默认的非公平策略,不可指定;
Contributors
Dylan Kwok