我可以获得一个完整的简单场景,即建议如何使用它的教程,特别是与队列一起使用吗?
7 回答
和方法旨在wait()
提供notify()
一种机制以允许线程阻塞直到满足特定条件。为此,我假设您想要编写一个阻塞队列实现,其中您有一些固定大小的元素后备存储。
您要做的第一件事是确定您希望方法等待的条件。在这种情况下,您将希望该put()
方法阻塞,直到存储中有可用空间,并且您希望该take()
方法阻塞,直到有一些元素要返回。
public class BlockingQueue<T> {
private Queue<T> queue = new LinkedList<T>();
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public synchronized void put(T element) throws InterruptedException {
while(queue.size() == capacity) {
wait();
}
queue.add(element);
notify(); // notifyAll() for multiple producer/consumer threads
}
public synchronized T take() throws InterruptedException {
while(queue.isEmpty()) {
wait();
}
T item = queue.remove();
notify(); // notifyAll() for multiple producer/consumer threads
return item;
}
}
关于必须使用等待和通知机制的方式,有几点需要注意。
首先,您需要确保对wait()
或的任何调用notify()
都在同步的代码区域内(其中wait()
和notify()
调用在同一个对象上同步)。造成这种情况的原因(除了标准线程安全问题)是由于被称为丢失信号的东西。
这方面的一个例子是,一个线程可能会put()
在队列恰好满时调用,然后它检查条件,看到队列已满,但是在它可以阻塞之前调度另一个线程。然后,第二个线程take()
是队列中的一个元素,并通知等待线程队列不再满。但是,因为第一个线程已经检查了条件,所以它会wait()
在重新调度后简单地调用,即使它可以取得进展。
通过在共享对象上同步,您可以确保不会发生此问题,因为在第take()
一个线程实际阻塞之前,第二个线程的调用将无法进行。
其次,由于被称为虚假唤醒的问题,您需要将正在检查的条件放入 while 循环而不是 if 语句中。这是有时可以重新激活等待线程而不notify()
被调用的地方。将此检查放入while循环将确保如果发生虚假唤醒,将重新检查条件,并wait()
再次调用线程。
正如其他一些答案所提到的,Java 1.5 引入了一个新的并发库(在java.util.concurrent
包中),旨在提供对等待/通知机制的更高级别的抽象。使用这些新功能,您可以像这样重写原始示例:
public class BlockingQueue<T> {
private Queue<T> queue = new LinkedList<T>();
private int capacity;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public void put(T element) throws InterruptedException {
lock.lock();
try {
while(queue.size() == capacity) {
notFull.await();
}
queue.add(element);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while(queue.isEmpty()) {
notEmpty.await();
}
T item = queue.remove();
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
}
当然,如果你真的需要一个阻塞队列,那么你应该使用 BlockingQueue接口的实现。
此外,对于此类内容,我强烈推荐Java Concurrency in Practice,因为它涵盖了您可能想了解的有关并发相关问题和解决方案的所有信息。
不是队列示例,但非常简单:)
class MyHouse {
private boolean pizzaArrived = false;
public void eatPizza(){
synchronized(this){
while(!pizzaArrived){
wait();
}
}
System.out.println("yumyum..");
}
public void pizzaGuy(){
synchronized(this){
this.pizzaArrived = true;
notifyAll();
}
}
}
一些重要的点:
1)永远不要这样做
if(!pizzaArrived){
wait();
}
始终使用 while(condition),因为
- a) 线程可以在没有任何人通知的情况下偶尔从等待状态中醒来。(即使披萨人没有按铃,也有人会决定尝试吃披萨。)。
- b) 获取同步锁后应再次检查条件。假设披萨不会永远持续下去。你醒了,排队吃披萨,但这对每个人来说都不够。如果你不检查,你可能会吃纸!:) (可能更好的例子是
while(!pizzaExists){ wait(); }
.
2) 在调用 wait/nofity 之前,您必须持有锁(同步)。线程也必须在唤醒之前获取锁。
3) 尽量避免在同步块中获取任何锁,并努力不调用外来方法(您不确定它们在做什么的方法)。如果必须,请务必采取措施避免死锁。
4) 小心 notify()。坚持使用 notifyAll() 直到你知道你在做什么。
5)最后但同样重要的是,阅读Java 并发实践!
即使你特别要求wait()
,notify()
我觉得这句话仍然很重要:
Josh Bloch,Effective Java 2nd Edition,第 69 条:Prefer concurrency utility to wait
and notify
(强调他的):
鉴于正确使用
wait
and的难度notify
,您应该使用更高级别的并发实用程序,而不是[...] 直接使用wait
andnotify
就像在“并发汇编语言”中编程,与java.util.concurrent
. 很少有理由在新代码中使用wait
和notify
。
你看过这个Java 教程吗?
此外,我建议您不要在真实软件中玩这种东西。玩它很好,所以你知道它是什么,但是并发到处都有陷阱。如果您正在为其他人构建软件,最好使用更高级别的抽象和同步集合或 JMS 队列。
这至少是我所做的。我不是并发专家,所以我尽可能避免手动处理线程。
例子
public class myThread extends Thread{
@override
public void run(){
while(true){
threadCondWait();// Circle waiting...
//bla bla bla bla
}
}
public synchronized void threadCondWait(){
while(myCondition){
wait();//Comminucate with notify()
}
}
}
public class myAnotherThread extends Thread{
@override
public void run(){
//Bla Bla bla
notify();//Trigger wait() Next Step
}
}
该问题要求等待()+通知()涉及队列(缓冲区)。首先想到的是使用缓冲区的生产者-消费者场景。
我们系统中的三个组件:
- Queue [Buffer] - 线程间共享的固定大小队列
- 生产者 - 线程产生/插入值到缓冲区
- 消费者 - 线程从缓冲区消费/删除值
生产者线程:生产者在缓冲区中插入值,直到缓冲区已满。如果缓冲区已满,则生产者调用 wait() 并进入等待阶段,直到消费者唤醒它。
static class Producer extends Thread {
private Queue<Integer> queue;
private int maxSize;
public Producer(Queue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
if (queue.size() == maxSize) {
try {
System.out.println("Queue is full, " + "Producer thread waiting for " + "consumer to take something from queue");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
Random random = new Random();
int i = random.nextInt();
System.out.println(" ^^^ Producing value : " + i);
queue.add(i);
queue.notify();
}
sleepRandom();
}
}
}
消费者线程:消费者线程从缓冲区中删除值,直到缓冲区为空。如果缓冲区为空,消费者调用 wait() 方法并进入等待状态,直到生产者发送通知信号。
static class Consumer extends Thread {
private Queue<Integer> queue;
private int maxSize;
public Consumer(Queue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
Random random = new Random();
while (true) {
synchronized (queue) {
if (queue.isEmpty()) {
System.out.println("Queue is empty," + "Consumer thread is waiting" + " for producer thread to put something in queue");
try {
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println(" vvv Consuming value : " + queue.remove());
queue.notify();
}
sleepRandom();
}
}
}
实用方法:
public static void sleepRandom(){
Random random = new Random();
try {
Thread.sleep(random.nextInt(250));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
申请代码:
public static void main(String args[]) {
System.out.println("How to use wait and notify method in Java");
System.out.println("Solving Producer Consumper Problem");
Queue<Integer> buffer = new LinkedList<>();
int maxSize = 10;
Thread producer = new Producer(buffer, maxSize, "PRODUCER");
Thread consumer = new Consumer(buffer, maxSize, "CONSUMER");
producer.start();
consumer.start();
}
示例输出:
^^^ Producing value : 1268801606
vvv Consuming value : 1268801606
Queue is empty,Consumer thread is waiting for producer thread to put something in queue
^^^ Producing value : -191710046
vvv Consuming value : -191710046
^^^ Producing value : -1096119803
vvv Consuming value : -1096119803
^^^ Producing value : -1502054254
vvv Consuming value : -1502054254
Queue is empty,Consumer thread is waiting for producer thread to put something in queue
^^^ Producing value : 408960851
vvv Consuming value : 408960851
^^^ Producing value : 2140469519
vvv Consuming value : 65361724
^^^ Producing value : 1844915867
^^^ Producing value : 1551384069
^^^ Producing value : -2112162412
vvv Consuming value : -887946831
vvv Consuming value : 1427122528
^^^ Producing value : -181736500
^^^ Producing value : -1603239584
^^^ Producing value : 175404355
vvv Consuming value : 1356483172
^^^ Producing value : -1505603127
vvv Consuming value : 267333829
^^^ Producing value : 1986055041
Queue is full, Producer thread waiting for consumer to take something from queue
vvv Consuming value : -1289385327
^^^ Producing value : 58340504
vvv Consuming value : 1244183136
^^^ Producing value : 1582191907
Queue is full, Producer thread waiting for consumer to take something from queue
vvv Consuming value : 1401174346
^^^ Producing value : 1617821198
vvv Consuming value : -1827889861
vvv Consuming value : 2098088641
线程中的 wait() 和 notifyall() 示例。
同步的静态数组列表用作资源,如果数组列表为空,则调用 wait() 方法。一旦为数组列表添加了一个元素,就会调用 notify() 方法。
public class PrinterResource extends Thread{
//resource
public static List<String> arrayList = new ArrayList<String>();
public void addElement(String a){
//System.out.println("Add element method "+this.getName());
synchronized (arrayList) {
arrayList.add(a);
arrayList.notifyAll();
}
}
public void removeElement(){
//System.out.println("Remove element method "+this.getName());
synchronized (arrayList) {
if(arrayList.size() == 0){
try {
arrayList.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else{
arrayList.remove(0);
}
}
}
public void run(){
System.out.println("Thread name -- "+this.getName());
if(!this.getName().equalsIgnoreCase("p4")){
this.removeElement();
}
this.addElement("threads");
}
public static void main(String[] args) {
PrinterResource p1 = new PrinterResource();
p1.setName("p1");
p1.start();
PrinterResource p2 = new PrinterResource();
p2.setName("p2");
p2.start();
PrinterResource p3 = new PrinterResource();
p3.setName("p3");
p3.start();
PrinterResource p4 = new PrinterResource();
p4.setName("p4");
p4.start();
try{
p1.join();
p2.join();
p3.join();
p4.join();
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("Final size of arraylist "+arrayList.size());
}
}