juc · 2019-09-20 0

Java三种方式(synchronized、Lock、BlockingQueue)解决生产者消费者问题

一、synchronized

使用synchronized互斥访问生产方法(produce)和消费方法(consume)

如果仓库满,就调用wait()方法阻塞生产者线程;如果仓库空,就调用wait()方法阻塞消费者线程

调用notifyAll()方法唤醒线程

interface Storage{
    public void produce() throws InterruptedException;
    public void consume() throws InterruptedException;
}

class SyncStorage implements Storage {
    //    仓库容量
    private final int MAX_SIZE= 2;
    //    仓库存储载体
    private Queue<Character> queue = new LinkedList<>();

    @Override
    public void produce() throws InterruptedException {
        synchronized (queue){
            while (queue.size() >= MAX_SIZE){
                queue.wait();
            }
//            生产随机生成的字母
            char temp = (char)(int)(Math.random()*26+97);
            queue.add(temp);
            System.out.println(Thread.currentThread().getName() + "t 生产:" +temp + "t 现库存:" + queue.size());
            queue.notifyAll();
        }
    }

    @Override
    public void consume() throws InterruptedException {
        synchronized (queue){
            while (queue.size() == 0){
                queue.wait();
            }
            char temp = queue.remove();
            System.out.println(Thread.currentThread().getName() + "t 消费:" +temp + "t 现库存:" + queue.size());
            queue.notifyAll();
        }
    }
}

二、Lock

使用Lock和Condition 实现生产者消费者

class LockStorage implements Storage{
//    仓库容量
    private final int MAX_SIZE= 2;
//    仓库存储载体
    private Queue<Character> queue = new LinkedList<>();
//    锁
    private Lock lock = new ReentrantLock();
//    仓库满的条件
    private Condition full = lock.newCondition();
//    仓库空的条件
    private Condition empty = lock.newCondition();

    @Override
    public void produce() throws InterruptedException {
        lock.lock();
        try{
            while (queue.size() >= MAX_SIZE){
                full.await();
            }
            //            生产随机生成的字母
            char temp = (char)(int)(Math.random()*26+97);
            queue.add(temp);
            System.out.println(Thread.currentThread().getName() + "t 生产:" +temp + "t 现库存:" + queue.size());
            empty.signalAll();
        }finally {
            lock.unlock();
        }
    }

    @Override
    public void consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == 0){
                empty.await();
            }
            char temp = queue.remove();
            System.out.println(Thread.currentThread().getName() + "t 消费:" +temp + "t 现库存:" + queue.size());
            full.signalAll();
        }finally {
            lock.unlock();
        }
    }
}

三、BlockingQueue

用BlockingQueue实现生产者消费者

put()方法,入队列,如果阻塞队列满,会阻塞直到阻塞队列不满

take()方法,出队列,如果阻塞队列空,会阻塞直到阻塞队列不空

class BlockStorage implements Storage{
    private BlockingQueue<Character> blockingQueue = new LinkedBlockingQueue<>(2);

    @Override
    public void produce() throws InterruptedException {
//            生产随机生成的字母
        char temp = (char)(int)(Math.random()*26+97);
//        put方法,阻塞队列满时,会阻塞
        blockingQueue.put(temp);
        System.out.println(Thread.currentThread().getName() + "t 生产:" +temp + "t 现库存:" + blockingQueue.size());
    }

    @Override
    public void consume() throws InterruptedException {
//        take方法,阻塞队列空时,会阻塞
        char temp = blockingQueue.take();
        System.out.println(Thread.currentThread().getName() + "t 消费:" +temp + "t 现库存:" + blockingQueue.size());
    }
}

测试类

3个生产者线程,3个消费者线程

public class ProdConsumerTest {

    public static void main(String[] args) {
        testStorage(new SyncStorage());
//        testStorage(new LockStorage());
//        testStorage(new BlockStorage());
    }

    private static void testStorage(Storage storage) {
        for(int i = 0; i < 3; i++){
            new Thread(()->{
                try {
//                    睡眠2s,模拟生产慢
                    TimeUnit.SECONDS.sleep(2);
                    storage.produce();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "生产者"+i).start();
        }
        for(int i = 0; i < 3; i++){
            new Thread(()->{
                try {
                    storage.consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "消费者"+i).start();
        }
    }

}

结果:

result