1

我需要编写一个类似于生产者-消费者的问题,必须使用信号量。我尝试了几种解决方案,但都没有奏效。首先,我在 Wikipedia 上尝试了一个解决方案,但没有奏效。我当前的代码是这样的:

消费者的方法运行:

    public void run() {
    int i=0;
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    String s = new String();
    while (1!=2){
        Date datainicio = new Date();
        String inicio=dateFormat.format(datainicio);
        try {
            Thread.sleep(1000);///10000
        } catch (InterruptedException e) {
            System.out.println("Excecao InterruptedException lancada.");
        }
        //this.encheBuffer.down();
        this.mutex.down();
        // RC
        i=0;
        while (i<buffer.length) {
            if (buffer[i] == null) {
                i++;
            } else {
                break;
            }
        }
        if (i<buffer.length) {
            QuantidadeBuffer.quantidade--;
            Date datafim = new Date();
            String fim=dateFormat.format(datafim);
            int identificador;
            identificador=buffer[i].getIdentificador()[0];
            s="Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i;
            //System.out.println("Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
            buffer[i]= null;
        }
        // RC
        this.mutex.up();
        //this.esvaziaBuffer.up();
        System.out.println(s);
  //            lock.up();
    }
}

生产者的方法运行:

    public void run() {
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    int i=0;
    while (1!=2){
        Date datainicio = new Date();
        String inicio=dateFormat.format(datainicio);
        // Produz Item
        try {
            Thread.sleep(500);//50000
        } catch (InterruptedException e) {
            System.out.println("Excecao InterruptedException lancada.");
        }
        //this.esvaziaBuffer.down();
        this.mutex.down();
        // RC
        i=0;
        while (i<buffer.length) {
            if (buffer[i]!=null) {
                i++;
            } else {
                break;
            }
        }
        if (i<buffer.length) {
            int identificador[]=new int[Pedido.getTamanho_identificador()];
            identificador[0]=i;
            buffer[i]=new Pedido();
            Produtor.buffer[i].setIdentificador(identificador);
            Produtor.buffer[i].setTexto("pacote de dados");
            QuantidadeBuffer.quantidade++;
            Date datafim = new Date();
            String fim=dateFormat.format(datafim);
            System.out.println("Produtor Thread: "+Thread.currentThread()+" Pedido: "+identificador[0]+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
            i++;
        }
        // RC
        this.mutex.up();
        //this.encheBuffer.up();
    }
    //this.encheBuffer.up();
}

在上面的代码中,消费者线程读取了一个位置,然后另一个线程读取了相同的位置,而没有生产者填充该位置,如下所示:

Consumidor Thread: Thread[Thread-17,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1
Consumidor Thread: Thread[Thread-19,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1
4

4 回答 4

9

看来您使用的是互斥锁而不是信号量?

在使用互斥锁时,您只有二进制同步 - 锁定和解锁一个资源。Sempahores 具有您可以发出信号或获取的值。

您正在尝试锁定/解锁整个缓冲区,但这是错误的方法,因为如您所见,生产者或消费者锁定,并且当阅读器锁定它时,生产者无法填充缓冲区(因为它必须先锁定)。

相反,您应该创建一个信号量,然后当生产者写入一个数据包或数据块时,它可以向信号量发出信号。然后,消费者可以尝试获取信号量,因此他们将等待生产者发出已写入数据包的信号。在发出写入数据包的信号后,其中一个消费者将被唤醒,它会知道它可以读取一个数据包。它可以读取一个数据包,然后返回尝试在信号量上获取。如果在那段时间生产者写入了另一个数据包,它会再次发出信号,然后任何一个消费者将继续读取另一个数据包。ETC...

例如:

(Producer) - 写一个数据包 - Semaphore.release(1)

(Consumer xN) - Semaphore.acquire(1) - 读取一个数据包

如果您有多个消费者,那么消费者(而不是生产者)应该在读取数据包时(而不是在获取信号量时)锁定缓冲区,以防止出现竞争条件。在下面的示例中,生产者还锁定了列表,因为所有内容都在同一个 JVM 上。

import java.util.LinkedList;
import java.util.concurrent.Semaphore;

public class Semaphores {

    static Object LOCK = new Object();

    static LinkedList list = new LinkedList();
    static Semaphore sem = new Semaphore(0);
    static Semaphore mutex = new Semaphore(1);

    static class Consumer extends Thread {
        String name;
        public Consumer(String name) {
            this.name = name;
        }
        public void run() {
            try {

                while (true) {
                    sem.acquire(1);
                    mutex.acquire();
                    System.out.println("Consumer \""+name+"\" read: "+list.removeFirst());
                    mutex.release();
                }
            } catch (Exception x) {
                x.printStackTrace();
            }
        }
    }

    static class Producer extends Thread {
        public void run() {
            try {

                int N = 0;

                while (true) {
                    mutex.acquire();
                    list.add(new Integer(N++));
                    mutex.release();
                    sem.release(1);
                    Thread.sleep(500);
                }
            } catch (Exception x) {
                x.printStackTrace();
            }
        }
    }

    public static void main(String [] args) {
        new Producer().start();
        new Consumer("Alice").start();
        new Consumer("Bob").start();
    }
}
于 2011-11-27T22:26:09.963 回答
1

多线程应用程序最常见的使用模式之一是创建异步通信网络。几个现实世界的应用程序需要这个。有两种方法可以实现这一点:-

  1. 生产者和消费者是紧密耦合的。这不是异步的,每个生产者都在等待消费者,反之亦然。应用程序的吞吐量也成为 2 个实体中的最小值。这通常不是一个好的设计。
  2. 更好(也更复杂)的方法是在生产者和消费者之间引入共享缓冲区。这样,更快的生产者或更快的消费者不会因为较慢的对应物而受到限制。它还允许多个生产者和多个消费者通过共享缓冲区进行连接。

在此处输入图像描述

有几种方法可以创建生产者-消费者模式。

  1. 使用前面“锁定基础”模块中介绍的 wait/notify/nofityAll
  2. 使用 Java 提供的 API - java.util.concurrent.BlockingQueue。我们将在后续模块中对此进行更多介绍。
  3. 使用信号量:这是创建生产者-消费者模式的一种非常方便的方法。

    public class ProducerConsumerSemaphore {
    
    private static final int BUFFER_SIZE = 10;
    private static final int MAX_VALUE = 10000;
    private final Stack<Integer> buffer = new Stack<Integer>();
    private final Semaphore writePermits = new Semaphore(BUFFER_SIZE);
    private final Semaphore readPermits = new Semaphore(0);
    private final Random random = new Random();
    
    class Producer implements Runnable {
        @Override
        public void run() {
            while (true) {
                writePermits.acquireUninterruptibly();
                buffer.push(random.nextInt(MAX_VALUE));
                readPermits.release();
            }
        }
    }
    
    class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                readPermits.acquireUninterruptibly();
                System.out.println(buffer.pop());
                writePermits.release();
            }
        }
    }
    
    public static void main(String[] args) {
    
        ProducerConsumerSemaphore obj = new ProducerConsumerSemaphore();
        Producer p1 = obj.new Producer();
        Producer p2 = obj.new Producer();
        Producer p3 = obj.new Producer();
        Consumer c1 = obj.new Consumer();
        Consumer c2 = obj.new Consumer();
        Consumer c3 = obj.new Consumer();
        Thread t1 = new Thread(p1);
        Thread t2 = new Thread(p2);
        Thread t3 = new Thread(p3);
        Thread t4 = new Thread(c1);
        Thread t5 = new Thread(c2);
        Thread t6 = new Thread(c3);
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
        t6.start();
    }
    

我们使用 2 个信号量 - 1 个用于消费者,1 个用于生产者。

生产者允许的许可数量设置为最大缓冲区大小。

每个生产者消耗 1 个写许可,并在产生 1 条消息时释放 1 个读许可。

每个消费者消费 1 个读许可并释放 1 个写许可用于消费每条消息。

想象一下在实际消息上存钱的许可证。写入许可流从生产者流向消费者(然后返回生产者)。读取许可从消费者流向生产者(并返回消费者)。在任何给定时间点缓冲区中的总消息将完全等于发出的读取许可数。如果产生消息的速率大于消费消息的速率,那么在某个时刻,可用的写许可数量将被耗尽,所有生产者线程将被阻塞,直到消费者从缓冲区读取并释放写许可。同样的逻辑反过来也存在。

在此处输入图像描述

以上是系统中消息流和许可的更直观的表述。通过使用信号量,我们只是抽象出使用 wait/notify/notifyAll 编写一段代码所需的血腥细节和注意事项。上面的代码可以和wait等比。方法:

当一个线程因缺少许可而被阻塞时,它相当于对该信号量的 wait() 调用。

当线程释放许可时,它相当于对该特定信号量的 notifyAll() 调用。

于 2019-04-11T14:17:16.980 回答
0
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;

/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
/**
 *
 * @author sakshi
 */
public class SemaphoreDemo {

    static Semaphore producer = new Semaphore(1);
    static Semaphore consumer = new Semaphore(0);
    static List<Integer> list = new ArrayList<Integer>();

    static class Producer extends Thread {

        List<Integer> list;

        public Producer(List<Integer> list) {
            this.list = list;
        }

        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    producer.acquire();

                } catch (InterruptedException ex) {
                    Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex);
                }
                System.out.println("produce=" + i);

                list.add(i);
                consumer.release();

            }
        }
    }

    static class Consumer extends Thread {

        List<Integer> list;

        public Consumer(List<Integer> list) {
            this.list = list;
        }

        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    consumer.acquire();
                } catch (InterruptedException ex) {
                    Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex);
                }

                System.out.println("consume=" + list.get(i));

                producer.release();
            }
        }
    }

    public static void main(String[] args) {
        Producer produce = new Producer(list);

        Consumer consume = new Consumer(list);

        produce.start();
        consume.start();
    }
}


output:

produce=0
consume=0
produce=1
consume=1
produce=2
consume=2
produce=3
consume=3
produce=4
consume=4
produce=5
consume=5
produce=6
consume=6
produce=7
consume=7
produce=8
consume=8
produce=9
consume=9
于 2017-02-25T15:47:57.457 回答
0
import java.util.concurrent.Semaphore;


public class ConsumerProducer{

    public static void main(String[] args) {

           Semaphore semaphoreProducer=new Semaphore(1);
           Semaphore semaphoreConsumer=new Semaphore(0);
           System.out.println("semaphoreProducer permit=1 | semaphoreConsumer permit=0");

           new Producer(semaphoreProducer,semaphoreConsumer).start();
           new Consumer(semaphoreConsumer,semaphoreProducer).start();

    }


/**
 * Producer Class.
 */
static class Producer extends Thread{

    Semaphore semaphoreProducer;
    Semaphore semaphoreConsumer;


    public Producer(Semaphore semaphoreProducer,Semaphore semaphoreConsumer) {
           this.semaphoreProducer=semaphoreProducer;
           this.semaphoreConsumer=semaphoreConsumer;
    }

    public void run() {
           for(;;){
                  try {
                      semaphoreProducer.acquire();
                      System.out.println("Produced : "+Thread.currentThread().getName());
                      semaphoreConsumer.release();

                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
           }          
    }
}

/**
 * Consumer Class.
 */
static class Consumer extends Thread{

    Semaphore semaphoreConsumer;
    Semaphore semaphoreProducer;

    public Consumer(Semaphore semaphoreConsumer,Semaphore semaphoreProducer) {
           this.semaphoreConsumer=semaphoreConsumer;
           this.semaphoreProducer=semaphoreProducer;
    }

    public void run() {

           for(;;){
                  try {
                      semaphoreConsumer.acquire();
                      System.out.println("Consumed : "+Thread.currentThread().getName());
                      semaphoreProducer.release();
                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
           }
    }

}
}
于 2017-04-17T18:38:35.787 回答