0

似乎 Apache Beam (2.29.0) Kafka 管道适用于流式传输,只有在尝试以批处理模式消费时可用的钩子很少。我正在尝试读取在具有开始和结束时间戳的时间范围内给出的丢失消息。很容易弄清楚如何从给定的开始时间开始。为此,我使用了 withStartReadTime 方法,如下所示:

PCollection<GenericRecord> input =
            p.apply("Read from Kafka", KafkaIO.<byte[], GenericRecord>read()
                             .withStartReadTime(Instant.ofEpochMilli(options.getStartTimestamp().get()))

上面的代码使用给定的时间戳调用生产者的 seek 方法,消费者将从等于或大于该起始时间戳的偏移量开始读取分区。

但是,要停止管道,有 3 种可能的方法: withMaxNumRecords 和 withMaxReadTime 和 withCheckStopReadingFn withMaxReadTime 使用持续时间,并且会在给定的时间内读取记录,因此它会停止,因此对于我的目的来说不是确定性的。withMaxNumRecords 我可以计算每个分区的记录数量,我想读取开始时间和结束时间的偏移量,并计算每个分区的记录增量并将它们全部加起来。但是,这也将是不确定的,因为在读取时无法保证消息已在所有分区中均匀读取。一个分区可能已通过目标偏移量读取,而另一个分区未达到目标偏移量。CheckStopReadingFn 的最后一个选项是一个不错的选择。我们目前使用它来暂停管道,但不是基于偏移量。问题在于传递给函数的参数。这是该函数的示例实现。

    private static class CheckStopReadingFn implements  SerializableFunction<TopicPartition, Boolean>{

    @Override
    public Boolean apply(TopicPartition input) {
        LOGGER.info("Checking topic: {} partition: {}",input.topic(),input.partition());
        return Boolean.FALSE;
    }
}

此方法不采用正在处理的当前偏移量或最后处理的偏移量。因此,当达到给定的偏移量/时间戳时,我不能使用它来停止从分区读取。如果给出这两个偏移量/时间戳,那么实现停止将非常容易。我显然将 KafkaUnboundedReader 用于管道。我看到了对 BoundedReader 的引用,但是我不知道 Kafka 是否支持它以及在我的情况下如何使用它。

我对 Beam 中缺乏对批处理模式的支持感到困惑。我来自 Spark 世界,这类问题会有无数的解决方案。我不知道我是否缺少其他一些 API,或者是否有比 2.29 更高的版本以及我正在寻找的选项。如果有人指出我这样的解决方案,我将不胜感激。

4

0 回答 0