我正在尝试使用 Jet 进行聚合,源和接收器是 Kafka 主题,要求是从源获取 GPB(google proto buf)消息并发布 GPB 消息。问题是我能够发布Double
但不能发布 GPB 消息,它给了我编译错误。
这工作正常:
Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.<String, Balance> kafka(<properties>, <topic>))
.map(s->s.getValue() ).groupingKey(x->x.account)
.rollingAggregator(AggregateOperations.summingDouble(Balance::amount))
.drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));
即使上面的代码工作正常,它也会发布到接收主题,而我的要求是发布一个具有接收主题属性double
的 GPB 。当我尝试通过放一个beforedouble
来做到这一点时,它给了我语法错误。以下是我尝试过的:map
drainTo
.rollingAggregator(AggregateOperation.summingDouble(Balance::amount))
.map(k->Amount.newBuilder().setAmount(k.getValue()).build())
.drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));
金额是具有double
属性的 GPB 消息。这给了我不理解的语法错误。你能帮我解决这个问题吗?
您能否分享一些文档或链接以及针对不同场景有不同聚合的地方?我浏览了 Hazelcast 示例、演示,但不是全部,但很少,但在那里没有找到我的用例。非常感谢。