
Suppose I have two main running threads, A and B, and one thread T that is called asynchronously. I want thread A to wait until message msgA is received on thread T, and thread B to wait until message msgB is received on thread T. I know how to do this with two semaphores:

semaphoreA = new Semaphore(0);
semaphoreB = new Semaphore(0);

//in thread A
//code until where to run 
semaphoreA.acquire();

//in thread B
//code until where to run 
semaphoreB.acquire();

//in thread T
if (msgA.equals(msgReceived)) {
    semaphoreA.release();
} 
if (msgB.equals(msgReceived)) {
    semaphoreB.release();
}

The trouble is that I have many such threads (A, B, C, ...) and I do not want to use one semaphore per thread. Is there a class somewhere in java.util.concurrent that can replace all the semaphores with a single instance?

synchronizer = //?

//in thread A
//code until where to run 
synchronizer.acquire(msgA); //only let go if msgA is received from thread calling release

//in thread B
//code until where to run 
synchronizer.acquire(msgB); //only let go if msgB is received from thread calling release

//in thread T
if (msgA.equals(msgReceived)) {
    synchronizer.release(msgA);
} 
if (msgB.equals(msgReceived)) {
    synchronizer.release(msgB);
} //actually here you can call synchronizer.release(msgReceived)

1 Answer

Great intuition, m3th0dman. I think what you're looking for is called a TransferQueue (java.util.concurrent.TransferQueue).

This is "a BlockingQueue in which producers may wait for consumers to receive elements". In your case, threads A and B are producers, and thread T is the only consumer.

In a nutshell:

  1. Create a shared TransferQueue<Object> synchronizer = new LinkedTransferQueue<>(); (TransferQueue is an interface, so instantiate LinkedTransferQueue).
  2. Threads A and B call the blocking method synchronizer.transfer(new Object());
  3. Meanwhile, thread T calls synchronizer.take(); at its leisure to unblock them, one waiting thread per call.

Here's an example:

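import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueueExample {
    public static void main(String[] args) {
        // TransferQueue is an interface; LinkedTransferQueue is the JDK implementation.
        final TransferQueue<Object> synchronizer = new LinkedTransferQueue<>();

        for (int i = 0; i < 10; i++) {
            final int id = i; // anonymous classes can only capture final locals
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Do work until
                    // ...
                    System.out.println("Thread " + id + " is transferring.");
                    try {
                        synchronizer.transfer(new Object()); // Blocks until the consumer takes it
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // Preserve the interrupt and stop
                        return;
                    }
                    System.out.println("Thread " + id + " done transferring.");
                }
            }).start();
        }

        // The consumer plays the role of thread T.
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Note that this will run indefinitely, so in a real
                // scenario, find a way of shutting this thread down.
                try {
                    while (true) {
                        // size() counts elements not yet taken, which is roughly
                        // the number of producers still blocked in transfer().
                        System.out.println("There are about " + synchronizer.size()
                                           + " threads waiting");
                        synchronizer.take();
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}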

I hope the nesting and anonymous classes aren't too distracting.
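
One caveat: take() unblocks whichever producer is at the head of the queue, not the one tied to a particular message. If the release really has to be selected by message, as in the acquire(msg)/release(msg) pseudocode from the question, a rough sketch could look like the following. KeyedSynchronizer and its methods are hypothetical names, not a java.util.concurrent class, and the lambdas assume Java 8 or later. It still uses one Semaphore per message internally, but hides them behind a single instance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

// Hypothetical helper (not a JDK class): one lazily created Semaphore per message key.
final class KeyedSynchronizer<K> {
    private final ConcurrentMap<K, Semaphore> semaphores = new ConcurrentHashMap<>();

    // Returns the semaphore for this key, creating it with zero permits on first use.
    private Semaphore semaphoreFor(K key) {
        return semaphores.computeIfAbsent(key, k -> new Semaphore(0));
    }

    // Blocks until release(key) has been called for an equal key.
    void acquire(K key) throws InterruptedException {
        semaphoreFor(key).acquire();
    }

    // Non-blocking variant: true only if a permit for this key was available.
    boolean tryAcquire(K key) {
        return semaphoreFor(key).tryAcquire();
    }

    // Adds one permit for this key, letting one waiter (current or future) through.
    void release(K key) {
        semaphoreFor(key).release();
    }

    public static void main(String[] args) throws InterruptedException {
        final KeyedSynchronizer<String> synchronizer = new KeyedSynchronizer<>();
        Thread a = new Thread(() -> waitFor(synchronizer, "msgA"), "A");
        Thread b = new Thread(() -> waitFor(synchronizer, "msgB"), "B");
        a.start();
        b.start();
        // Plays the role of thread T: release whichever message arrives.
        synchronizer.release("msgB");
        synchronizer.release("msgA");
        a.join();
        b.join();
    }

    private static void waitFor(KeyedSynchronizer<String> synchronizer, String msg) {
        try {
            synchronizer.acquire(msg);
            System.out.println(Thread.currentThread().getName() + " released by " + msg);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The message type needs sensible equals() and hashCode() implementations, since it is used as a map key. This sketch never removes entries from the map, so with an unbounded set of distinct messages you would want some cleanup strategy.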

answered 2013-11-08T04:55:40.473