- 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;
- 如果共享数据区为空的话,阻塞消费者继续消费数据;
Java 中,可以通过配合调用 Object 对象的 wait() 方法和 notify()方法或 notifyAll() 方法来实现线程间的通信。在线程中调用 wait() 方法,将阻塞当前线程,直至等到其他线程调用了 notify() 方法或 notifyAll() 方法进行通知之后,当前线程才能从wait()方法出返回,继续执行下面的操作。
该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用 wait()之前,线程必须要获得该对象的对象监视器锁,即只能在同步方法或同步块中调用 wait()方法。调用wait()方法之后,当前线程会释放锁。如果调用wait()方法时,线程并未获取到锁的话,则会抛出IllegalMonitorStateException异常,这是以个RuntimeException。如果再次获取到锁的话,当前线程才能从wait()方法处成功返回。
该方法也要在同步方法或同步块中调用,即在调用前,线程也必须要获得该对象的对象级别锁,如果调用 notify()时没有持有适当的锁,也会抛出 IllegalMonitorStateException。
该方法任意从WAITTING状态的线程中挑选一个进行通知,使得调用wait()方法的线程从等待队列移入到同步队列中,等待有机会再一次获取到锁,从而使得调用wait()方法的线程能够从wait()方法处退出。调用notify后,当前线程不会马上释放该对象锁,要等到程序退出同步块后,当前线程才会释放锁。 -
该方法与 notify ()方法的工作方式相同,重要的一点差异是:
notifyAll 使所有原来在该对象上 wait 的线程统统退出WAITTING状态,使得他们全部从等待队列中移入到同步队列中去,等待下一次能够有机会获取到对象监视器锁。
notify 通知的遗漏很容易理解,即 threadA 还没开始 wait 的时候,threadB 已经 notify 了,这样,threadB 通知是没有任何响应的,当 threadB 退出 synchronized 代码块后,threadA 再开始 wait,便会一直阻塞等待,直到被别的线程打断。比如在下面的示例代码中,就模拟出notify早期通知带来的问题:
public class EarlyNotifyDemo1 { private static String lockObject = ""; public static void main(String[] args) { WaitThread waitThread = new WaitThread(lockObject); NotifyThread notifyThread = new NotifyThread(lockObject); notifyThread.start(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } waitThread.start(); } static class WaitThread extends Thread { private String lock; public WaitThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { System.out.println(Thread.currentThread().getName() + " 进去代码块"); System.out.println(Thread.currentThread().getName() + " 开始wait"); lock.wait(); System.out.println(Thread.currentThread().getName() + " 结束wait"); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class NotifyThread extends Thread { private String lock; public NotifyThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 进去代码块"); System.out.println(Thread.currentThread().getName() + " 开始notify"); lock.notify(); System.out.println(Thread.currentThread().getName() + " 结束开始notify"); } } }}
Thread-1 进去代码块Thread-1 开始notify
Thread-1 结束开始notify
Thread-0 进去代码块
Thread-0 开始wait
public class EarlyNotifyDemo2 { private static String lockObject = ""; private static boolean isWait = true; public static void main(String[] args) { WaitThread waitThread = new WaitThread(lockObject); NotifyThread notifyThread = new NotifyThread(lockObject); notifyThread.start(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } waitThread.start(); } static class WaitThread extends Thread { private String lock; public WaitThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { while (isWait) { System.out.println(Thread.currentThread().getName() + " 进去代码块"); System.out.println(Thread.currentThread().getName() + " 开始wait"); lock.wait(); System.out.println(Thread.currentThread().getName() + " 结束wait"); } } catch (InterruptedException e) { e.printStackTrace(); } } } } static class NotifyThread extends Thread { private String lock; public NotifyThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 进去代码块"); System.out.println(Thread.currentThread().getName() + " 开始notify"); lock.notifyAll(); isWait = false; System.out.println(Thread.currentThread().getName() + " 结束开始notify"); } } }}
总结:在使用线程的等待/通知机制时,一般都要配合一个 boolean 变量值(或者其他能够判断真假的条件),在 notify 之前改变该 boolean 变量的值,让 wait 返回后能够退出 while 循环(一般都要在 wait 方法外围加一层 while 循环,以防止早期通知),或在通知被遗漏后,不会被阻塞在 wait 方法处。这样便保证了程序的正确性
public class ConditionChangeDemo1 { private static List<String> lockObject = new ArrayList(); public static void main(String[] args) { Consumer consumer1 = new Consumer(lockObject); Consumer consumer2 = new Consumer(lockObject); Productor productor = new Productor(lockObject); consumer1.start(); consumer2.start(); productor.start(); } static class Consumer extends Thread { private List<String> lock; public Consumer(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { //这里使用if的话,就会存在wait条件变化造成程序错误的问题 if (lock.isEmpty()) { System.out.println(Thread.currentThread().getName() + " list为空"); System.out.println(Thread.currentThread().getName() + " 调用wait方法"); lock.wait(); System.out.println(Thread.currentThread().getName() + " wait方法结束"); } String element = lock.remove(0); System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Productor extends Thread { private List<String> lock; public Productor(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 开始添加元素"); lock.add(Thread.currentThread().getName()); lock.notifyAll(); } } }}
Thread-0 list为空Thread-0 调用wait方法
Thread-2 开始添加元素
Thread-1 取出第一个元素为:Thread-2Thread-0 wait方法结束
Exception in thread "Thread-0" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
public class ConditionChangeDemo2 { private static List<String> lockObject = new ArrayList(); public static void main(String[] args) { Consumer consumer1 = new Consumer(lockObject); Consumer consumer2 = new Consumer(lockObject); Productor productor = new Productor(lockObject); consumer1.start(); consumer2.start(); productor.start(); } static class Consumer extends Thread { private List<String> lock; public Consumer(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { //这里使用if的话,就会存在wait条件变化造成程序错误的问题 while (lock.isEmpty()) { System.out.println(Thread.currentThread().getName() + " list为空"); System.out.println(Thread.currentThread().getName() + " 调用wait方法"); lock.wait(); System.out.println(Thread.currentThread().getName() + " wait方法结束"); } String element = lock.remove(0); System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Productor extends Thread { private List<String> lock; public Productor(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 开始添加元素"); lock.add(Thread.currentThread().getName()); lock.notifyAll(); } } }}
Thread-0 list为空Thread-0 调用wait方法
Thread-2 开始添加元素
Thread-1 取出第一个元素为:Thread-2Thread-0 wait方法结束
Thread-0 list为空
Thread-0 调用wait方法
上面的代码与之前的代码仅仅只是将 wait 外围的 if 语句改为 while 循环即可,这样当 list 为空时,线程便会继续等待,而不会继续去执行删除 list 中元素的代码。
总结:在使用线程的等待/通知机制时,一般都要在 while 循环中调用 wait()方法,因此配合使用一个 boolean 变量(或其他能判断真假的条件,如本文中的 list.isEmpty()),满足 while 循环的条件时,进入 while 循环,执行 wait()方法,不满足 while 循环的条件时,跳出循环,执行后面的代码。
- 永远在while循环中对条件进行判断而不是if语句中进行wait条件的判断;
- 使用NotifyAll而不是使用notify。
// The standard idiom for calling the wait method in Java synchronized (sharedObject) { while (condition) { sharedObject.wait(); // (Releases lock, and reacquires on wakeup) } // do action based upon condition e.g. take or put into queue }
public class ProductorConsumerDemo1 { public static void main(String[] args) { LinkedList linkedList = new LinkedList(); ExecutorService service = Executors.newFixedThreadPool(15); for (int i = 0; i < 5; i++) { service.submit(new Productor(linkedList, 8)); } for (int i = 0; i < 10; i++) { service.submit(new Consumer(linkedList)); } } static class Productor implements Runnable { private List<Integer> list; private int maxLength; public Productor(List list, int maxLength) { this.list = list; this.maxLength = maxLength; } @Override public void run() { while (true) { synchronized (list) { try { while (list.size() == maxLength) { System.out.println("生产者" + Thread.currentThread().getName() + " list以达到最大容量,进行wait"); list.wait(); System.out.println("生产者" + Thread.currentThread().getName() + " 退出wait"); } Random random = new Random(); int i = random.nextInt(); System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i); list.add(i); list.notifyAll(); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } } static class Consumer implements Runnable { private List<Integer> list; public Consumer(List list) { this.list = list; } @Override public void run() { while (true) { synchronized (list) { try { while (list.isEmpty()) { System.out.println("消费者" + Thread.currentThread().getName() + " list为空,进行wait"); list.wait(); System.out.println("消费者" + Thread.currentThread().getName() + " 退出wait"); } Integer element = list.remove(0); System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + element); list.notifyAll(); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } }}
生产者pool-1-thread-2 生产数据-703210513生产者pool-1-thread-2 生产数据-1025434820生产者pool-1-thread-2 生产数据70070412生产者pool-1-thread-2 生产数据-598504371生产者pool-1-thread-2 生产数据-716978999生产者pool-1-thread-2 生产数据-1175198461生产者pool-1-thread-2 生产数据-1212912406生产者pool-1-thread-2 生产数据-332467186生产者pool-1-thread-2 list以达到最大容量,进行wait
消费者pool-1-thread-15 消费数据:-703210513消费者pool-1-thread-15 消费数据:-1025434820消费者pool-1-thread-15 消费数据:70070412消费者pool-1-thread-15 消费数据:-598504371消费者pool-1-thread-15 消费数据:-716978999消费者pool-1-thread-15 消费数据:-1175198461消费者pool-1-thread-15 消费数据:-1212912406消费者pool-1-thread-15 消费数据:-332467186消费者pool-1-thread-15 list为空,进行wait
消费者pool-1-thread-14 list为空,进行wait
消费者pool-1-thread-13 list为空,进行wait
消费者pool-1-thread-11 list为空,进行wait
消费者pool-1-thread-12 list为空,进行wait
消费者pool-1-thread-10 list为空,进行wait
消费者pool-1-thread-9 list为空,进行wait
消费者pool-1-thread-8 list为空,进行wait
消费者pool-1-thread-7 list为空,进行wait
消费者pool-1-thread-6 list为空,进行wait
生产者pool-1-thread-5 生产数据84590545生产者pool-1-thread-5 生产数据-1631754695
void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时;
boolean await(long time, TimeUnit unit)throws InterruptedException:同第二种,支持自定义时间单位
boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间
void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
void signalAll():与signal的区别在于能够唤醒所有等待在condition上的线程
public class ProductorConsumerDemo2 { private static ReentrantLock lock = new ReentrantLock(); private static Condition full = lock.newCondition(); private static Condition empty = lock.newCondition(); public static void main(String[] args) { LinkedList linkedList = new LinkedList(); ExecutorService service = Executors.newFixedThreadPool(15); for (int i = 0; i < 5; i++) { service.submit(new Productor(linkedList, 8, lock)); } for (int i = 0; i < 10; i++) { service.submit(new Consumer(linkedList, lock)); } } static class Productor implements Runnable { private List<Integer> list; private int maxLength; private Lock lock; public Productor(List list, int maxLength, Lock lock) { this.list = list; this.maxLength = maxLength; this.lock = lock; } @Override public void run() { while (true) { lock.lock(); try { while (list.size() == maxLength) { System.out.println("生产者" + Thread.currentThread().getName() + " list以达到最大容量,进行wait"); full.await(); System.out.println("生产者" + Thread.currentThread().getName() + " 退出wait"); } Random random = new Random(); int i = random.nextInt(); System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i); list.add(i); empty.signalAll(); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } static class Consumer implements Runnable { private List<Integer> list; private Lock lock; public Consumer(List list, Lock lock) { this.list = list; this.lock = lock; } @Override public void run() { while (true) { lock.lock(); try { while (list.isEmpty()) { System.out.println("消费者" + Thread.currentThread().getName() + " list为空,进行wait"); empty.await(); System.out.println("消费者" + Thread.currentThread().getName() + " 退出wait"); } Integer element = list.remove(0); System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + element); full.signalAll(); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } }}
生产者pool-1-thread-1 生产数据-1633842993生产者pool-1-thread-1 生产数据1337251950生产者pool-1-thread-1 生产数据1310879631生产者pool-1-thread-1 生产数据-214297115生产者pool-1-thread-1 生产数据738937512生产者pool-1-thread-1 生产数据13060041生产者pool-1-thread-1 生产数据-957049554生产者pool-1-thread-1 生产数据-1062017880生产者pool-1-thread-1 list以达到最大容量,进行wait
生产者pool-1-thread-2 list以达到最大容量,进行wait
生产者pool-1-thread-3 list以达到最大容量,进行wait
生产者pool-1-thread-4 list以达到最大容量,进行wait
生产者pool-1-thread-5 list以达到最大容量,进行wait
消费者pool-1-thread-6 消费数据:-1633842993消费者pool-1-thread-6 消费数据:1337251950消费者pool-1-thread-6 消费数据:1310879631消费者pool-1-thread-6 消费数据:-214297115消费者pool-1-thread-6 消费数据:738937512消费者pool-1-thread-6 消费数据:13060041消费者pool-1-thread-6 消费数据:-957049554消费者pool-1-thread-6 消费数据:-1062017880消费者pool-1-thread-6 list为空,进行wait
消费者pool-1-thread-7 list为空,进行wait
消费者pool-1-thread-8 list为空,进行wait
消费者pool-1-thread-9 list为空,进行wait
消费者pool-1-thread-10 list为空,进行wait
消费者pool-1-thread-11 list为空,进行wait
消费者pool-1-thread-12 list为空,进行wait
消费者pool-1-thread-13 list为空,进行wait
消费者pool-1-thread-14 list为空,进行wait
消费者pool-1-thread-15 list为空,进行wait
生产者pool-1-thread-1 退出wait
生产者pool-1-thread-1 生产数据1949864858生产者pool-1-thread-1 生产数据-1693880970
public class ProductorConsumerDmoe3 { private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(15); for (int i = 0; i < 5; i++) { service.submit(new Productor(queue)); } for (int i = 0; i < 10; i++) { service.submit(new Consumer(queue)); } } static class Productor implements Runnable { private BlockingQueue queue; public Productor(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { Random random = new Random(); int i = random.nextInt(); System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i); queue.put(i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } static class Consumer implements Runnable { private BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { Integer element = (Integer) queue.take(); System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } }}