5

我使用可重入锁(ReentrantLock)和条件(Condition)编写了一个生产者-消费者程序。它可以正常工作,但我不确定这个实现是否正确,而且它似乎也不是最优的。有人能帮我确认这是否是一个正确的实现吗?另外,能否告诉我如何优化它,比如只在真正需要的地方加锁?

/**
 * Single-slot producer/consumer demo built on a {@link ReentrantLock} and one
 * {@link Condition}.
 *
 * <p>The producer sets {@link Observed#filled} to {@code true}, the consumer resets it
 * to {@code false}; each side performs 20 hand-offs. The lock is held only while the
 * shared flag is inspected or changed, never while a thread sleeps.
 */
public class TestRL {

    /**
     * The shared slot. It has no synchronization of its own; {@link Producer} and
     * {@link Consumer} only touch it while holding the lock they were given.
     */
    static class Observed {
        boolean filled = false;

        public void setFilled(boolean filled) {
            this.filled = filled;
        }

        public boolean getFilled() {
            return filled;
        }
    }

    static Observed observed = new Observed();

    /** Empties the slot 20 times, waiting on the condition whenever it is not filled. */
    static class Consumer implements Runnable {
        Observed observed;
        ReentrantLock lock;
        Condition condition;

        /**
         * @param observed  the shared slot
         * @param lock      the lock guarding {@code observed}
         * @param condition a condition created from {@code lock}
         */
        Consumer(Observed observed, ReentrantLock lock, Condition condition) {
            this.observed = observed;
            this.lock = lock;
            this.condition = condition;
        }

        /**
         * Runs 20 consume cycles. If the thread is interrupted it restores the interrupt
         * flag and stops instead of going back to waiting.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    lock.lock();
                    try {
                        // Re-check in a loop: await() may return spuriously, and
                        // signalAll() wakes every waiter regardless of the slot state.
                        while (!observed.getFilled()) {
                            System.out.println("consumer waiting");
                            condition.await();
                        }
                        observed.setFilled(false);
                        System.out.println("consumed");
                        condition.signalAll();
                    } finally {
                        lock.unlock();
                    }
                    // Simulated processing, done outside the lock so the producer is not
                    // blocked. Parenthesized: "1000 * i % 2" is always 0.
                    Thread.sleep(400 + 1000 * (i % 2));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Fills the slot 20 times, waiting on the condition whenever it is still filled. */
    static class Producer implements Runnable {
        Observed observed;
        ReentrantLock lock;
        Condition condition;

        /**
         * @param observed  the shared slot
         * @param lock      the lock guarding {@code observed}
         * @param condition a condition created from {@code lock}
         */
        Producer(Observed observed, ReentrantLock lock, Condition condition) {
            this.observed = observed;
            this.lock = lock;
            this.condition = condition;
        }

        /**
         * Runs 20 produce cycles. If the thread is interrupted it restores the interrupt
         * flag and stops instead of going back to waiting.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    // Simulated production work, done before taking the lock.
                    Thread.sleep(1000);
                    lock.lock();
                    try {
                        while (observed.getFilled()) {
                            System.out.println("producer waiting");
                            condition.await();
                        }
                        observed.setFilled(true);
                        System.out.println("produced");
                        condition.signalAll();
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Starts one producer and one consumer sharing the static {@link #observed} slot.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Producer producer = new Producer(observed, lock, condition);
        Consumer consumer = new Consumer(observed, lock, condition);
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(consumer);
        t1.start();
        t2.start();
    }

}

4

4 回答 4

3

以下是使用 ReentrantLock & Condition 的生产者-消费者问题的另一个示例代码。以防万一有人想要。

package reentrant_prodcons;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;


/**
 * Launcher for the queue-based producer/consumer example: it creates the shared
 * queue, lock and condition, then starts one {@code Producer} and one {@code Consumer}
 * thread on them.
 */
public class Reentrant_ProdCons {

    /**
     * Builds the shared state and starts both worker threads.
     *
     * @param args the command line arguments (not used)
     */
    public static void main(String[] args) {
        final int capacity = 5;
        final Queue<Integer> buffer = new LinkedList<Integer>();
        final ReentrantLock sharedLock = new ReentrantLock();
        final Condition stateChanged = sharedLock.newCondition();

        // The producer is created and started before the consumer is created.
        Producer producerThread = new Producer(sharedLock, stateChanged, buffer, capacity);
        producerThread.start();

        Consumer consumerThread = new Consumer(sharedLock, stateChanged, buffer);
        consumerThread.start();
    }

}


/**
 * Producer thread: adds the integers 0..9 to a shared bounded queue, waiting on the
 * shared condition while the queue holds {@code size} elements.
 *
 * <p>The queue is only accessed while {@code lock} is held. {@code con} must have been
 * created from {@code lock}.
 */
class Producer extends Thread {

    ReentrantLock lock;
    Condition con;
    Queue<Integer> queue;
    int size;

    /**
     * @param lock  lock guarding {@code queue}
     * @param con   condition created from {@code lock}, signalled after every change
     * @param queue shared buffer
     * @param size  maximum number of elements the producer lets the queue hold
     */
    public Producer(ReentrantLock lock, Condition con, Queue<Integer> queue, int size) {
        this.lock = lock;
        this.con = con;
        this.queue = queue;
        this.size = size;
    }

    /**
     * Produces ten items. If interrupted while waiting, the interruption is logged, the
     * interrupt flag is restored and the thread stops; it does not go back to waiting.
     */
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            lock.lock();
            try {
                // Loop guards against spurious wakeups; ">=" also covers a queue that
                // was handed over already above capacity.
                while (queue.size() >= size) {
                    con.await();
                }
                queue.add(i);
                System.out.println("Produced : " + i);
                con.signal();
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                Thread.currentThread().interrupt();
                return;
            } finally {
                // Always release, even if add() or await() throws.
                lock.unlock();
            }
        }
    }

}

/**
 * Consumer thread: removes ten elements from a shared queue, waiting on the shared
 * condition while the queue is empty.
 *
 * <p>The queue is only accessed while {@code lock} is held. {@code con} must have been
 * created from {@code lock}.
 */
class Consumer extends Thread {

    ReentrantLock lock;
    Condition con;
    Queue<Integer> queue;

    /**
     * @param lock  lock guarding {@code queue}
     * @param con   condition created from {@code lock}, signalled after every change
     * @param queue shared buffer
     */
    public Consumer(ReentrantLock lock, Condition con, Queue<Integer> queue) {
        this.lock = lock;
        this.con = con;
        this.queue = queue;
    }

    /**
     * Consumes ten items. If interrupted while waiting, the interruption is logged, the
     * interrupt flag is restored and the thread stops; it does not go back to waiting.
     */
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            lock.lock();
            try {
                // Loop guards against spurious wakeups.
                while (queue.isEmpty()) {
                    con.await();
                }
                System.out.println("Consumed : " + queue.remove());
                con.signal();
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                Thread.currentThread().interrupt();
                return;
            } finally {
                // Always release, even if remove() or await() throws.
                lock.unlock();
            }
        }
    }
}
于 2015-06-09T19:08:31.057 回答
2

希望对你有帮助。你可以通过调整消费者/生产者的等待时间,轻松验证代码对队列上溢(队列已满)和下溢(队列为空)情况的处理。

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/*
   A reentrant lock can be acquired again by the thread that already holds it;
   the lock keeps a count of those acquisitions.
   That thread then has to unlock it the same number of times.

   Usually the lock is acquired only once.

   unlock() should be called in a finally block, and the guarded code should be
   in the matching try block.

   The lock also offers waiting in the form of Condition.await() (which obviously
   must be called while holding the lock, just as wait() must be called inside a
   synchronized block).

   This is basically helpful to avoid deadlocks, which we will see in the next
   program.

 */
/**
 * Bounded-queue producer/consumer demo using a {@link Lock} and a single
 * {@link Condition}.
 *
 * <p>The producer adds increasing integers, the consumer removes and prints them.
 * Both run until their thread is interrupted. Change the two wait-time constants to
 * provoke the "queue full" and "queue empty" paths.
 */
public class ReentrantLock1 {

    private static final Lock lock = new ReentrantLock();
    private static final Condition condition = lock.newCondition();

    private static final Queue<Integer> queue = new LinkedList<>();
    private static final int maxQueueSize = 10;

    /** Pause before each consume attempt, in milliseconds. */
    private static final int consumerWaitTime = 0;
    /** Pause before each produce attempt, in milliseconds. */
    private static final int producerWaitTime = 300;

    /**
     * Starts the producer and consumer threads and waits for the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Thread t1 = new Thread(ReentrantLock1::producer);
        Thread t2 = new Thread(ReentrantLock1::consumer);

        t1.start();
        t2.start();

        try {
            t1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds 1, 2, 3, ... to the queue, waiting while it holds {@code maxQueueSize}
     * elements. Returns, with the interrupt flag restored, once the thread is
     * interrupted.
     */
    private static void producer() {
        int i = 1;
        try {
            while (true) {
                // Sleep before locking: if sleep() is interrupted no lock is held, so
                // no finally block may try to unlock one.
                Thread.sleep(producerWaitTime);
                lock.lock();
                try {
                    // Loop guards against spurious wakeups.
                    while (queue.size() >= maxQueueSize) {
                        condition.await();
                    }
                    queue.add(i);
                    i++;
                    condition.signal();
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes and prints queue elements, waiting while the queue is empty. Returns,
     * with the interrupt flag restored, once the thread is interrupted.
     */
    private static void consumer() {
        try {
            while (true) {
                Thread.sleep(consumerWaitTime);
                lock.lock();
                try {
                    System.out.print("queue size is: " + queue.size());
                    // Loop guards against spurious wakeups; without it remove() could
                    // run on an empty queue.
                    while (queue.isEmpty()) {
                        condition.await();
                    }
                    int i = queue.remove();
                    System.out.println("Element on top is: " + i);
                    condition.signal();
                } finally {
                    // unlock() runs only when lock() succeeded in this iteration.
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
于 2018-07-30T16:37:34.457 回答
0
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Producer/consumer demo built on a {@link BlockingQueue}, which does the locking and
 * waiting internally.
 */
public class prodConsumeJava8
{

    /**
     * Starts one producer that puts the integers 0..4 on the queue and one consumer that
     * takes and prints elements until its thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        BlockingQueue<Integer> que = new LinkedBlockingQueue<>();

        final Runnable singleproducer = () -> {
            try
            {
                for (int i = 0; i < 5; i++)
                {
                    System.out.println("Produced: " + i);
                    que.put(i);
                }
            }
            catch (InterruptedException e)
            {
                // Restore the flag so the thread's owner can see the interruption.
                Thread.currentThread().interrupt();
            }
        };

        final Runnable singleConsumer = () -> {
            try
            {
                while (true)
                {
                    System.out.println("Consumer" + que.take());
                }
            }
            catch (InterruptedException e)
            {
                // Leave the loop on interruption; catching inside the loop would
                // swallow the request and block on take() again.
                Thread.currentThread().interrupt();
            }
        };

        new Thread(singleproducer).start();
        new Thread(singleConsumer).start();

    }

}
于 2019-08-16T07:03:39.250 回答
0
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    /*
     * To change this license header, choose License Headers in Project Properties.
     * To change this template file, choose Tools | Templates
     * and open the template in the editor.
     */
    /**
     * Producer/consumer demo in which the shared list acts as a single-slot buffer: the
     * producer adds an element only when the list is empty, and the consumer removes
     * one only when it is not. Both threads use the static {@link #lock} and the
     * {@link #ProdCons} condition created from it.
     *
     * @author sakshi
     */
    public class ReentrantLockDemo {

        static List<Integer> list = new ArrayList<Integer>();
        static ReentrantLock lock = new ReentrantLock();
        static Condition ProdCons = lock.newCondition();

        /** Adds the integers 0..9 to the list, one at a time, each time the list is empty. */
        static class Producer extends Thread {

            List<Integer> list;

            /**
             * @param list the shared buffer; only accessed while {@code lock} is held
             */
            Producer(List<Integer> list) {
                this.list = list;
            }

            /**
             * Produces ten items. If interrupted while waiting, the interruption is
             * logged, the interrupt flag is restored and the thread stops.
             */
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    lock.lock();
                    try {
                        // Must be a loop: await() can wake spuriously, and signalAll()
                        // wakes waiters without guaranteeing the list is empty.
                        while (!list.isEmpty()) {
                            ProdCons.await();
                        }
                        list.add(i);
                        System.out.println("produce=" + i);
                        ProdCons.signalAll();
                    } catch (InterruptedException ex) {
                        Logger.getLogger(ReentrantLockDemo.class.getName()).log(Level.SEVERE, null, ex);
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        // Always release, even when await() or add() throws.
                        lock.unlock();
                    }
                }
            }

        }

        /** Removes ten elements from the head of the list, waiting while it is empty. */
        static class Consumer extends Thread {

            List<Integer> list;

            /**
             * @param list the shared buffer; only accessed while {@code lock} is held
             */
            Consumer(List<Integer> list) {
                this.list = list;
            }

            /**
             * Consumes ten items. If interrupted while waiting, the interruption is
             * logged, the interrupt flag is restored and the thread stops.
             */
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    lock.lock();
                    try {
                        while (list.isEmpty()) {
                            ProdCons.await();
                        }
                        System.out.println("consume=" + list.remove(0));
                        ProdCons.signalAll();
                    } catch (InterruptedException ex) {
                        Logger.getLogger(ReentrantLockDemo.class.getName()).log(Level.SEVERE, null, ex);
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        // Always release, even when await() or remove() throws.
                        lock.unlock();
                    }
                }
            }
        }

        /**
         * Starts one producer and one consumer on the shared static list.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Producer produce = new Producer(list);
            Consumer consume = new Consumer(list);
            produce.start();
            consume.start();
        }
    }


output:
produce=0
consume=0
produce=1
consume=1
produce=2
consume=2
produce=3
consume=3
produce=4
consume=4
produce=5
consume=5
produce=6
consume=6
produce=7
consume=7
produce=8
consume=8
produce=9
consume=9
于 2017-02-26T13:01:27.657 回答