2

我正在测试新版本的 Apache Spark 2.0,尝试利用结构化流功能,使用非常简单的代码创建带有流数据的数据集,然后打印创建的数据集。这是我的代码:

    SparkSession mySession= SparkSession.builder().appName("ProcessData").master("local[*]").config("spark.sql.warehouse.dir","System.getProperty(\"user.dir\")/warehouse").getOrCreate();
    Dataset<Row> measurements=mySession.readStream().format("socket").option("host","localhost").option("port",5556).load();
    StreamingQuery printDataset=measurements.writeStream().format("console").start();
    printDataset.awaitTermination();

问题是我得到一个 IOException: mkdir of (temporary directory) failed。有人可以帮我解决这个问题吗?太感谢了。

这是显示的完整错误:

Exception in thread "main" java.io.IOException: mkdir of C:/Users/Manuel%20Mourato/AppData/Local/Temp/temporary-891579db-0442-4e1c-8642-d41c7885ab26/offsets failed
at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1065)
at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176)
at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:281)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:57)
at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:131)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:251)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:231)
4

3 回答 3

0

确保您还在配置中设置了自己的检查点目录,该目录具有写入权限,一种方法是在相同的应用程序代码中创建检查点目录,例如,

.config("spark.sql.streaming.checkpointLocation", "C:\\sparkApp\\checkpoints\\")
于 2017-07-28T07:15:51.680 回答
0

可以试试这种方式吗?

SparkSession mySession= SparkSession.builder().appName("ProcessData").master("local[*]").config("spark.sql.warehouse.dir",System.getProperty("user.dir") + "/warehouse").getOrCreate();
于 2016-09-29T12:28:27.743 回答
0

为什么在字符串中使用 System.getProperty?另外,请检查此类文件夹是否存在,即:

val tempDir = System.getProperty("user.dir");
val path = tempDir + "/warehouse";

SparkSession mySession= SparkSession.builder().appName("ProcessData").master("local[*]").config("spark.sql.warehouse.dir", path).getOrCreate();

另外请检查您是否有对该路径的写入权限。如果您手动创建仓库目录并设置权限应该很好 - 您将确保一切正常

编辑:明白了!首先,您应该检查对 AppData/Local/Temp 的写入权限,因为它是标准的临时目录。

此错误是由OffsetLog引起的。您可以通过添加来更改将在其中创建日志的目录 option("checkpointLocation", ...)

于 2016-09-29T13:18:26.373 回答