0

我收到 Pipeline customTransformAPI 的以下编译错误。

这是构建管道的示例代码:

private Pipeline buildPipeline2() {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<String, CacheEntry<AuditLogRecord>>remoteMapJournal("cache_AuditLog", getClientConfig(), START_FROM_OLDEST))
          .addTimestamps((v) ->  getTimeStamp(v), 3000)
          .peek()
          .groupingKey((v) -> Tuple2.tuple2(getUserID(v),getTranType(v)))
    .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
    //.aggregate(counting(),(winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), formatKey(key), result))
    .aggregate(counting())
    .map((v)-> getMapKey(v))
    .customTransform("test2", ()-> this)
    .drainTo(Sinks.map("Test"));
    //.drainTo(Sinks.files("c:\\data\\op.txt"));
    return p;
  }

这是tryProcess()方法的示例代码:

protected boolean tryProcess(int ordinal, Object item) {
    JetEvent jetEvent = (JetEvent)item;
    Object obj = jetEvent.payload();
    tryEmit(ordinal,item);
    return true;
}

这是编译错误。

incompatible types: inferred type does not conform to upper bound(s)
[ERROR] inferred: java.lang.Object
[ERROR] upper bound(s): java.util.Map.Entry

这可以使用以下代码编译并运行良好。

 .customTransform("test2", ()-> this)
 .drainTo(Sinks.files("c:\\data\\op.txt"));

但是,下面的代码给出了编译错误。

.customTransform("test2", ()-> this)
.drainTo(Sinks.map("Test"));

你能帮我解决这个问题吗?

4

1 回答 1

0

customTransform不是类型安全的。如果无法推断类型参数,它将评估为Object. 但是,Sinks.map需要Map.Entry<K, V>. 要解决它,请在方法中添加类型提示customTransform

    .<Map.Entry<YourKeyType, YourValueType>customTransform("test2", ()-> this)
    .drainTo(Sinks.map("Test"));

请记住,如果您的自定义处理器实际上没有返回Map.Entry,它将在运行时失败。

作品,Sinks.files因为它需要Object

于 2018-06-22T07:13:12.443 回答