1

大多数发布订阅实现使用某种形式的迭代来通知订阅者,或者通过使用 for/while 循环调用他们的侦听器方法,或者通过将感兴趣的新主题放在每个订阅者的阻塞队列中。

我想实现一个发布订阅,而不是“监视”一些东西,即位置、变量或文件或其他东西,然后从其存储的位置中提取新主题。我希望他们都“大约”同时收到通知。

这是因为以前的解决方案很慢,因为我有大量的订阅者并且迭代它们需要时间。

有任何想法吗?

4

1 回答 1

1

你可以这样做:

public class Producer {
    public static Object publishedObject;
    public static ConcurrentLinkedDeque<Object> previousObjects;
    private int size;

    public publishObject(Object object) {
        previousObjects.offerLast(publishedObject);
        publishedObject = object;
        size++;
        if(size > 50) {
            previousObjects.removeFirst();
        }
    }
}

public class Consumer implements Runnable {
    private Object lastConsumedObject;
    public void run() {
        while(true) {
            if(Producer.publishedObject == lastConsumedObject) {
                sleep(500);
            } else {
                Iterator<Object> iterator = Producer.descendingIterator();
                Object next = iterator.next();
                Object newLastConsumedObject = next;
                do {
                    this.process(next);
                } while(iterator.hasNext() && 
                    (next = iterator.next()) != lastConsumedObject);
                lastConsumedObject = newLastConsumedObject;   
            }
        }
    }
}

这个想法是,它Producer有一个可以轮询的“发布位置” Consumers,并且它还将以前发布的对象存储在 aLinkedList中,以防 aConsumer没有注意到一个。消费者轮询发布位置并在没有更改时休眠,否则它会处理发布的对象并遍历先前发布的对象以防错过发布事件。

半忙碌的等待可能会扼杀你的表现。另一种选择是有两类消费者:SinkConsumerRelayConsumerRelayConsumer每个硬件线程将有一个(例如RelayConsumers,如果您有四个内核和超线程,则为 8 个)。将Publisher消息发布到RelayConsumers,然后每个人将消息发布到 numberOfSinkConsumers/numberOfRelayConsumers SinkConsumers

于 2013-08-14T14:29:49.177 回答