juc · 2019-09-20 0

Java阻塞队列BlockingQueue

一、阻塞队列

阻塞队列,首先是一个队列。当阻塞队列是空时,从队列中获取元素的操作将会被阻塞;当阻塞队列是满时,往队列里添加元素的操作将会被阻塞

在多线程领域,所谓阻塞,在某些情况下挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒,我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程

二、BlockingQueue核心方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

抛出异常:当阻塞队列满时,再往队列里add插入元素会抛出 IllegalStateException:Queue fll;当阻塞队列空时,再往队列里remove移除元素会抛出 NoSuchElementExeception

特殊值:插入方法,成功 true,失败 false;移除方法,成功返回队列的元素,队列里没有就返回 null

阻塞:当阻塞队列满时,线程往队列里put元素,队列会一直阻塞直到线程能put数据;当阻塞队列空时,线程从队列take元素,队列会一直阻塞直到可take元素。注意,此时线程是 WAITING 或 TIMED_WAITING 状态

超时退出:当阻塞队列满时,队列会阻塞线程一定时间,超过限时后线程会退出

三、种类

ArrayBlockingQueue:由数组结构组成的有界阻塞队列

LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列

PriorityBlockingQueue:支持优先级排序的无界阻塞队列

DelayQueue:(延迟队列)使用优先级队列实现的延迟无界阻塞队列

SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列

LinkedTransferQueue:由链表结构组成的无界阻塞队列

LinkedBlockingDeque:由链表结构组成的双向阻塞队列

四、原理

1.ArrayBlockingQueue

数据放在 items 中

// ArrayBlockingQueue.java

final Object[] items;

int takeIndex;

int putIndex;

int count;

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

2.PriorityBlockingQueue

// PriorityBlockingQueue.java
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

3.DelayQueue

延迟队列

// DelayQueue.java
public void put(E e) {
    offer(e);
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

五、示例

1.使用 SynchronousQueue

SynchronousQueue是单个元素队列

生产线程put一个对象时,再put就会阻塞;消费者take一个对象后,再take就会阻塞

@Test
public void testSynchronousQueue() throws InterruptedException {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName() + "t put 1");
            queue.put("1");
            System.out.println(Thread.currentThread().getName() + "t put 2");
            queue.put("2");
            System.out.println(Thread.currentThread().getName() + "t put 3");
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "producer").start();

    new Thread(()->{
        try {
            TimeUnit.MILLISECONDS.sleep(200);
            System.out.println(Thread.currentThread().getName() + "t take " + queue.take());
            TimeUnit.MILLISECONDS.sleep(200);
            System.out.println(Thread.currentThread().getName() + "t take " + queue.take());
            TimeUnit.MILLISECONDS.sleep(200);
            System.out.println(Thread.currentThread().getName() + "t take " + queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "consumer").start();

    TimeUnit.SECONDS.sleep(1);
}

2.使用 DelayQueue

public class DelayedTask implements Delayed {

    private String name;

    private long availableTime;

    public DelayedTask(String name, long delayTime) {
        this.name = name;
        // available = 当前时间 + delayTime
        this.availableTime = delayTime + System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // 判断 available 是否大于当前系统时间,并将结果转换成 MILLISECONDS
        long diffTime = availableTime - System.currentTimeMillis();
        return unit.convert(diffTime, TimeUnit.MILLISECONDS);
    }

    public long getAvailableTime() {
        return this.availableTime;
    }

    @Override
    public int compareTo(Delayed o) {
        // compareTo 用在 DelayedTask 的排序
        return (int) (this.availableTime - ((DelayedTask) o).getAvailableTime());
    }

    @Override
    public String toString() {
        return "DelayedTask{" +
                "name='" + name + ''' +
                ", availableTime=" + availableTime +
                '}';
    }
}
@Test
public void testDelayQueue() throws InterruptedException {
    DelayQueue<DelayedTask> queue = new DelayQueue<>();
    queue.offer(new DelayedTask("delayTest1", 700));
    queue.offer(new DelayedTask("delayTest2", 600));
    queue.offer(new DelayedTask("delayTest3", 500));

    // 可以查询到队首数据
    System.out.println(queue.peek());
    // 未到延迟时间,出队列的元素为空
    System.out.println(queue.poll());

    // 等待延迟时间过后,出队列就会数据
    // TimeUnit.SECONDS.sleep(2);
    // System.out.println(queue.poll());
    // System.out.println(queue.poll());
    // System.out.println(queue.poll());

    // take 方法,会执行阻塞,直到过了延迟时间,数据可以出队列
    System.out.println(queue.take());
    System.out.println(queue.take());
    System.out.println(queue.take());
}