0
// #atLeastOnceBatch
        Consumer.Control control =
            Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
                .mapAsync(1, msg ->
                    business(msg.record().key(), msg.record().value())
                            .thenApply(done -> msg.committableOffset())
                )
                .batch(
                    20,
                    ConsumerMessage::createCommittableOffsetBatch,
                    ConsumerMessage.CommittableOffsetBatch::updated
                )
                .mapAsync(3, c -> c.commitJavadsl())
                .to(Sink.ignore())
                .run(materializer);
        // #atLeastOnceBatch

我正在尝试测试驱动 Alpakka Kafka 连接器至少一次批处理示例,我收到以下编译时错误

ConsumerMessage 类型未定义此处适用的 createCommittableOffsetBatch(ConsumerMessage.CommittableOffset)

并且 ConsumerMessage.CommittableOffsetBatch 类型未定义适用于此处的 updated(S, ConsumerMessage.CommittableOffset)

4

1 回答 1

0

这些在 v 0.22 中可用。不幸的是,与 Akka Docs 相比,Alpakka 的文档缺乏一点。

于 2018-09-27T23:06:11.440 回答