我正在使用火花流从 Kafka 读取数据并传递给 py 文件进行预测。它返回预测以及原始数据。它将原始数据及其预测保存到文件中,但是它为每个 RDD 创建了一个文件。我需要一个包含所有收集到的数据的文件,直到我停止将程序保存到一个文件中。
我试过 writeStream 它甚至不会创建一个文件。我尝试使用 append 将其保存到镶木地板,但它会为每个 RDD 创建多个文件,即 1 个文件。我尝试使用附加模式写入多个文件作为输出。下面的代码创建一个文件夹 output.csv 并将所有文件输入其中。
def main(args: Array[String]): Unit = {
val ss = SparkSession.builder()
.appName("consumer")
.master("local[*]")
.getOrCreate()
val scc = new StreamingContext(ss.sparkContext, Seconds(2))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer"->
"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer">
"org.apache.kafka.common.serialization.StringDeserializer",
"group.id"-> "group5" // clients can take
)
mappedData.foreachRDD(
x =>
x.map(y =>
ss.sparkContext.makeRDD(List(y)).pipe(pyPath).toDF().repartition(1)
.write.format("csv").mode("append").option("truncate","false")
.save("output.csv")
)
)
scc.start()
scc.awaitTermination()
我只需要获取 1 个文件,其中包含在流式传输时一一收集的所有语句。
任何帮助将不胜感激,谢谢您的期待。