3

给定一些使用流处理大量项目的代码,检测日志记录和性能/分析的各个步骤的最佳方法是什么?

实际示例:

  ReactiveSeq.fromStream(pairs)
                .filter(this::satisfiesThreshold)
                .filter(this::satisfiesPersistConditions)
                .map((pair) -> convertToResult(pair, jobId))
                .flatMap(Option::toJavaStream)
                .grouped(CHUNK_SIZE)
                .forEach((chunk) ->
                {
                    repository.save(chunk);
                    incrementAndReport();
                });
  reportProcessingTime();

记录进度很重要,因此我可以在另一个线程中触发更新用户界面的进度事件。

跟踪此流中过滤和映射步骤的性能特征是可取的,以查看可以在哪些地方进行优化以加快速度。

我看到三个选项:

  1. 将日志记录/分析代码放在每个函数中
  2. peek在每个步骤中使用而不实际使用该值
  3. 某种基于注释或 AOP 的解决方案(不知道是什么)

哪个是最好的?关于#3 的样子有什么想法吗?还有其他解决方案吗?

4

1 回答 1

2

您在这里有几个选择(如果我理解正确的话):-

  1. 我们可以使用 elapsed 运算符来跟踪元素发射之间的经过时间,例如

      ReactiveSeq.fromStream(Stream.of(1,2))
                 .filter(this::include)
                 .elapsed()
                 .map(this::logAndUnwrap)
    
      Long[] filterTimeTakenMillis = new Long[maxSize];
      int filterIndex = 0;
      private <T> T logAndUnwrap(Tuple2<T, Long> t) {
          //capture the elapsed time (t.v2) and then unwrap the tuple
          filterTimeTakenMillis[filterIndex++]=t.v2;
          return t.v1;
      }
    

这仅适用于 cyclops-react Streams。

  1. 我们可以在 FluentFunctions 中使用类似 AOP 的功能

例如

ReactiveSeq.fromStream(Stream.of(1,2))
                .filter(this::include)
                .elapsed()
                .map(this::logAndUnwrap)
                .map(FluentFunctions.of(this::convertToResult)
                                   .around(a->{

                                    SimpleTimer timer = new SimpleTimer();
                                    String r = a.proceed();
                                    mapTimeTakenNanos[mapIndex++]=timer.getElapsedNanos();
                                    return r;
                }));

这也适用于 vanilla Java 8 Streams。

于 2017-02-16T16:32:42.700 回答