3

只是为了学习新的 Spark 结构数据流,我尝试过这样的实验,但不确定我是否对流功能做错了什么。

首先,我从静态开始,只使用 Spark 2.1.0 附带的简单文本 (csv) 文件:

val df = spark.read.format("csv").load(".../spark2/examples/src/main/resources/people.txt")
df.show()

我可以得到如此合理的输出(在 Zepplin 下)。

+-------+---+
|    _c0|_c1|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

并按照示例,我只是修改了代码以读取相同的文件并提供了架构

val userSchema = new StructType().add("name", "string").add("age", "integer")

val csvDF = spark
  .readStream
  .schema(userSchema)      // Specify schema of the csv files
  .format("csv")
  .load(".../spark2/examples/src/main/resources/people.csv") 

并且没有错误消息,所以我想将数据写入内存并使用以下代码查看结果:

val outStream = csvDF.writeStream
  .format("memory")
  .queryName("logs")
  .start()

sql("select * from logs").show(truncate = false)

但是,没有错误消息,我一直得到“空输出”

+----+---+
|name|age|
+----+---+
+----+---+

这些代码在 Zeppelin 0.7 下进行了测试,我不确定我是否在这里遗漏了什么。同时,我尝试了来自 Apache Spark 2.1.0 官方网站的示例,$nc -lk 9999它运行得非常好。

如果我做错了什么,我可以学习吗?

[修改和测试]

  1. 我尝试将相同的文件 people.txt 复制到一个 .../csv/ 文件夹下的 people1.csv peopele2.csv people3.csv
  2. val csvDF = spark.readStream.schema(userSchema).csv("/somewhere/csv")
  3. csvDF.groupBy("name").count().writeStream.outputMode("complete").format("console").start().awaitTermination()

我得到了这个:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
|   name|count|
+-------+-----+
|Michael|    3|
|   Andy|    3|
| Justin|    3|
+-------+-----+

因此,我可能认为这不是数据readstream()问题...

4

2 回答 2

1
  1. The file name is people.txt, not people.csv. Spark will throw an error saying "Path does not exist". I just used Spark Shell to verify it.

  2. The input path should be a directory. It doesn't make sense to use a file since this is a streaming query.

于 2017-03-06T02:56:14.187 回答
0

您的代码有 2 个不同之处: 1. 非工作的具有“追加”(默认)输出模式,但工作的具有“完成”的输出模式。2. non-working 选择没有聚合的记录,但 working 有 groupBy 聚合。

我建议您切换到完整输出模式并进行 groupBy 计数以查看是否可以解决问题。

于 2017-03-28T12:45:12.033 回答