我正在尝试测试通过共享/热可观察对象发送的消息的延迟。我注意到,当我在单个共享 observable 上有多个观察者时,单个观察者会从单个消息中调用n次(其中n是共享 observable 上的观察者数量)。
我用10 个 Observers和1 Message Per Observer运行下面的代码,每个观察者每条消息被调用 10 次(意味着总共 100 个 observer.next() 调用)。根据我对观察者/可观察者的理解,每个观察者每条消息应该只被调用一次。我只是在share()
这里错误地使用了运算符吗?还是我对它的理解总体上存在缺陷?
const getMessageLatency = (observersCount, messagesPerObserver) => {
const completedMessages = [];
const source = new Subject();
const sharedObservable = source.pipe(
tap((message) => console.log(`Subject: Incoming for ${message.id}`)),
share()
);
// Setup observers
for (i = 0; i < observersCount; ++i) {
sharedObservable
.pipe(
tap((message) => console.log(`SharedObservable: Incoming for ${message.id}`)),
filter((message) => message.id === getObserverId(i)),
tap(() => console.log(`Filtered for ${getObserverId(i)}`))
)
.subscribe((message) => {
const date = new Date();
message.endTime = date.getMilliseconds();
completedMessages.push(message);
})
}
// send out messages
for (i = 0; i < observersCount; ++i) {
for (j = 0; j < messagesPerObserver; ++j) {
const date = new Date();
const message = {
id: getObserverId(i),
startTime: date.getMilliseconds()
}
// send message
source.next(message);
}
}
// process data (get average message latency)
const totalMessageLatency = completedMessages.reduce(
(accumulatedLatency, currentMessage) => {
const currentMessageLatency =
currentMessage.endTime - currentMessage.startTime;
return accumulatedLatency + currentMessageLatency;
}, 0);
const averageLatency = totalMessageLatency / completedMessages.length;
console.log("==============================================================================");
console.log(`Observers: ${observersCount}, MessagesPerObserver: ${messagesPerObserver}`);
console.log(`Total Messages Sent: ${observersCount * messagesPerObserver}`);
console.log(`Total Messages Received: ${completedMessages.length}`);
console.log(`Average Latency per Message: ${averageLatency}`);
console.log("==============================================================================");
return averageLatency;
}
运行完成后,如果“发送的消息总数”为x,则“接收的消息总数”将为x^2