我正在学习Reactor,我想知道如何实现某种行为。假设我有一个传入消息流。每条消息都与某个实体相关联并包含一些数据。
interface Message {
String getEntityId();
Data getData();
}
与不同实体相关的消息可以并行处理。但是,与任何单个实体有关的消息必须一次处理一个,即实体的消息 2 的处理要"abc"
等到实体的消息 1 的处理完成后才能开始"abc"
。在处理消息时,应该缓冲该实体的进一步消息。其他实体的消息可以畅通无阻地进行。可以将其想象为每个实体都在线程上运行这样的代码:
public void run() {
for (;;) {
// Blocks until there's a message available
Message msg = messageQueue.nextMessageFor(this.entityId);
// Blocks until processing is finished
processMessage(msg);
}
}
如何在不阻塞的情况下使用 React 实现这一点?总消息率可能很高,但每个实体的消息率会非常低。实体集可能非常大,不一定事先知道。
我想它可能看起来像这样,但我不知道。
{
incomingMessages()
.groupBy(Message::getEntityId)
.flatMap(entityStream -> entityStream
/* ... */
.map(msg -> /* process the message */)))
/* ... */
}
public static Stream<Message> incomingMessages() { /* ... */ }