LinkedBlockingQueue详解

LinkedBlockingQueue介绍【1】LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大 , 所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存 , 则队列将抛出OOM错误 。所以为了避免队列过大造成机器负载或者内存爆满的情况出现 , 我们在使用的时候建议手动传一个队列的大小 。
【2】LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素 。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行 。
LinkedBlockingQueue使用//指定队列的大小创建有界队列BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);//无界队列BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();LinkedBlockingQueue的源码分析【1】属性值
// 容量,指定容量就是有界队列private final int capacity;// 元素数量,用原子操作类的原因在于有两个线程都会操作需要保证可见性private final AtomicInteger count = new AtomicInteger();// 链表头本身是不存储任何元素的,初始化时item指向nulltransient Node<E> head;// 链表尾private transient Node<E> last;// take锁锁分离 , 提高效率private final ReentrantLock takeLock = new ReentrantLock();// notEmpty条件// 当队列无元素时 , take锁会阻塞在notEmpty条件上 , 等待其它线程唤醒private final Condition notEmpty = takeLock.newCondition();// put锁private final ReentrantLock putLock = new ReentrantLock();// notFull条件// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒private final Condition notFull = putLock.newCondition();//典型的单链表结构static class Node<E> {E item;//存储元素Node<E> next;//后继节点单链表结构Node(E x) { item = x; }}【2】构造函数
public LinkedBlockingQueue() {// 如果没传容量,就使用最大int值初始化其容量this(Integer.MAX_VALUE);}public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;// 初始化head和last指针为空值节点last = head = new Node<E>(null);}public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock(); // 为保证可见性而加的锁try {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();}}【3】核心方法分析
1)入队put方法
public void put(E e) throws InterruptedException {// 不允许null元素if (e == null) throw new NullPointerException();int c = -1;// 新建一个节点Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 使用put锁加锁putLock.lockInterruptibly();try {// 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)while (count.get() == capacity) {notFull.await();}// 队列不满 , 就入队enqueue(node);c = count.getAndIncrement();// 队列长度加1,返回原值// 如果现队列长度小于容量,notFull条件队列转同步队列 , 准备唤醒一个阻塞在notFull条件上的线程(可以继续入队)// 这里为啥要唤醒一下呢?// 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock(); // 真正唤醒生产者线程}// 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程if (c == 0)signalNotEmpty();}private void enqueue(Node<E> node) {// 直接加到last后面,last指向入队元素last = last.next = node;}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();// 加take锁try {notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程} finally {takeLock.unlock();// 真正唤醒消费者线程}}2)出队take方法
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;// 使用takeLock加锁takeLock.lockInterruptibly();try {// 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞)while (count.get() == 0) {notEmpty.await();}// 否则,出队x = dequeue();c = count.getAndDecrement();//长度-1,返回原值if (c > 1)// 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理notEmpty.signal();} finally {takeLock.unlock(); // 真正唤醒消费者线程}// 为什么队列是满的才唤醒阻塞在notFull上的线程呢?// 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,// 这也是锁分离带来的代价// 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程if (c == capacity)signalNotFull();return x;}private E dequeue() {// head节点本身是不存储任何元素的// 这里把head删除 , 并把head下一个节点作为新的值// 并把其值置空,返回原来的值Node<E> h = head;Node<E> first = h.next;h.next = h; // 方便GChead = first;E x = first.item;first.item = null;return x;}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程} finally {putLock.unlock(); // 解锁 , 这才会真正的唤醒生产者线程}}

推荐阅读