1. 生产者-消费者者模式
生产者-消费者模式是并发场景下非常经典的设计模式。生产者用于生产物品,消费者消费生产者生产的物品。在Java中可以多种实现方式:
- synchronized锁
- ReentrantLock锁
- BlockingQueue阻塞队列
- PipedInputStream/PipedOutputStream管道
- Semaphore信号量
2. synchronized实现生产者-消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| public class One {
private int productNum; private final int capacity = 10; private int ct;
public synchronized void produce(){ while(productNum == capacity){ notifyAll(); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
productNum++; ct++; System.out.println(ct+"生产一个商品:" + productNum + " " + Thread.currentThread().getName() + "获得锁"); notifyAll(); }
public synchronized void consume(){ while(productNum == 0) { notifyAll(); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
productNum--; ct++; System.out.println(ct + "消费一个商品:" + productNum + " " + Thread.currentThread().getName() + "获得锁"); notifyAll(); }
public static void main(String[] args) {
One one = new One();
for(int i = 0; i < 100; i++){ new Thread(new Runnable() { @Override public void run() { one.produce(); } }, "P"+i).start(); }
for(int i = 0; i < 100; i++){ new Thread(new Runnable() { @Override public void run() { one.consume(); } }, "C"+i).start(); }
} }
|
实现的刚开始唤醒线程用的是notify方法,会出现线程一直阻塞无法推进的问题。原因是notify是随机唤醒一个阻塞的线程,如果在执行的某个时刻,productNum = 0而且生产线程全部是Runnable状态,此时会有一个消费线程处于Blocked状态。结果显然易见,会出现死循环,两个消费线程相互唤醒阻塞。
synchronized锁编写起来非常简单,但是消费线程和生产线程会处于一个等待队列,他们的等待条件相同,而ReentrantLock可以指定等待条件。
2. ReentrantLock实现生产者-消费者模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| public class Two {
private final Lock lock = new ReentrantLock(); private int productNum = 0; private final int capacity = 10; private int ct; private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition();
public void produce(){ try{ lock.lock(); while(productNum == capacity){ notEmpty.signal(); notFull.await(); }
productNum++; ct++; System.out.println(ct+"生产一个商品:" + productNum + " " + Thread.currentThread().getName() + "获得锁"); notEmpty.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); }
}
public void consume(){ try{ lock.lock();
while(productNum == 0){ notFull.signal(); notEmpty.await(); }
productNum--; ct++; System.out.println(ct+"消费一个商品:" + productNum + " " + Thread.currentThread().getName() + "获得锁"); notFull.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
public static void main(String[] args) { Two two = new Two();
for(int i = 0; i < 100; i++){ new Thread(new Runnable() { @Override public void run() { two.produce(); } }, "P"+i).start(); }
for(int i = 0; i < 100; i++){ new Thread(new Runnable() { @Override public void run() { two.consume(); } }, "C"+i).start(); } } }
|
3. BlockingQueue实现生产者-消费者模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class Three {
private final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
public void produce(){ try { blockingQueue.put(1);; System.out.println(Thread.currentThread().getName() + " 生产一个物品:"); } catch (InterruptedException e) { e.printStackTrace(); } }
public void consume(){ try{ blockingQueue.take(); System.out.println(Thread.currentThread().getName() + " 消费一个物品"); } catch (InterruptedException e) { e.printStackTrace(); } }
public static void main(String[] args) { Three three = new Three();
for(int i = 0; i < 100; i++){ new Thread(new Runnable() { @Override public void run() { three.produce(); } }, "P"+i).start(); }
for(int i = 0; i < 100; i++){ new Thread(new Runnable() { @Override public void run() { three.consume(); } }, "C"+i).start(); } } }
|
阻塞队列:当队列为满时,阻塞当前生产者线程,唤醒消费者线程;当队列为空的时候,阻塞当前消费者线程,唤醒生产者线程。
使用方法:先创建一个管道输入流和管道输出流,然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据,消费者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯,但是这种方式在生产者和生产者、消费者和消费者之间不能保证同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的,多个生成者和多个消费者者之间则不行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| public class Four {
private final PipedInputStream input = new PipedInputStream(); private final PipedOutputStream output = new PipedOutputStream();
public Four(){ try { input.connect(output); } catch (IOException e) { e.printStackTrace(); } }
public void produce(){ try { int t = (int)(Math.random() * 255); output.write(t); output.flush(); System.out.println("生产一个商品" + t); } catch (IOException e) { e.printStackTrace(); } }
public void consume(){ try { int t = input.read(); System.out.println("消费一个商品" + t); } catch (IOException e) { e.printStackTrace(); } }
public static void main(String[] args) {
Four four = new Four();
new Thread(new Runnable() { @Override public void run() { for(int i = 0; i < 100; i++){ four.produce(); } } }).start();
new Thread(new Runnable() { @Override public void run() { for(int i = 0; i < 100; i++){ four.consume(); } } }).start();
try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } try { four.input.close(); four.output.close(); } catch (IOException e) { e.printStackTrace(); } } }
|
5. Semaphore实现生产者-消费者模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| public class Five {
private int productNum = 0;
private final Semaphore notFull = new Semaphore(10); private final Semaphore notEmpty = new Semaphore(10); private final Semaphore mutex = new Semaphore(1);
public void produce(){ try { notFull.acquire(); mutex.acquire(); productNum++; System.out.println("生产一个商品");
} catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); notEmpty.release(); } }
public void consume(){ try{ notEmpty.acquire(); mutex.acquire(); productNum--; System.out.println("消费一个商品"); } catch (InterruptedException e) { e.printStackTrace(); }finally { mutex.release(); notFull.release(); } }
public static void main(String[] args) {
Five five = new Five(); for(int i = 0; i < 100; i++){ new Thread(new Runnable() { @Override public void run() { five.produce(); } }, "P"+i).start(); }
for(int i = 0; i < 100; i++){ new Thread(new Runnable() { @Override public void run() { five.consume(); } }, "C"+i).start(); } } }
|