
I wrote the following sample DAG code to understand aggregation. It seems that the slidingWindow vertex doesn't emit any records.

I'm not sure what's wrong here.

public DAG buildDAG() {
    DAG dag = new DAG();
    SlidingWindowPolicy winPolicy = slidingWinPolicy(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS);

    Vertex source = dag.newVertex("source", SourceProcessors.streamRemoteMapP(getRemoteSourceName(),
            getClientConfig(), START_FROM_OLDEST, WatermarkGenerationParams.noWatermarks()));

    Vertex slidingWindow = dag.newVertex("aggregate-to-sliding-win",
            aggregateToSlidingWindowP(
                    singletonList((v) -> getUserID((Entry<String, CacheEntry<Record>>) v)),
                    singletonList((v) -> getTimeStamp((Entry<String, CacheEntry<Record>>) v)),
                    TimestampKind.EVENT,
                    winPolicy,
                    counting(),
                    TimestampedEntry::new));

    Vertex peekOP = dag.newVertex("peekOP", DiagnosticProcessors.writeLoggerP());
    Vertex peekOP1 = dag.newVertex("peekOP1", DiagnosticProcessors.writeLoggerP());

    Vertex sink = dag.newVertex("sink", SinkProcessors.writeFileP("c:\\data\\op1.txt"));

    return dag
            .edge(between(source, peekOP))
            .edge(between(peekOP, slidingWindow))
            .edge(between(slidingWindow, peekOP1))
            .edge(between(peekOP1, sink));
}

Similarly, I wrote the following sample code for the same aggregation with the Pipeline API.

This works fine: it writes the records to a text file.

private Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();

    p.drawFrom(Sources.<String, CacheEntry<AuditLogRecord>>remoteMapJournal("cache_AuditLog", getClientConfig(), START_FROM_OLDEST))
            .addTimestamps((v) -> getTimeStamp(v), 3000)
            .peek()
            .groupingKey((v) -> Tuple2.tuple2(getUserID(v), getTranType(v)))
            .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
            .aggregate(counting())
            .map((v) -> getMapKey(v))
            .drainTo(Sinks.files("c:\\data\\op.txt"));
    return p;
}

Could you please help me correct the DAG definition?


1 Answer


There are several issues:

  1. WatermarkGenerationParams.noWatermarks(): the windowing processor needs watermarks to emit any output at all. Use wmGenParams((v) -> getTimeStamp(v), limitingLag(3000), emitByFrame(winPolicy), -1) instead.

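  2. DiagnosticProcessors.writeLoggerP() is a sink: it receives items but doesn't emit any. To peek at what a vertex receives or emits, wrap its processor supplier in peekInputP( /* original supplier */ ) or peekOutputP( /* original supplier */ ).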

  3. 边缘slidingWindow必须是distributedpartitioned。没有这些,您将得到结果,但结果不正确。

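The DAG API is meant for advanced use cases that can't be expressed with the Pipeline API, and with each Jet release there is less need for it. As your example shows, the Pipeline API is easier to write and more concise.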

answered 2018-06-19T14:17:49.040