我正在尝试基于分区对输入文件进行分区,accountId
但仅当 dataFrames 包含超过 1000 条记录时才完成此分区。accountId
是一个无法知道的动态整数。考虑下面的代码
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("input")
lines.print()
lines.foreachRDD { rdd =>
val count = rdd.count()
if (count > 0) {
val df = sqlContext.read.json(rdd)
val filteredDF = df.filter(df("accountId")==="3")
if (filteredDF.count() > 1000) {
df.write.partitionBy("accountId").format("json").save("output")
}
}
}
ssc.start()
ssc.awaitTermination()
但是上面的代码将所有不需要的 accountId 分区。
- 我想找到数据框中每个的计数
accountId
。 - 如果每个 accountId 的记录超过 1000,则将分区信息写入输出源。
例如,如果输入文件有 1500 条 accountId=1 的记录和 10 条 accountId=2 的记录,则根据 accountId=1 将过滤后的数据帧划分为输出源,并将 accountId=2 记录保留在内存中。
如何使用火花流实现这一目标?