生产者消费者问题是研究多线程程序时绕不开的经典问题之一, 它描述是有一块缓冲区作为仓库, 生产者可以将产品放入仓库, 消费者则可以从仓库中取走产品
生产者消费者问题
生产者消费者问题是研究多线程程序时绕不开的经典问题之一, 它描述是有一块缓冲区作为仓库, 生产者可以将产品放入仓库, 消费者则可以从仓库中取走产品.
解决生产者 / 消费者问题的方法可分为两类:
- 采用某种机制保护生产者和消费者之间的同步;
- 在生产者和消费者之间建立一个管道.
第一种方式有较高的效率, 并且易于实现, 代码的可控制性较好, 属于常用的模式. 第二种管道缓冲区不易控制, 被传输数据对象不易于封装等, 实用性不强.
同步问题核心在于:
如何保证同一资源被多个线程并发访问时的完整性.
常用的同步方法是采用信号或加锁机制, 保证资源在任意时刻至多被一个线程访问.
Java 语言在多线程编程上实现了完全对象化, 提供了对同步机制的良好支持.
在 Java 中一共有五种方法支持同步, 其中前四个是同步方法, 一个是管道方法.
- wait()/ notify() 方法
- await()/ signal() 方法
- BlockingQueue 阻塞队列方法
- Semaphore 方法
- PipedInputStream / PipedOutputStream
wait()/ notify() 方法
wait()/ nofity() 方法是基类 Object 的两个方法, 也就意味着所有 Java 类都会拥有这两个方法, 这样, 我们就可以为任何对象实现同步机制.
wait() 方法: 当缓冲区已满 / 空时, 生产者 / 消费者线程停止自己的执行, 放弃锁, 使自己处于等等状态, 让其他线程执行.
notify() 方法: 当生产者 / 消费者向缓冲区放入 / 取出一个产品时, 向其他等待的线程发出可执行的通知, 同时放弃锁, 使自己处于等待状态.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| public class Hosee { private static Integer count = 0; private static final Integer FULL = 10; private static final String LOCK = "LOCK";
class Producer implements Runnable { public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } synchronized (LOCK) { while (count.equals(FULL)) { try { LOCK.wait(); } catch (Exception e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产, 目前总共有" + count); LOCK.notifyAll(); } } } }
class Consumer implements Runnable { public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } synchronized (LOCK) { while (count == 0) { try { LOCK.wait(); } catch (Exception e) { e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费, 目前总共有" + count); LOCK.notifyAll(); } } } }
public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
|
await()/ signal() 方法
wait()和 notify() 必须在 synchronized 的代码块中使用 因为只有在获取当前对象的锁时才能进行这两个操作 否则会报异常
而 await()和 signal() 一般与 Lock() 配合使用.
wait 是 Object 的方法, 而 await 只有部分类有, 如 Condition.
await()/signal() 和新引入的锁定机制 Lock 直接挂钩, 具有更大的灵活性.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| public class Test2 { private static Integer count = 0; private final Integer FULL = 10; final Lock lock = new ReentrantLock(); final Condition NotFull = lock.newCondition(); final Condition NotEmpty = lock.newCondition();
class Producer implements Runnable { public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } lock.lock(); try { while (count == FULL) { try { NotFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产, 目前总共有" + count); NotEmpty.signal(); } finally { lock.unlock(); }
} } }
class Consumer implements Runnable {
public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } lock.lock(); try { while (count == 0) { try { NotEmpty.await(); } catch (Exception e) { e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费, 目前总共有" + count); NotFull.signal(); } finally { lock.unlock(); }
}
}
}
public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start();
new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
|
BlockingQueue 阻塞队列方法
put() 方法: 类似于我们上面的生产者线程, 容量达到最大时, 自动阻塞.
take() 方法: 类似于我们上面的消费者线程, 容量为 0 时, 自动阻塞.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public class Hosee { private static Integer count = 0; final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10); class Producer implements Runnable { public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { bq.put(1); count++; System.out.println(Thread.currentThread().getName() + "生产者生产, 目前总共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } } } }
class Consumer implements Runnable { public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { bq.take(); count--; System.out.println(Thread.currentThread().getName() + "消费者消费, 目前总共有" + count); } catch (Exception e) { e.printStackTrace(); } } }
}
public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
|
Semaphore 方法
Semaphore 信号量, 就是一个允许实现设置好的令牌. 也许有 1 个, 也许有 10 个或更多.
谁拿到令牌 (acquire) 就可以去执行了, 如果没有令牌则需要等待.
执行完毕, 一定要归还 (release) 令牌, 否则令牌会被很快用光, 别的线程就无法获得令牌而执行下去了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| public class Hosee{ int count = 0; final Semaphore notFull = new Semaphore(10); final Semaphore notEmpty = new Semaphore(0); final Semaphore mutex = new Semaphore(1); class Producer implements Runnable{ public void run(){ for (int i = 0; i < 10; i++){ try{ Thread.sleep(3000); } catch (Exception e){ e.printStackTrace(); } try{ notFull.acquire(); mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "生产者生产, 目前总共有" + count); } catch (Exception e){ e.printStackTrace(); } finally{ mutex.release(); notEmpty.release(); } } } }
class Consumer implements Runnable{ public void run(){ for (int i = 0; i < 10; i++){ try{ Thread.sleep(3000); } catch (InterruptedException e1){ e1.printStackTrace(); } try{ notEmpty.acquire(); mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "消费者消费, 目前总共有" + count); } catch (Exception e){ e.printStackTrace(); } finally{ mutex.release(); notFull.release(); } } } } public static void main(String[] args) throws Exception{ Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
|
这个类位于 java.io 包中, 是解决同步问题的最简单的办法, 一个线程将数据写入管道, 另一个线程从管道读取数据, 这样便构成了一种生产者 /
消费者的缓冲区编程模式. PipedInputStream/PipedOutputStream 只能用于多线程模式, 用于单线程下可能会引发死锁.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| public class Hosee { final PipedInputStream pis = new PipedInputStream(); final PipedOutputStream pos = new PipedOutputStream(); { try { pis.connect(pos); } catch (IOException e) { e.printStackTrace(); } }
class Producer implements Runnable { @Override public void run() { try{ while(true){ int b = (int) (Math.random() * 255); System.out.println("Producer: a byte, the value is " + b); pos.write(b); pos.flush(); } }catch(Exception e){ e.printStackTrace(); }finally{ try{ pos.close(); pis.close(); }catch(IOException e){ System.out.println(e); } } } }
class Consumer implements Runnable {
@Override public void run() { try{ while(true){ int b = pis.read(); System.out.println("Consumer: a byte, the value is " + String.valueOf(b)); } }catch(Exception e){ e.printStackTrace(); }finally{ try{ pos.close(); pis.close(); }catch(IOException e){ System.out.println(e); } } }
}
public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
|