Java Queue

Introduction

Interface

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* A collection designed for holding elements prior to processing.
* Besides basic {@link java.util.Collection Collection} operations,
* queues provide additional insertion, extraction, and inspection
* operations. Each of these methods exists in two forms: one throws
* an exception if the operation fails, the other returns a special
* value (either {@code null} or {@code false}, depending on the
* operation). The latter form of the insert operation is designed
* specifically for use with capacity-restricted {@code Queue}
* implementations; in most implementations, insert operations cannot
* fail.
*/
public interface Queue<E> extends Collection<E> {
//插入(抛出异常)
boolean add(E e);
//插入(返回特殊值)
boolean offer(E e);
//移除(抛出异常)
E remove();
//移除(返回特殊值)
E poll();
//检查(抛出异常)
E element();
//检查(返回特殊值)
E peek();
}

Queue是个接口,它提供的add,offer方法初衷是希望子类能够禁止添加元素为null,这样可以避免在查询时返回null究竟是正确还是错误。实际上大多数Queue的实现类的确响应了Queue接口的规定,比如ArrayBlockingQueue,PriorityBlockingQueue等等。

但还是有一些实现类没有这样要求,比如LinkedList。虽然 LinkedList 没有禁止添加 null,但是一般情况下 Queue 的实现类都不允许添加 null 元素,为啥呢?因为poll(),peek()方法在异常的时候会返回 null,你添加了null 以后,当获取时不好分辨究竟是否正确返回。



``` java
public interface Deque<E> extends Queue<E> {
/**
* Inserts the specified element at the front of this deque if it is
* possible to do so immediately without violating capacity restrictions,
* throwing an {@code IllegalStateException} if no space is currently
* available. When using a capacity-restricted deque, it is generally
* preferable to use method {@link #offerFirst}.
*
* @param e the element to add
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this deque
* @throws NullPointerException if the specified element is null and this
* deque does not permit null elements
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this deque
*/
void addFirst(E e);


void addLast(E e);


E removeFirst();

E removeLast();
}

Classification

  • PriorityQueue
  • PriorityBlockingQueue
  • Deque
  • ArrayDeque:ArrayDeque是一个循环队列,它使用数组实现的Deque,底层是数组。
  • BlockingQueue
  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

Summary

  • 当 Deque 当做 Queue 队列使用时(FIFO),添加元素是添加到队尾,删除时删除的是头部元素。
  • 当 Deque 当做 Stack 栈用(LIFO)。这时入栈、出栈元素都是在双端队列的头部进行。
  • ArrayDeque不是线程安全的。 当作为栈使用时,性能比Stack好;当作为队列使用时,性能比LinkedList好。
  • ArrayBlockingQueue 底层是数组,有界队列,如果我们要使用生产者-消费者模式,这是非常好的选择。
  • LinkedBlockingQueue 底层是链表,可以当做无界和有界队列来使用,所以大家不要以为它就是无界队列。
  • SynchronousQueue 本身不带有空间来存储任何元素,使用上可以选择公平模式和非公平模式。
  • PriorityBlockingQueue 是无界队列,基于数组,数据结构为二叉堆,数组第一个也是树的根节点总是最小值。

BlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;
// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

核心原理

ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。

如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后写线程唤醒读线程队列的第一个等待线程。如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。

构造函数可选参数:

  • 队列容量,其限制了队列中最多允许的元素个数;
  • 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
  • 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。

LinkedBlockingQueue

底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 队列容量
private final int capacity;

// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);

// 队头
private transient Node<E> head;

// 队尾
private transient Node<E> last;

// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();

// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();

// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();

// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();

ReentrantLock与Condition使用说明:

  • takeLock 和 notEmpty 搭配:如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。

  • putLock 和 notFull 搭配:如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。


SynchronousQueue

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

虽然说是队列,但是 SynchronousQueue 的队列其实是虚的,其不提供任何空间(一个都没有)来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。

在开发中比较少使用到 SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中得到了应用。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

核心原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 写入值
public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, false, 0) == null) { // 1
Thread.interrupted();
throw new InterruptedException();
}
}
// 读取值并移除
public E take() throws InterruptedException {
Object e = transferer.transfer(null, false, 0); // 2
if (e != null)
return (E)e;
Thread.interrupted();
throw new InterruptedException();
}

写操作 put(E o) 和读操作 take() 都是调用 Transferer.transfer(…) 方法,区别在于第一个参数是否为 null 值。

transfer 的设计思路,其基本算法如下:

  • 当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列即可。

  • 如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据。

示例图说明

  1. 前提条件:公平模式,初始化情况下,TransferQueue的状态
  1. put线程1执行操作put(1),由于当前没有配对的消费线程,所以线程1入队列,自旋一小会后睡眠等待。
  1. put线程2执行操作put(2),由于当前没有配对的消费线程,所以线程2入队列,自旋一小会后睡眠等待。
  1. 这时候taker线程执行操作taker(1),由于tail指向put2线程,put线程2跟take线程1配对了(put一take),这时take1线程不需要入队,但是请注意了,这时候,要唤醒的线程并不是put线程2,而是put线程1。put线程1被唤醒,take线程1的take()方法返回了put线程1的数据,这样就实现了线程间的一对一通信,这时候内部状态如下:

公平策略总结下来就是:队尾匹配队头出队。

SynchronousQueue的实现模型。总结下来就是:队尾匹配队头出队,先进先出,体现公平原则。


PriorityBlockingQueue

带排序的 BlockingQueue 实现,其并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 构造方法中,如果不指定大小的话,默认大小为 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 数组的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 这个就是存放数据的数组
private transient Object[] queue;

// 队列当前大小
private transient int size;

// 大小比较器,如果按照自然序排序,那么此属性可设置为 null
private transient Comparator<? super E> comparator;

// 并发控制所用的锁,所有的 public 且涉及到线程安全的方法,都必须先获取到这个锁
private final ReentrantLock lock;

// 这个很好理解,其实例由上面的 lock 属性创建
private final Condition notEmpty;

// 这个也是用于锁,用于数组扩容的时候,需要先获取到这个锁,才能进行扩容操作
// 其使用 CAS 操作
private transient volatile int allocationSpinLock;

// 用于序列化和反序列化的时候用,对于 PriorityBlockingQueue 我们应该比较少使用到序列化
private PriorityQueue q;

此类实现了 Collection 和 Iterator 接口中的所有接口方法,对其对象进行迭代并遍历时,不能保证有序性。

如果你想要实现有序遍历,建议采用 Arrays.sort(queue.toArray()) 进行处理。PriorityBlockingQueue 提供了 drainTo 方法用于将部分或全部元素有序地填充(准确说是转移,会删除原队列中的元素)到另一个集合中。

还有一个需要说明的是,如果两个对象的优先级相同(compare 方法返回 0),此队列并不保证它们之间的顺序。

PriorityBlockingQueue 使用了基于数组的二叉堆来存放元素,所有的 public 方法采用同一个 lock 进行并发控制。


Reference