0

我从 messageHandler(kafka) 获取数据并通过 servicehandler 处理它给我一些结果

serviceHandler.process(message).getMessage().toString()

现在我需要输出这个结果流,这样每当一条新记录通过消息处理程序时,它就会得到处理并将输出推送到前端(角度)

@Autowired
MessageHandler messageHandler;
@Autowired
ServiceHandler serviceHandler;

public void runHandler() {
    Flux<Message> messages = messageHandler.flux();             
    messages.subscribeOn(Schedulers.parallel()) 
    .doOnNext(message -> serviceHandler.process(message).getMessage().toString())
    .subscribe();
}
public Flux pushresult(){ ???? }

有谁知道我需要什么的方法?

4

1 回答 1

0

看看 .map() 和 .flatMap()。在你的情况下,它看起来像 map() 但如果你调用的下一个服务也返回你想要的 Mono/Flux .flatMap() 或者你最终会得到类似的东西:

Flux<Flux<String>>

代替

Flux<String>

.doOnNext 用于记录日志等副作用。

于 2020-04-02T00:42:12.293 回答