0

我正在尝试使用 Jet 进行聚合,源和接收器是 Kafka 主题,要求是从源获取 GPB(google proto buf)消息并发布 GPB 消息。问题是我能够发布Double但不能发布 GPB 消息,它给了我编译错误。

这工作正常:

    Pipeline p = Pipeline.create();
    p.drawFrom(KafkaSources.<String, Balance> kafka(<properties>, <topic>)) 
    .map(s->s.getValue() ).groupingKey(x->x.account)
    .rollingAggregator(AggregateOperations.summingDouble(Balance::amount))
    .drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));

即使上面的代码工作正常,它也会发布到接收主题,而我的要求是发布一个具有接收主题属性double的 GPB 。当我尝试通过放一个beforedouble来做到这一点时,它给了我语法错误。以下是我尝试过的:mapdrainTo

    .rollingAggregator(AggregateOperation.summingDouble(Balance::amount))
    .map(k->Amount.newBuilder().setAmount(k.getValue()).build())
    .drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));

金额是具有double属性的 GPB 消息。这给了我不理解的语法错误。你能帮我解决这个问题吗?

您能否分享一些文档或链接以及针对不同场景有不同聚合的地方?我浏览了 Hazelcast 示例、演示,但不是全部,但很少,但在那里没有找到我的用例。非常感谢。

4

1 回答 1

0

我猜语法错误是这样的:

不兼容的类型。所需的接收器<? super Amount> 但 'kafka' 被推断为 Sink<Entry<K, V>>:不存在类型变量 K、V 的实例,因此 String 符合 Entry<K, V>

(下次请分享异常,你的代码对非共享类有依赖,我无法编译。)

这意味着 Kafka 接收器期望java.util.Map.Entry输入,但你给了它Amount。你需要map这样:

.map(entry-> Util.entry(entry.getKey(), Amount.newBuilder().setAmount(entry.getValue()).build()))
于 2019-02-08T13:31:32.387 回答