6

我正在使用 Spark Structured Streaming 来使用来自 Kafka 的事件并将它们上传到 S3。

检查点在 S3 上提交:

DataFrameWriter<Row> writer = input.writeStream()
           .format("orc")
           .trigger(ProcessingTime(config.getProcessingTime()))
           .outputMode(OutputMode.Append())
           .option("truncate", false)           
           .option("checkpointLocation", "s3://bucket1")
           .option("compression", "zlib")
           .option("path", "s3://bucket2");

偏移量通过以下方式提交给 Kafka StreamingQueryListener

  kafkaConsumer.commitSync(topicPartitionMap);

应用程序启动后,它会从 Kafka 检索偏移图并启动流:

 reader = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
            .option("subscribe", "topic1")
            .option("max.poll.records", 1000)
            .option("failOnDataLoss", false)
            .option("startingOffsets", topicPartitionMap)

我将topic/partition/offset数据存储在 ORC 文件中。

数据包含具有精确事件的多个副本topic/partition/offset

应该如何配置流以实现一次处理?

4

1 回答 1

5

发现这些参数应该设置为true spark.streaming.driver.writeAheadLog.closeFileAfterWritespark.streaming.receiver.writeAheadLog.closeFileAfterWrite

当您想将 S3 用于元数据 WAL 时,将此设置为“true”

https://spark.apache.org/docs/latest/configuration.html

更多细节在这里: https ://www.waitingforcode.com/apache-spark-streaming/spark-streaming-configuration/read?fbclid=IwAR17x1AfTLH1pjq1QPkDsQT6DU4hgi7WNeIYUnw25Hvquoj-4yQU10R0GeM

于 2019-03-28T11:19:53.817 回答