1

我想使用 Java 实现各种发布者/订阅者模式,但目前没有想法。

有 1 个发布者和 N 个订阅者,发布者发布对象然后每个订阅者需要以正确的顺序处理每个对象一次且仅一次。发布者和每个订阅者都在自己的线程中运行。

在我最初的实现中,每个订阅者都有自己的阻塞队列,发布者将对象放入每个订阅者的队列中。这可以正常工作,但如果任何订阅者的队列已满,发布者将被阻止。这会导致性能下降,因为每个订阅者在处理对象时花费不同的时间。

然后在另一个实现中,发布者将对象保存在自己的队列中。与该对象一起,AtomicInteger 计数器与它与那里的订阅者数量相关联。然后每个订阅者查看队列并减少计数器,并在计数器达到零时将其从队列中删除。

通过这种方式,发布者不会阻塞,但现在订阅者需要等待对方处理对象,从队列中删除对象,然后才能查看下一个对象。

有没有更好的方法来做到这一点?我认为这应该是一个很常见的模式。

4

3 回答 3

0

有 1 个发布者和 N 个订阅者,发布者发布对象然后每个订阅者需要以正确的顺序处理每个对象一次且仅一次。发布者和每个订阅者都在自己的线程中运行。

我会改变这个架构。我最初考虑了每个订阅者的队列,但我不喜欢这种机制。例如,如果第一个订阅者需要更长的时间来运行,那么所有作业都将最终进入该队列,并且您将只执行 1 个工作线程。

由于您必须按顺序运行订阅者,我将有一个线程池,通过所有订阅者运行每条消息。对订阅者的调用将需要可重入,这可能是不可能的。

因此,您将拥有一个由 10 个线程组成的池(比方说),每个线程都从发布者的队列中出列,并执行以下操作:

public void run() {
    while (!shutdown && !Thread.currentThread().isInterrupted()) {
        Article article = publisherQueue.take();
        for (Subscriber subscriber : subscriberList) {
           subscriber.process(article);
        }
    }
}
于 2013-05-31T17:42:38.270 回答
0

绝对

每个订阅者都有自己的阻塞队列,发布者将对象放入每个订阅者的队列中。

这是要走的路。您可以使用线程方法将其放入队列中...因此,如果一个队列已满,发布者将不会等待..

例如。

s1 s2 s3 是订阅者,addToQueue是每个订阅者中添加到相应队列的方法。该addQueue方法等待队列非空..所以调用addQueue将是一个阻塞调用ideally synchronised code...

然后在发布者中,您可以执行类似于以下代码的操作

注意:代码可能无法正常工作..但应该会给您一些想法。

List<subscriber> slist;// Assume its initialised
public void publish(final String message){

    for (final subscriber s: slist){


          Thread t=new Thread(new Runnable(){
             public void run(){
                s.addToQueue(message);
             }
           });

      t.start();
    }

}
于 2013-05-31T16:46:43.343 回答
0

您的“多队列”实现是要走的路。我认为您不必担心一个完整的队列会阻塞生产者,因为完成的总时间不会受到影响。假设您有三个消费者,两个以每秒 1 个的速率消费,第三个以每五秒 1 个的速率消费,同时生产者以每两秒 1 个的速率生产。最终第三个队列将被填满,因此生产者将阻止它并且也将停止将项目放入第一个和第二个队列中。有一些方法可以解决这个问题,但它们不会改变第三个消费者永远存在的事实成为瓶颈。如果您正在生产/消费 100 件商品,那么这将至少需要 500 秒,因为第三个消费者(5 秒乘以 100 件商品),即使第一个和第二个消费者在 200 秒后完成也是如此(因为你已经做了一些聪明的事情,让生产者即使在第三个队列已满之后也能继续填充他们的队列)或者如果他们在 500 秒后完成(因为生产者在第三个队列上被阻塞)。

于 2013-05-31T16:33:29.957 回答