我正在使用 Spark Structured Streaming 来使用来自 Kafka 的事件并将它们上传到 S3。
检查点在 S3 上提交:
DataFrameWriter<Row> writer = input.writeStream()
.format("orc")
.trigger(ProcessingTime(config.getProcessingTime()))
.outputMode(OutputMode.Append())
.option("truncate", false)
.option("checkpointLocation", "s3://bucket1")
.option("compression", "zlib")
.option("path", "s3://bucket2");
偏移量通过以下方式提交给 Kafka StreamingQueryListener
:
kafkaConsumer.commitSync(topicPartitionMap);
应用程序启动后,它会从 Kafka 检索偏移图并启动流:
reader = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
.option("subscribe", "topic1")
.option("max.poll.records", 1000)
.option("failOnDataLoss", false)
.option("startingOffsets", topicPartitionMap)
我将topic/partition/offset
数据存储在 ORC 文件中。
数据包含具有精确事件的多个副本topic/partition/offset
。
应该如何配置流以实现一次处理?