2

我正在使用火花流从 Kafka 读取数据并传递给 py 文件进行预测。它返回预测以及原始数据。它将原始数据及其预测保存到文件中,但是它为每个 RDD 创建了一个文件。我需要一个包含所有收集到的数据的文件,直到我停止将程序保存到一个文件中。

我试过 writeStream 它甚至不会创建一个文件。我尝试使用 append 将其保存到镶木地板,但它会为每个 RDD 创建多个文件,即 1 个文件。我尝试使用附加模式写入多个文件作为输出。下面的代码创建一个文件夹 output.csv 并将所有文件输入其中。

 def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder()
      .appName("consumer")
      .master("local[*]")
      .getOrCreate()

    val scc = new StreamingContext(ss.sparkContext, Seconds(2))


    val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer"-> 
"org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer"> 
"org.apache.kafka.common.serialization.StringDeserializer",
        "group.id"-> "group5" // clients can take
      )
mappedData.foreachRDD(
      x =>
    x.map(y =>       
ss.sparkContext.makeRDD(List(y)).pipe(pyPath).toDF().repartition(1)
.write.format("csv").mode("append").option("truncate","false")
.save("output.csv")
          )
    )
scc.start()
scc.awaitTermination()

我只需要获取 1 个文件,其中包含在流式传输时一一收集的所有语句。

任何帮助将不胜感激,谢谢您的期待。

4

2 回答 2

5

hdfs 中的任何文件一旦写入就不能修改。如果您希望实时写入文件(每 2 秒将来自流式作业的数据块附加到同一文件中),则根本不允许这样做,因为 hdfs 文件是不可变的。如果可能,我建议您尝试编写从多个文件读取的读取逻辑。

但是,如果您必须从单个文件中读取,我建议您使用两种方法之一,在您将输出写入单个 csv/parquet 文件夹后,使用“Append”SaveMode(它将为您编写的每个块创建部分文件2 秒)。

  1. 您可以在此文件夹顶部创建一个配置单元表,从该表中读取数据。
  2. 您可以在 spark 中编写一个简单的逻辑来读取包含多个文件的文件夹,然后使用 reparation(1) 或 coalesce(1) 将其作为单个文件写入另一个 hdfs 位置,然后从该位置读取数据。见下文:

    spark.read.csv("oldLocation").coalesce(1).write.csv("newLocation")
    
于 2019-08-19T07:19:00.933 回答
1

repartition - 建议在增加分区数的同时使用 repartition,因为它涉及所有数据的洗牌。

合并 - 建议在减少分区数量的同时使用合并。例如,如果您有 3 个分区并且您想将其减少到 2 个分区,Coalesce 会将第 3 个分区的数据移动到分区 1 和 2。分区 1 和 2 将保留在同一个 Container 中。但是重新分区将在所有分区中打乱数据,以便网络使用执行者之间的关系会很高,它会影响性能。

性能方面比重新分区更好地合并性能,同时减少分区数量。

因此,在编写使用选项作为合并时。例如:df.write.coalesce

于 2019-08-19T11:49:42.693 回答