0

我正在尝试从 kafka 主题中读取,在翻滚的窗口上聚合一些数据并将其写入接收器(我一直在尝试使用 Kafka 和控制台)。

我看到的问题是

  • 发送数据和接收接收器上窗口的聚合记录之间的长时间延迟(应触发预期触发器后的几分钟)
  • 来自先前窗口聚合的重复记录出现在后续窗口中

为什么延迟这么长,我能做些什么来减少它?

为什么会显示以前窗口中的重复记录,如何删除它们?

随着窗口变短,延迟似乎特别糟糕——当我将窗口持续时间设置为 10 秒时,延迟时间为 3 分钟以上,当窗口持续时间设置为 60 秒时,延迟时间约为 2 分钟。

在最短的窗口时间下,我还看到记录被“捆绑”起来,因此当接收器接收到记录时,我一次收到几个窗口的记录。

在重复的聚合记录中,我确实将输出模式设置为完成,但我的理解是,如果触发器在其中多次触发,则记录应该只在当前窗口中重复,而我的不应该这样做。

我设置了与窗口时间和 10%(1 或 6 秒)的水印阈值相匹配的处理触发器,并且我知道如果我移除翻转窗口,流本身可以正常工作。

我明白为什么 spark 可能无法触发特定频率的触发器,但我认为 10 秒,当然 60 秒足以处理我正在测试的非常有限的数据量。

使用 60 秒翻转窗口和处理时间触发器发送数据的示例

  • 发送 6 个有效载荷
  • 等一下
  • 发送 1 个有效载荷
  • 稍等片刻
  • 发送 3 个有效载荷

(CreateTime 来自带有 --property print.timestamp=true 的 kafka-console-consumer)。这些在我期望触发器根据 CreateTime 时间戳和窗口触发后几分钟到达。

// First record fine
CreateTime:1644329432464        {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}

// Duplicate of first record with second sent by spark even though the window is over
CreateTime:1644329505265        {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644329523901        {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}

// Duplicate of first 2 records with third
CreateTime:1644330082974        {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644330105990        {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}
CreateTime:1644330125375        {"window":{"start":"2022-02-08T14:20:00.000Z","end":"2022-02-08T14:21:00.000Z"},"account_id":"acc0","totalAmount":333}

我有时确实会看到如下消息,但没有其他 WARN 或 ERROR 级别的消息表明存在问题:

2022-02-08 14:24:45 WARN  ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 99770 milliseconds

应用数据/代码

示例数据如下所示,由 ts 设置为当前时间的 python 脚本生成:

{"account_id": "acc0", "txn_id": "1234500001", "amount": 111, "ts": 1644258052800}

应用程序代码(嵌入 spark.master=local[*] 运行)

    public void execute() throws Exception {
        final SparkSession spark = SparkSession.builder().appName("test").getOrCreate();

        final Trigger trigger = Trigger.ProcessingTime(60000);
        final OutputMode outputMode = OutputMode.Complete();
        final String windowDuration = "60 seconds";
        final String watermarkThreshold = "6 seconds";

        final String kafkaHost = "localhost:9092";
        final String kafkaInTopic = "topic-in";
        final String kafkaOutTopic = "topic-out";

        final StructType schema = new StructType(
            new StructField[] {
                new StructField("account_id", DataTypes.StringType, false, null),
                new StructField("txn_id", DataTypes.StringType, false, null),
                new StructField("amount", DataTypes.LongType, false, null),
                new StructField("ts", DataTypes.LongType, false, null)
            }
        );

        final Dataset<Row> in = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaHost)
            .option("subscribe", kafkaInTopic)
            .option("startingOffsets", "latest")
            .option("includeHeaders", "true")
            .load();

        in.printSchema();

        final Dataset<Row> process = in
            // convert kafka value payload to structured data
            .withColumn("value", functions.from_json(functions.column("value").cast(DataTypes.StringType), schema))

            .withColumn("account_id", functions.column("value.account_id"))
            .withColumn("txn_id", functions.column("value.txn_id"))
            .withColumn("amount", functions.column("value.amount"))
            .withColumn("ts", functions.column("value.ts"))

            // conversion to timestamp is by second, not ms
            .withColumn("datetime", functions.col("ts").divide(1000).cast(DataTypes.TimestampType))
            .withWatermark("datetime", watermarkThreshold)

            .groupBy(
                functions.window(functions.col("datetime"), windowDuration),
                functions.col("account_id")
            )
            .agg(functions.sum("amount").as("totalAmount"));


        process.printSchema();

        final DataStreamWriter<Row> out = process
            // required for kafka output
            .select(functions.to_json(functions.struct("*")).as("value"))

            .writeStream()
            .outputMode(outputMode)
            .trigger(trigger)
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaHost)
            .option("topic", kafkaOutTopic)
            .option("checkpointLocation", "/tmp/spark-kafka");

        LOGGER.info("STARTING STREAM");
        out.start().awaitTermination();
    }

4

1 回答 1

1

对于长时间的延迟,很可能是由于没有足够的资源来根据警告消息处理消息造成的。您可以检查 spark UI 以了解原因。它可能是分区之间的数据倾斜或需要更多的内存或内核。

对于重复记录,您可能要尝试updateappend模式。Completemode 表示每次触发后将整个 Result Table 输出到 sink。这就是你有deplicates的原因。您可以参考https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

于 2022-02-09T14:49:54.533 回答