一、阻塞队列
阻塞队列,首先是一个队列。当阻塞队列是空时,从队列中获取元素的操作将会被阻塞;当阻塞队列是满时,往队列里添加元素的操作将会被阻塞
在多线程领域,所谓阻塞,在某些情况下挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒,我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程
二、BlockingQueue核心方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
抛出异常:当阻塞队列满时,再往队列里add插入元素会抛出 IllegalStateException:Queue fll;当阻塞队列空时,再往队列里remove移除元素会抛出 NoSuchElementExeception
特殊值:插入方法,成功 true,失败 false;移除方法,成功返回队列的元素,队列里没有就返回 null
阻塞:当阻塞队列满时,线程往队列里put元素,队列会一直阻塞直到线程能put数据;当阻塞队列空时,线程从队列take元素,队列会一直阻塞直到可take元素。注意,此时线程是 WAITING 或 TIMED_WAITING 状态
超时退出:当阻塞队列满时,队列会阻塞线程一定时间,超过限时后线程会退出
三、种类
ArrayBlockingQueue:由数组结构组成的有界阻塞队列
LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
PriorityBlockingQueue:支持优先级排序的无界阻塞队列
DelayQueue:(延迟队列)使用优先级队列实现的延迟无界阻塞队列
SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
LinkedTransferQueue:由链表结构组成的无界阻塞队列
LinkedBlockingDeque:由链表结构组成的双向阻塞队列
四、原理
1.ArrayBlockingQueue
数据放在 items 中
// ArrayBlockingQueue.java
final Object[] items;
int takeIndex;
int putIndex;
int count;
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
2.PriorityBlockingQueue
// PriorityBlockingQueue.java
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
3.DelayQueue
延迟队列
// DelayQueue.java
public void put(E e) {
offer(e);
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
五、示例
1.使用 SynchronousQueue
SynchronousQueue是单个元素队列
生产线程put一个对象时,再put就会阻塞;消费者take一个对象后,再take就会阻塞
@Test
public void testSynchronousQueue() throws InterruptedException {
SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + "t put 1");
queue.put("1");
System.out.println(Thread.currentThread().getName() + "t put 2");
queue.put("2");
System.out.println(Thread.currentThread().getName() + "t put 3");
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "producer").start();
new Thread(()->{
try {
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(Thread.currentThread().getName() + "t take " + queue.take());
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(Thread.currentThread().getName() + "t take " + queue.take());
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(Thread.currentThread().getName() + "t take " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "consumer").start();
TimeUnit.SECONDS.sleep(1);
}
2.使用 DelayQueue
public class DelayedTask implements Delayed {
private String name;
private long availableTime;
public DelayedTask(String name, long delayTime) {
this.name = name;
// available = 当前时间 + delayTime
this.availableTime = delayTime + System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
// 判断 available 是否大于当前系统时间,并将结果转换成 MILLISECONDS
long diffTime = availableTime - System.currentTimeMillis();
return unit.convert(diffTime, TimeUnit.MILLISECONDS);
}
public long getAvailableTime() {
return this.availableTime;
}
@Override
public int compareTo(Delayed o) {
// compareTo 用在 DelayedTask 的排序
return (int) (this.availableTime - ((DelayedTask) o).getAvailableTime());
}
@Override
public String toString() {
return "DelayedTask{" +
"name='" + name + ''' +
", availableTime=" + availableTime +
'}';
}
}
@Test
public void testDelayQueue() throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.offer(new DelayedTask("delayTest1", 700));
queue.offer(new DelayedTask("delayTest2", 600));
queue.offer(new DelayedTask("delayTest3", 500));
// 可以查询到队首数据
System.out.println(queue.peek());
// 未到延迟时间,出队列的元素为空
System.out.println(queue.poll());
// 等待延迟时间过后,出队列就会数据
// TimeUnit.SECONDS.sleep(2);
// System.out.println(queue.poll());
// System.out.println(queue.poll());
// System.out.println(queue.poll());
// take 方法,会执行阻塞,直到过了延迟时间,数据可以出队列
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}