3

我正在研究将 kafka 偏移量存储在 kafka 内部以用于 Spark Structured Streaming,就像它适用于 DStreamsstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)一样,我正在寻找相同的东西,但适用于 Structured Streaming。是否支持结构化流式传输?如果是,我该如何实现?

我知道使用 的 hdfs 检查点.option("checkpointLocation", checkpointLocation),但我对内置偏移管理完全感兴趣。

我期望 kafka 仅在没有 spark hdfs 检查点的情况下将偏移量存储在内部。

4

2 回答 2

0

我正在使用在某处找到的这段代码。

public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {

        try {

            FileWriter writer = new FileWriter(storageName(topic, partition), false);

            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return he last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {

        try {

            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));

            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;

        } catch (Exception e) {
            e.printStackTrace();
        }

        return 0;
    }

    private String storageName(String topic, int partition) {
        return "Offsets\\" + storagePrefix + "-" + topic + "-" + partition;
    }

}

SaveOffset...在记录处理成功后调用,否则不存储偏移量。并且我使用 Kafka 主题作为源,因此我将起始偏移量指定为从 ReadOffsets 检索到的偏移量......

于 2019-06-13T02:50:14.313 回答
0

“它支持结构化流吗?”

不,结构化流不支持将偏移量提交回 Kafka,类似于使用 Spark Streaming (DStreams) 可以完成的操作。关于Kafka 特定配置的 Spark Structured Streaming + Kafka 集成指南对此非常准确:

“卡夫卡源不提交任何偏移量。”

我在How to manual set groupId and commit Kafka offsets in Spark Structured Streaming中写了一个更全面的答案。

于 2021-01-21T16:23:12.173 回答