一、synchronized
使用synchronized互斥访问生产方法(produce)和消费方法(consume)
如果仓库满,就调用wait()方法阻塞生产者线程;如果仓库空,就调用wait()方法阻塞消费者线程
调用notifyAll()方法唤醒线程
interface Storage{
public void produce() throws InterruptedException;
public void consume() throws InterruptedException;
}
class SyncStorage implements Storage {
// 仓库容量
private final int MAX_SIZE= 2;
// 仓库存储载体
private Queue<Character> queue = new LinkedList<>();
@Override
public void produce() throws InterruptedException {
synchronized (queue){
while (queue.size() >= MAX_SIZE){
queue.wait();
}
// 生产随机生成的字母
char temp = (char)(int)(Math.random()*26+97);
queue.add(temp);
System.out.println(Thread.currentThread().getName() + "t 生产:" +temp + "t 现库存:" + queue.size());
queue.notifyAll();
}
}
@Override
public void consume() throws InterruptedException {
synchronized (queue){
while (queue.size() == 0){
queue.wait();
}
char temp = queue.remove();
System.out.println(Thread.currentThread().getName() + "t 消费:" +temp + "t 现库存:" + queue.size());
queue.notifyAll();
}
}
}
二、Lock
使用Lock和Condition 实现生产者消费者
class LockStorage implements Storage{
// 仓库容量
private final int MAX_SIZE= 2;
// 仓库存储载体
private Queue<Character> queue = new LinkedList<>();
// 锁
private Lock lock = new ReentrantLock();
// 仓库满的条件
private Condition full = lock.newCondition();
// 仓库空的条件
private Condition empty = lock.newCondition();
@Override
public void produce() throws InterruptedException {
lock.lock();
try{
while (queue.size() >= MAX_SIZE){
full.await();
}
// 生产随机生成的字母
char temp = (char)(int)(Math.random()*26+97);
queue.add(temp);
System.out.println(Thread.currentThread().getName() + "t 生产:" +temp + "t 现库存:" + queue.size());
empty.signalAll();
}finally {
lock.unlock();
}
}
@Override
public void consume() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0){
empty.await();
}
char temp = queue.remove();
System.out.println(Thread.currentThread().getName() + "t 消费:" +temp + "t 现库存:" + queue.size());
full.signalAll();
}finally {
lock.unlock();
}
}
}
三、BlockingQueue
用BlockingQueue实现生产者消费者
put()方法,入队列,如果阻塞队列满,会阻塞直到阻塞队列不满
take()方法,出队列,如果阻塞队列空,会阻塞直到阻塞队列不空
class BlockStorage implements Storage{
private BlockingQueue<Character> blockingQueue = new LinkedBlockingQueue<>(2);
@Override
public void produce() throws InterruptedException {
// 生产随机生成的字母
char temp = (char)(int)(Math.random()*26+97);
// put方法,阻塞队列满时,会阻塞
blockingQueue.put(temp);
System.out.println(Thread.currentThread().getName() + "t 生产:" +temp + "t 现库存:" + blockingQueue.size());
}
@Override
public void consume() throws InterruptedException {
// take方法,阻塞队列空时,会阻塞
char temp = blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "t 消费:" +temp + "t 现库存:" + blockingQueue.size());
}
}
测试类
3个生产者线程,3个消费者线程
public class ProdConsumerTest {
public static void main(String[] args) {
testStorage(new SyncStorage());
// testStorage(new LockStorage());
// testStorage(new BlockStorage());
}
private static void testStorage(Storage storage) {
for(int i = 0; i < 3; i++){
new Thread(()->{
try {
// 睡眠2s,模拟生产慢
TimeUnit.SECONDS.sleep(2);
storage.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "生产者"+i).start();
}
for(int i = 0; i < 3; i++){
new Thread(()->{
try {
storage.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "消费者"+i).start();
}
}
}
结果: