0

我正在尝试基于分区对输入文件进行分区,accountId但仅当 dataFrames 包含超过 1000 条记录时才完成此分区。accountId是一个无法知道的动态整数。考虑下面的代码

    val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("input")
lines.print()

lines.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    val df = sqlContext.read.json(rdd)
    val filteredDF = df.filter(df("accountId")==="3")
    if (filteredDF.count() > 1000) {
      df.write.partitionBy("accountId").format("json").save("output")
    }
  }
}

ssc.start()
ssc.awaitTermination()

但是上面的代码将所有不需要的 accountId 分区。

  1. 我想找到数据框中每个的计数accountId
  2. 如果每个 accountId 的记录超过 1000,则将分区信息写入输出源。

例如,如果输入文件有 1500 条 accountId=1 的记录和 10 条 accountId=2 的记录,则根据 accountId=1 将过滤后的数据帧划分为输出源,并将 accountId=2 记录保留在内存中。

如何使用火花流实现这一目标?

4

1 回答 1

1

你应该做

filteredDF.write.partitionBy("accountId").format("json").save("output")

代替

df.write.partitionBy("accountId").format("json").save("output")
于 2016-07-29T18:15:32.660 回答