8

我正在考虑替换一个看起来非常接近 ReactiveStreams 的本土日志处理库io.projectreactor. 目标是减少我们维护的代码,并利用社区添加的任何新功能(关注运营商融合)。

作为开始,我需要使用标准输入输出并将多行日志条目合并到将沿管道向下流动的文本块中。Filebeat 文档的多行日志条目章节详细解释了用例(除了我们希望它在进程中)。

到目前为止,我拥有的代码是:

BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
          .subscribe();          

当检测到新的日志头时,这会处理多行合并,但在现有库中,我们也会在超时后刷新累积的行(即,如果在 5 秒内没有收到文本,则刷新记录)。

在 Reactor 中对此进行建模的正确方法是什么?我需要编写自己的操作符,还是可以自定义任何现有的操作符?

任何指向在 Project Reactor 或 RxJava 中实现此用例的相关示例和文档的指针都将非常感激。

4

2 回答 2

3

这取决于您如何识别每个缓冲区的开始和结束,因此以下 RxJava 2 代码旨在作为有关使用主源值打开和关闭缓冲区门的提示:

TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();

Function<Flowable<String>, Flowable<List<String>>> f = o -> 
        o.buffer(o.filter(v -> v.contains("Start")), 
                 v -> Flowable.merge(o.filter(w -> w.contains("End")), 
                                     Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 

pp.publish(f)
.subscribe(System.out::println);

pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");

pp.onNext("Start");
pp.onNext("C");

scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();

印刷:

[Start, A, B, End]
[Start, C]
[Start, D, End]

它通过共享源来工作,该源publish允许重用来自上游的相同值,而无需同时运行多个源副本。开口由在线上“开始”字符串的检测控制。关闭由“End”字符串的检测或在宽限期后触发的计时器控制。

编辑:

如果“开始”也是下一批的指示符,您可以将“结束”检查替换为“开始”并更改缓冲区的内容,因为它将在前一个缓冲区中包含新标头,否则:

pp.publish(f)
.doOnNext(v -> {
    int s = v.size();
    if (s > 1 && v.get(s - 1).contains("Start")) {
        v.remove(s - 1);
    }
})
.subscribe(System.out::println);
于 2017-07-19T23:27:28.323 回答
1

buffer运算符对我来说似乎是最合适和最简单的解决方案。

它具有基于大小和时间的策略。你有日志,所以我认为,你可以将行数解释为缓冲区大小。

这里的例子 - 如何发射按 4 或 5 秒时间跨度分组的项目:

    Observable<String> lineReader = Observable.<String>create(subscriber -> {
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            for (String line = br.readLine(); line != null; line = br.readLine()) {
                subscriber.onNext(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }).subscribeOn(Schedulers.newThread());

    lineReader
      .buffer(5, TimeUnit.SECONDS,4)
      .filter(lines -> !lines.isEmpty())
      .subscribe(System.out::println);
于 2017-07-19T12:03:07.523 回答