我对 Spark Streaming API 有以下问题。我目前正在通过 Flume 将输入数据流式传输到 Spark Streaming,我计划使用它对数据进行一些预处理。然后,我想将数据保存到 Hadoop 的文件系统并使用 Impala 进行查询。但是,Spark 将数据文件写入单独的目录,并为每个 RDD 生成一个新目录。
这是一个问题,因为首先,Impala 中的外部表无法检测子目录,而只能检测它们指向的目录内的文件,除非分区。其次,Spark 添加新目录的速度如此之快,以至于在 Impala 中为每个生成的目录定期创建一个新分区对性能非常不利。另一方面,如果我选择增加 Spark 中写入的滚动间隔,以降低生成目录的频率,则会增加延迟,直到 Impala 可以读取传入的数据。这是不可接受的,因为我的系统必须支持实时应用程序。在 Hive 中,我可以使用以下设置配置外部表以检测子目录而无需分区:
set hive.mapred.supports.subdirectories=true;
set mapred.input.dir.recursive=true;
但据我了解,Impala 没有这样的功能。
我目前正在使用以下代码从 Flume 读取数据并将其写入 HDFS:
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path)
这里,变量 path 决定了目录的前缀,文本文件(part-0000 等)被添加到该目录,目录名的其余部分是 Spark 生成的时间戳。我可以将代码更改为以下内容:
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))
mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path))
在这种情况下,文件将被添加到由路径确定的同一目录中,但由于它们始终命名为 part-00000、part-00001、part-00002 等,因此将覆盖先前生成的文件。在查看 Spark 的源代码时,我注意到文件的名称是由 SparkHadoopWriter 的 open() 方法中的一行确定的:
val outputName = "part-" + numfmt.format(splitID)
在我看来,没有办法通过 Spark API 操作 splitID。总而言之,我的问题如下:
- 有什么方法可以让 Impala 中的外部表检测子目录?
- 如果没有,是否有任何方法可以让 Spark 将其输出文件写入单个目录或以 Impala 立即读取的形式?
- 如果没有,Spark 是否有任何类型的更新来解决这个问题,或者我应该只分支我自己的 Spark 版本,我可以用它来决定它自己编写的文件的名称?