大多数发布订阅实现使用某种形式的迭代来通知订阅者,或者通过使用 for/while 循环调用他们的侦听器方法,或者通过将感兴趣的新主题放在每个订阅者的阻塞队列中。
我想实现一个发布订阅,而不是“监视”一些东西,即位置、变量或文件或其他东西,然后从其存储的位置中提取新主题。我希望他们都“大约”同时收到通知。
这是因为以前的解决方案很慢,因为我有大量的订阅者并且迭代它们需要时间。
有任何想法吗?
大多数发布订阅实现使用某种形式的迭代来通知订阅者,或者通过使用 for/while 循环调用他们的侦听器方法,或者通过将感兴趣的新主题放在每个订阅者的阻塞队列中。
我想实现一个发布订阅,而不是“监视”一些东西,即位置、变量或文件或其他东西,然后从其存储的位置中提取新主题。我希望他们都“大约”同时收到通知。
这是因为以前的解决方案很慢,因为我有大量的订阅者并且迭代它们需要时间。
有任何想法吗?
你可以这样做:
public class Producer {
public static Object publishedObject;
public static ConcurrentLinkedDeque<Object> previousObjects;
private int size;
public publishObject(Object object) {
previousObjects.offerLast(publishedObject);
publishedObject = object;
size++;
if(size > 50) {
previousObjects.removeFirst();
}
}
}
public class Consumer implements Runnable {
private Object lastConsumedObject;
public void run() {
while(true) {
if(Producer.publishedObject == lastConsumedObject) {
sleep(500);
} else {
Iterator<Object> iterator = Producer.descendingIterator();
Object next = iterator.next();
Object newLastConsumedObject = next;
do {
this.process(next);
} while(iterator.hasNext() &&
(next = iterator.next()) != lastConsumedObject);
lastConsumedObject = newLastConsumedObject;
}
}
}
}
这个想法是,它Producer
有一个可以轮询的“发布位置” Consumers
,并且它还将以前发布的对象存储在 aLinkedList
中,以防 aConsumer
没有注意到一个。消费者轮询发布位置并在没有更改时休眠,否则它会处理发布的对象并遍历先前发布的对象以防错过发布事件。
半忙碌的等待可能会扼杀你的表现。另一种选择是有两类消费者:SinkConsumer
和RelayConsumer
。RelayConsumer
每个硬件线程将有一个(例如RelayConsumers
,如果您有四个内核和超线程,则为 8 个)。将Publisher
消息发布到RelayConsumers
,然后每个人将消息发布到 numberOfSinkConsumers/numberOfRelayConsumers SinkConsumers
。