1

我需要在从 Kafka 主题读取的 DataFrame 上应用多个过滤器,并将每个过滤器的输出发布到外部系统(如另一个 Kafka 主题)。

我读过这样的kafkaDF

val kafkaDF: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "try.kafka.stream")
  .load()
  .select(col("topic"), expr("cast(value as string) as message"))
  .filter(col("message").isNotNull &&  col("message") =!= "")
  .select(from_json(col("message"), eventsSchema).as("eventData"))
  .select("eventData.*")

我可以foreachBatch在此 Dataframe 上运行 a 然后遍历过滤器列表以获取过滤后的数据,然后可以将其发布到 kafka 主题,如下所示

kafkaDF.writeStream
  .foreachBatch { (batch: DataFrame, _: Long) =>
    // List of filters that needs to be applied
    filterList.par.foreach(filterString => {
      val filteredDF = batch.filter(filterString)
      // Add some columns. 
      // Do some operations based on different filter
      filteredDF.toJSON.foreach(value => {
        // Publish a message to Kafka 
      })
    })
  }
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start()
  .awaitTermination()

但是,考虑到这么多迭代,我不确定这是否是最好的方法。有没有比这样做更好的方法?

4

1 回答 1

1

如果您计划将数据从一个 Kafka 主题写入多个 Kafka 主题,您可以在写入 Kafka 时在单个Dataframe 中创建一个名为“topic”的列。然后,此列中的值定义将在其中生成记录的主题。这允许您根据需要写入尽可能多的不同 Kafka 主题。

因此,我只会将您的过滤器逻辑用作何时/否则条件,或者,如果更复杂,则用作 UDF。

下面是一个示例代码,可以帮助您入门。根据value消费的 Kafka 消息,在filteredDf. 如果value = 1然后数据帧记录被生成到名为“out1”的主题中,否则记录被生成到名为“out2”的主题中。

val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "try.kafka.stream")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "partition", "offset", "timestamp")

val filteredDf = inputDf.withColumn("topic", when(filter, lit("out1")).otherwise(lit("out2")))


val query = filteredDf
  .select(
    col("key"),
    to_json(struct(col("*"))).alias("value"),
    col("topic"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/home/michael/sparkCheckpoint/1/")
  .start()

query.awaitTermination()

编辑:(我最初可能误解了你的问题)

如果您只是想找到一种应用多个过滤器的好方法,filterList您可以使用以下方法组合它们foldLeft

val filter1 = col("value") === 1
val filter2 = col("key") === 1
val filterList = List(filter1, filter2)
val filterAll = filterList.tail.foldLeft(filterList.head)((f1, f2) => f1.and(f2))

println(filterAll)
((value = 1) AND (key = 1))

然后应用于.filter(filterAll)您的数据框。

于 2021-04-28T21:00:19.190 回答