9

我正在学习Reactor,我想知道如何实现某种行为。假设我有一个传入消息流。每条消息都与某个实体相关联并包含一些数据。

interface Message {
    String getEntityId();
    Data getData();
}

与不同实体相关的消息可以并行处理。但是,与任何单个实体有关的消息必须一次处理一个,即实体的消息 2 的处理要"abc"等到实体的消息 1 的处理完成后才能开始"abc"。在处理消息时,应该缓冲该实体的进一步消息。其他实体的消息可以畅通无阻地进行。可以将其想象为每个实体都在线程上运行这样的代码:

public void run() {
    for (;;) {
        // Blocks until there's a message available
        Message msg = messageQueue.nextMessageFor(this.entityId);

        // Blocks until processing is finished
        processMessage(msg);
    }
}

如何在不阻塞的情况下使用 React 实现这一点?总消息率可能很高,但每个实体的消息率会非常低。实体集可能非常大,不一定事先知道。

我想它可能看起来像这样,但我不知道。

{
    incomingMessages()
            .groupBy(Message::getEntityId)
            .flatMap(entityStream -> entityStream
                    /* ... */
                    .map(msg -> /* process the message */)))
                    /* ... */
}

public static Stream<Message> incomingMessages() { /* ... */ }
4

2 回答 2

6

使用ProjectReactor,您可以通过以下方式解决它:

@Test
public void testMessages() {
    Flux.fromStream(incomingMessages())
            .groupBy(Message::getEntityId)
            .map(g -> g.publishOn(Schedulers.newParallel("groupByPool", 16))) //create new publisher for groups of messages
            .subscribe( //create consumer for main stream
                    stream ->
                            stream.subscribe(this::processMessage) // create consumer for group stream
            );
}

public Stream<Message> incomingMessages() {
    return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10));
}

public void processMessage(Message message) {
    System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName()));
}

private static class Message {
    private final int id;
    private final int entityId;

    public Message(int id, int entityId) {
        this.id = id;
        this.entityId = entityId;
    }

    public int getId() {
        return id;
    }

    public int getEntityId() {
        return entityId;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", entityId=" + entityId +
                '}';
    }
}

我认为类似的解决方案可能在RxJava

于 2017-01-25T09:11:51.067 回答
4

我们在项目中遇到了同样的问题。具有相同 id 的实体必须按顺序处理,但具有不同 id 的实体可以并行处理。

解决方案非常简单。我们开始使用 concatMap 而不是使用 flatMap。来自 concatMap 的文档:

 * Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
 * then flatten these inner publishers into a single {@link Flux}, sequentially and
 * preserving order using concatenation.

代码示例:

public void receive(Flux<Data> data) {
    data
        .groupBy(Data::getPointID)
        .flatMap(service::process)
        .onErrorContinue(Logging::logError)
        .subscribe();

}

工艺方法:

Flux<SomeEntity> process(Flux<Data> dataFlux) {
    return dataFlux
        .doOnNext(Logging::logReceived)
        .concatMap(this::proceedDefinitionsSearch)
        .doOnNext(Logging::logDefSearch)
        .flatMap(this::processData)
        .doOnNext(Logging::logDataProcessed)
        .concatMap(repository::save)
        .doOnNext(Logging::logSavedEntity);
}
于 2019-03-23T14:39:22.290 回答