我正在考虑替换一个看起来非常接近 ReactiveStreams 的本土日志处理库io.projectreactor
. 目标是减少我们维护的代码,并利用社区添加的任何新功能(关注运营商融合)。
作为开始,我需要使用标准输入输出并将多行日志条目合并到将沿管道向下流动的文本块中。Filebeat 文档的多行日志条目章节详细解释了用例(除了我们希望它在进程中)。
到目前为止,我拥有的代码是:
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
当检测到新的日志头时,这会处理多行合并,但在现有库中,我们也会在超时后刷新累积的行(即,如果在 5 秒内没有收到文本,则刷新记录)。
在 Reactor 中对此进行建模的正确方法是什么?我需要编写自己的操作符,还是可以自定义任何现有的操作符?
任何指向在 Project Reactor 或 RxJava 中实现此用例的相关示例和文档的指针都将非常感激。