0

我做了一个生产者-消费者计划。它只是核心 java 中的一个程序,没有任何 GUI(Swing 或 SWT)。它有一个将对象放入队列的生产者。还有一些消费者必须将一些人员(例如字符串)添加到该共享队列中的每个对象中。因此,每个消费者都必须处理共享队列中的每个对象。在这种情况下 - 每个 BookShelf 都必须在“books”ArrayList 中包含来自所有消费者的项目。消费者。

问题:我应该在消费者中使用什么条件来正确完成他们的线程?以下是程序的代码片段。也许我以错误的方式实施它。

这是队列的对象:

public class BookShelf {
private int id;
private String name;
private int height;
private int weigh;
List<String> books = Collections.synchronizedList(new ArrayList<String>());

public BookShelf(int id, String name) {
    this.id = id;
    this.name = name;
}
public void addBook(String book) {
  books.add(book);
}
public boolean eq(String book) {
synchronized (books) {
    for (String b: books) {
    if (b.equalsIgnoreCase(book)) {
        return true;
    }
    }
}
return false;
}
 other setters and getters..

}

这是生产者类:

public class Producer implements Runnable {
private BlockingQueue myQueue;

public Producer(BlockingQueue myQueue) {
this.myQueue = myQueue;
}

public void run() {
for(int i=0; i<7; i++){
    try {
    System.out.println("Produced: " + i);
    BookShelf myBookShelf = new BookShelf(i, "book #" + i);
    myQueue.put(myBookShelf);
    } catch (InterruptedException ex) {
    //Proper handle
    }
}
}

}

这是消费者类之一:

 public class Consumer implements Runnable {
 private BlockingQueue myQueue;

public Consumer(BlockingQueue myQueue) {
    this.myQueue = myQueue; }

public void run() {
    while(true){
        try {
            BookShelf tempBookShelf = (BookShelf) myQueue.take();

            //eq() is my method to check if ArraList has a book.
            if (tempBookShelf.eq("Abc book")) {
                System.out.println("It already has book");
                myQueue.put(tempBookShelf);
                Thread.sleep(2000);
            } else {
                tempBookShelf.addBook("Abc book");
                myQueue.put(tempBookShelf);
                Thread.sleep(2000);
            }
        } catch (InterruptedException ex) {
            //Proper handle
        }
    }
}
}

这是主要课程:

public class ProducerConsumerTest {

public static void main(String[] args) {

     BlockingQueue sharedQueue = new LinkedBlockingQueue();
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));
     Thread consThread2 = new Thread(new Consumer2(sharedQueue));

     prodThread.start();
     consThread.start();
     consThread2.start();
}
 }
4

4 回答 4

1

向生产者注册每个消费者。每个消费者都有自己的队列,生产者将对象放入所有队列。然后每个消费者在对象的同一实例上进行处理。

    public interface Consumer{
        public void process(BookShelf bs);
    }

    public class Producer implements Runnable{
        private final List<Consumer> consumers = new CopyOnWriteArrayList<Consumer>(); // thread safe but not efficient with lots of changes

        public void register(Consumer c){
            consumers.add(c); // thread safe
        }

        public void run(){
            for(;;){
                BookShelf bs = generateBookShelfByWhateverMeans();
                for (Consumer c : consumers){
                    c.process(bs);
                }
            }
        }
    }

    public class BookShelfConsumer implements Runnable, Consumer{
        private final BlockingQueue<BookShelf> queue = new LinkedTransferQueue<BookShelf>(); // unbounded & thread safe

        public void process(BookShelf bs){
            queue.offer(bs); // non-blocking
        }

        public void run(){
            for(;;){
                BookShelf bs = queue.take(); // blocks until got object or interrupted 
                // catch InterruptedException 
                // do whatever this consumer is supposed to do with the object
            }
        }
    }
于 2013-04-13T09:57:27.567 回答
0

我建议在 Bookshelf 中使用ConcurrentLinkedQueue而不是同步列表 - 它是无锁的(不需要同步)并且可能会更有效。

To end your consumers, change their while(true) loops to while(!cancel) loops. Give each consumer a cancel boolean as an instance variable that initializes to false, and give them a cancel() method that sets cancel to true. Call cancel() on your consumers when you're done with them. If you will always be canceling all of your consumers at once (instead of selectively canceling some but not others), then you can use a static cancel instead of an instance cancel.

于 2013-04-13T13:49:15.813 回答
0

还有一些(N 个)消费者必须将一些人员(例如字符串)添加到该共享队列中的每个对象中

我假设您的意思是每个消费者都必须将他们的东西添加到每个进入队列的对象中。在那种情况下,这不是生产者-消费者问题,这更像是一个观察者可观察的问题。基本上,当一个新BookShelf的被创建时,这就是Observable。所有的观察者都应该被告知BookShelf并有机会添加他们自己的Book.

于 2013-04-13T09:35:40.010 回答
0

我会尝试改用SwingWorker。它有一个done()在完成时执行的方法。有关一些代码示例,请参阅页面。

如果你使用的不是 Swing,Swt 中有一个类似的功能叫做 Jobs。检查页面以获取示例。它还有一个在作业完成时执行的 done() 方法。

于 2013-04-13T08:24:08.057 回答