I'm trying to read from a Kafka topic, aggregate some data over a tumbling window, and write it to a sink (I've been trying both Kafka and the console).
The problems I'm seeing are:
- a long delay between sending data and receiving the aggregated records for a window on the sink (minutes after the expected trigger should have fired)
- duplicates of the aggregated records from previous windows appearing in subsequent windows
Why is the delay so long, and what can I do to reduce it?
Why do duplicate records from previous windows show up, and how can I get rid of them?
The delay seems to get worse as the window gets shorter: with a window duration of 10 seconds the delay was over 3 minutes, and with a window duration of 60 seconds it was around 2 minutes.
At the shortest window times I also saw the records get "bundled up", so that when records did arrive on the sink I received those for several windows at once.
On the duplicated aggregation records: I do have the output mode set to Complete, but my understanding was that records should only repeat within the current window, if the trigger fired multiple times inside it, which mine shouldn't be doing.
I have a processing-time trigger set to match the window duration, with a watermark threshold of 10% of it (1 or 6 seconds), and I know the stream itself works fine if I remove the tumbling window.
I understand why Spark might not manage to fire a trigger at some specific frequency, but I'd have thought 10 seconds, and certainly 60 seconds, would be plenty to process the very limited amount of data I'm testing with.
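For reference, the 10-second runs mentioned above differ from the code at the bottom only in these constants; a sketch, assuming everything else stays exactly as shown below (the values follow the 10% watermark rule just described):
final Trigger trigger = Trigger.ProcessingTime(10000); // fire every 10 seconds
final String windowDuration = "10 seconds";
final String watermarkThreshold = "1 second"; // 10% of the window duration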
使用 60 秒翻转窗口和处理时间触发器发送数据的示例
- 发送 6 个有效载荷
- 等一下
- 发送 1 个有效载荷
- 稍等片刻
- 发送 3 个有效载荷
(CreateTime comes from kafka-console-consumer with --property print.timestamp=true.) These arrived minutes after the point where, given the CreateTime timestamps and the windows, I'd have expected the trigger to fire.
// First record fine
CreateTime:1644329432464 {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
// Duplicate of the first record, sent by spark with the second even though that window is over
CreateTime:1644329505265 {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644329523901 {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}
// Duplicates of the first 2 records, sent with the third
CreateTime:1644330082974 {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644330105990 {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}
CreateTime:1644330125375 {"window":{"start":"2022-02-08T14:20:00.000Z","end":"2022-02-08T14:21:00.000Z"},"account_id":"acc0","totalAmount":333}
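For reference, the consumer invocation that produced the output above was along these lines (standard kafka-console-consumer flags; the topic name matches the code below):
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-out --property print.timestamp=true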
I do sometimes see messages like the one below, but there are no other WARN- or ERROR-level messages suggesting a problem:
2022-02-08 14:24:45 WARN ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 99770 milliseconds
App data/code
The sample data looks like this, and is generated by a python script that sets ts to the current time:
{"account_id": "acc0", "txn_id": "1234500001", "amount": 111, "ts": 1644258052800}
Application code (run embedded, with spark.master=local[*]):
// imports used by this snippet
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public void execute() throws Exception {
    // spark.master=local[*] is supplied externally, as noted above
    final SparkSession spark = SparkSession.builder().appName("test").getOrCreate();

    final Trigger trigger = Trigger.ProcessingTime(60000); // fire every 60 seconds
    final OutputMode outputMode = OutputMode.Complete();
    final String windowDuration = "60 seconds";
    final String watermarkThreshold = "6 seconds"; // 10% of the window duration
    final String kafkaHost = "localhost:9092";
    final String kafkaInTopic = "topic-in";
    final String kafkaOutTopic = "topic-out";

    final StructType schema = new StructType(
        new StructField[] {
            new StructField("account_id", DataTypes.StringType, false, Metadata.empty()),
            new StructField("txn_id", DataTypes.StringType, false, Metadata.empty()),
            new StructField("amount", DataTypes.LongType, false, Metadata.empty()),
            new StructField("ts", DataTypes.LongType, false, Metadata.empty())
        }
    );

    final Dataset<Row> in = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaHost)
        .option("subscribe", kafkaInTopic)
        .option("startingOffsets", "latest")
        .option("includeHeaders", "true")
        .load();
    in.printSchema();

    final Dataset<Row> process = in
        // convert the kafka value payload to structured data
        .withColumn("value", functions.from_json(functions.column("value").cast(DataTypes.StringType), schema))
        .withColumn("account_id", functions.column("value.account_id"))
        .withColumn("txn_id", functions.column("value.txn_id"))
        .withColumn("amount", functions.column("value.amount"))
        .withColumn("ts", functions.column("value.ts"))
        // conversion to timestamp is by second, not ms
        .withColumn("datetime", functions.col("ts").divide(1000).cast(DataTypes.TimestampType))
        // tolerate events arriving up to watermarkThreshold late
        .withWatermark("datetime", watermarkThreshold)
        .groupBy(
            functions.window(functions.col("datetime"), windowDuration),
            functions.col("account_id")
        )
        .agg(functions.sum("amount").as("totalAmount"));
    process.printSchema();

    final DataStreamWriter<Row> out = process
        // the kafka sink requires a single string "value" column
        .select(functions.to_json(functions.struct("*")).as("value"))
        .writeStream()
        .outputMode(outputMode)
        .trigger(trigger)
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaHost)
        .option("topic", kafkaOutTopic)
        .option("checkpointLocation", "/tmp/spark-kafka");
    LOGGER.info("STARTING STREAM");
    out.start().awaitTermination();
}
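For completeness, the console sink I mentioned trying at the top only swaps the writer; a minimal sketch of that variant, reusing process, outputMode and trigger from above (the truncate option is just there to keep the window struct readable):
final DataStreamWriter<Row> consoleOut = process
    .writeStream()
    .outputMode(outputMode)
    .trigger(trigger)
    .format("console")
    .option("truncate", "false"); // print full rows, including the window struct
consoleOut.start().awaitTermination();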