0

根据http://en.wikipedia.org/wiki/Producer-consumer_problem我想使用信号量模拟 P/C 问题。我陷入僵局,我不知道有什么问题。

public static void main(String[] args) {
        CustomBlockingQueue blockingQueue = new CustomBlockingQueue();
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }
}

@SuppressWarnings("serial")
class CustomBlockingQueue extends LinkedList<Object> {
    private static final int MAX_SIZE = 10;

    private Semaphore mutex = new Semaphore(1);
    private Semaphore fillCount = new Semaphore(0);
    private Semaphore emptyCount = new Semaphore(MAX_SIZE);

    @Override
    public boolean offer(Object e) {
        try {
            mutex.acquire();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        boolean result = super.offer(e);
        System.out.println("offer " + size());
        try {
            fillCount.release();
            emptyCount.acquire();
            mutex.release();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        return result;
    }

    @Override
    public Object poll() {
        try {
            mutex.acquire();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        Object result = super.poll();
        System.out.println("poll  " + size());
        try {
            emptyCount.release();
            fillCount.acquire();
            mutex.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

class Producer implements Runnable {
    private CustomBlockingQueue blockingQueue;
    private Random random = new Random();

    public Producer(CustomBlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(2));
                blockingQueue.offer(new Object());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private CustomBlockingQueue blockingQueue;
    private Random random = new Random();

    public Consumer(CustomBlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(4));
                blockingQueue.poll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

使用信号量

信号量解决了丢失唤醒呼叫的问题。在下面的解决方案中,我们使用了两个信号量,fillCount 和 emptyCount,来解决这个问题。fillCount 是缓冲区中要读取的项目数,emptyCount 是缓冲区中可以写入项目的可用空间数。当新项目被放入缓冲区时,fillCount 会增加,而 emptyCount 会减少。如果生产者在其值为 0 时尝试减少 emptyCount,则生产者将进入睡眠状态。下次消费一个项目时,emptyCount 会增加,生产者会醒来。消费者的工作方式类似。

4

2 回答 2

2

您可能会考虑使用 a 来代替BlockingQueue互斥锁并等待您的情况。

另一方面,我有一个旧页面,它演示了生产者/消费者的竞争条件(与虚假中断相反)。但是我的实现不使用信号量,所以我不确定它会帮助你:

http://256stuff.com/gray/docs/misc/producer_consumer_race_conditions/

于 2012-04-04T22:16:37.553 回答
2

您的锁定顺序错误:

需要提供:

        emptyCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        fillCount.release();

民意调查需要类似的更改:

        fillCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        emptyCount.release();

在您的实现中,您在等待信号量的同时持有互斥量,这会导致问题,因为其他线程可以等待互斥量以释放信号量。

于 2012-04-04T22:18:01.427 回答