我需要根据从主题收到的值执行一些功能。我目前正在使用 ForeachWriter 将所有主题转换为列表。现在,我想将此列表作为参数传递给方法。
这是我到目前为止所拥有的
def doA(mylist: List[String]) = { //something for A }
def doB(mylist: List[String]) = { //something for B }
Ans 这就是我如何称呼我的流式查询
//{"s":"a","v":"2"}
//{"s":"b","v":"3"}
val readTopics = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "myTopic").load()
val schema = new StructType()
.add("s",StringType)
.add("v",StringType)
val parseStringDF = readTopics.selectExpr("CAST(value AS STRING)")
val parseDF = parseStringDF.select(from_json(col("value"), schema).as("data"))
.select("data.*")
parseDF.writeStream
.format("console")
.outputMode("append")
.start()
//fails here
val listOfTopics = parseDF.select("s").map(row => (row.getString(0))).collect.toList
//unable to call the below methods
for (t <- listOfTopics ){
if(t == "a")
doA(listOfTopics)
else if (t == "b")
doB(listOfTopics)
else
println("do nothing")
}
spark.streams.awaitAnyTermination()
问题:
- 如何在流式作业中调用独立(非流式)方法?
- 我不能在这里使用 ForeachWriter,因为我想将 SparkSession 传递给方法,并且由于 SparkSession 不可序列化,所以我不能使用 ForeachWriter。并行调用方法 doA 和 doB 的替代方法是什么?