我需要在从 Kafka 主题读取的 DataFrame 上应用多个过滤器,并将每个过滤器的输出发布到外部系统(如另一个 Kafka 主题)。
我读过这样的kafkaDF
val kafkaDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "try.kafka.stream")
.load()
.select(col("topic"), expr("cast(value as string) as message"))
.filter(col("message").isNotNull && col("message") =!= "")
.select(from_json(col("message"), eventsSchema).as("eventData"))
.select("eventData.*")
我可以foreachBatch
在此 Dataframe 上运行 a 然后遍历过滤器列表以获取过滤后的数据,然后可以将其发布到 kafka 主题,如下所示
kafkaDF.writeStream
.foreachBatch { (batch: DataFrame, _: Long) =>
// List of filters that needs to be applied
filterList.par.foreach(filterString => {
val filteredDF = batch.filter(filterString)
// Add some columns.
// Do some operations based on different filter
filteredDF.toJSON.foreach(value => {
// Publish a message to Kafka
})
})
}
.trigger(Trigger.ProcessingTime("60 seconds"))
.start()
.awaitTermination()
但是,考虑到这么多迭代,我不确定这是否是最好的方法。有没有比这样做更好的方法?