Introduction
Interface
1 | /** |
Classification
- PriorityQueue
- PriorityBlockingQueue
- Deque
- ArrayDeque:ArrayDeque是一个循环队列,它使用数组实现的Deque,底层是数组。
- BlockingQueue
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
Summary
- 当 Deque 当做 Queue 队列使用时(FIFO),添加元素是添加到队尾,删除时删除的是头部元素。
- 当 Deque 当做 Stack 栈用(LIFO)。这时入栈、出栈元素都是在双端队列的头部进行。
- ArrayDeque不是线程安全的。 当作为栈使用时,性能比Stack好;当作为队列使用时,性能比LinkedList好。
- ArrayBlockingQueue 底层是数组,有界队列,如果我们要使用生产者-消费者模式,这是非常好的选择。
- LinkedBlockingQueue 底层是链表,可以当做无界和有界队列来使用,所以大家不要以为它就是无界队列。
- SynchronousQueue 本身不带有空间来存储任何元素,使用上可以选择公平模式和非公平模式。
- PriorityBlockingQueue 是无界队列,基于数组,数据结构为二叉堆,数组第一个也是树的根节点总是最小值。
BlockingQueue
ArrayBlockingQueue
ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。
核心属性
1 | // 用于存放元素的数组 |
核心原理
ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。
如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后写线程唤醒读线程队列的第一个等待线程。如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。
构造函数可选参数:
- 队列容量,其限制了队列中最多允许的元素个数;
- 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
- 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。
LinkedBlockingQueue
底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
核心属性
1 | // 队列容量 |
ReentrantLock与Condition使用说明:
takeLock 和 notEmpty 搭配:如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
putLock 和 notFull 搭配:如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。
SynchronousQueue
不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。
虽然说是队列,但是 SynchronousQueue 的队列其实是虚的,其不提供任何空间(一个都没有)来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。
在开发中比较少使用到 SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中得到了应用。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
核心原理
1 | // 写入值 |
写操作 put(E o) 和读操作 take() 都是调用 Transferer.transfer(…) 方法,区别在于第一个参数是否为 null 值。
transfer 的设计思路,其基本算法如下:
当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列即可。
如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据。
示例图说明
- 前提条件:公平模式,初始化情况下,TransferQueue的状态
- put线程1执行操作put(1),由于当前没有配对的消费线程,所以线程1入队列,自旋一小会后睡眠等待。
- put线程2执行操作put(2),由于当前没有配对的消费线程,所以线程2入队列,自旋一小会后睡眠等待。
- 这时候taker线程执行操作taker(1),由于tail指向put2线程,put线程2跟take线程1配对了(put一take),这时take1线程不需要入队,但是请注意了,这时候,要唤醒的线程并不是put线程2,而是put线程1。put线程1被唤醒,take线程1的take()方法返回了put线程1的数据,这样就实现了线程间的一对一通信,这时候内部状态如下:
公平策略总结下来就是:队尾匹配队头出队。
SynchronousQueue的实现模型。总结下来就是:队尾匹配队头出队,先进先出,体现公平原则。
PriorityBlockingQueue
带排序的 BlockingQueue 实现,其并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。
简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
核心属性
1 | // 构造方法中,如果不指定大小的话,默认大小为 11 |
此类实现了 Collection 和 Iterator 接口中的所有接口方法,对其对象进行迭代并遍历时,不能保证有序性。
如果你想要实现有序遍历,建议采用 Arrays.sort(queue.toArray()) 进行处理。PriorityBlockingQueue 提供了 drainTo 方法用于将部分或全部元素有序地填充(准确说是转移,会删除原队列中的元素)到另一个集合中。
还有一个需要说明的是,如果两个对象的优先级相同(compare 方法返回 0),此队列并不保证它们之间的顺序。
PriorityBlockingQueue 使用了基于数组的二叉堆来存放元素,所有的 public 方法采用同一个 lock 进行并发控制。