3

我正在寻找一种方法来触发我的 Databricks 笔记本一次以处理 Kinesis Stream 并使用以下模式

 import org.apache.spark.sql.streaming.Trigger

// Load your Streaming DataFrame
   val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")
// Perform transformations and then write…
   sdf.writeStream.trigger(Trigger.Once).format("delta").start("/out/path")

使用 AWS Kinesis 似乎不可能,这也是 Databricks 文档所建议的。我的问题是我们还能做些什么来实现这一目标?

4

2 回答 2

2

正如您在问题中提到的, Kinesis 不支持触​​发器一次

但是您可以通过将Kinesis Data Firehose添加到图片中来实现您的需求 ,它将将来自 Kinesis 的数据写入 S3 存储桶(您可以选择您需要的格式,例如 Parquet、ORC,或者只保留 JSON),然后您可以将流式作业指向给定的存储桶,并为其使用 Trigger.Once,因为它是一个普通的流式源(为了提高效率,最好使用Databricks 上提供的Auto Loader )。此外,为了控制成本,您可以为 S3 目标设置保留策略,以便在一段时间后删除或存档文件,例如 1 周或一个月。

于 2021-04-03T10:53:13.850 回答
1

一种解决方法是在 X 运行后停止,无需触发。它将保证每次运行的行数固定。唯一的问题是,如果您有数百万行在队列中等待,您将无法保证处理所有行

在 scala 中,您可以添加一个事件侦听器,在 python 中计算批次数。

from time import sleep
s = sdf.writeStream.format("delta").start("/out/path")

#by defaut keep spark.sql.streaming.numRecentProgressUpdates=100 in the list. Stop after 10 microbatch
#maxRecordsPerFetch is 10 000 by default, so we will consume a max value of 10x10 000= 100 000 messages per run
while len(s.recentProgress) < 10:
  print("Batchs #:"+str(len(s.recentProgress)))
  sleep(10)
s.stop()

您可以使用更高级的逻辑来计算每批处理的消息数量并在队列为空时停止(一旦全部消耗,吞吐量应该会降低,因为您只会获得“实时”流,而不是历史记录)

于 2021-07-23T11:11:45.280 回答