我正在尝试在 Spark 2.0.0 上查找错误的来源,我有一个将表名作为键和数据框作为值的映射,我循环遍历它,最后使用 spark-avro (3.0.0 -preview2) 将所有内容写入 S3 目录。它在本地运行完美(当然使用本地路径而不是 s3 路径),但是当我在 Amazon 的 EMR 上运行它时,它运行了一段时间,然后它说文件夹已经存在并终止(这意味着相同的键值不止一次在那个 for 循环中使用,对吧?)。这可能是线程的问题吗?
for ((k, v) <- tableMap) {
val currTable: DataFrame = tableMap(k)
val decryptedCurrTable = currTable.withColumn("data", decryptUDF(currTable("data")))
val decryptedCurrTableData = sparkSession.sqlContext.read.json(decryptedCurrTable.select("data").rdd.map(row => row.toString()))
decryptedCurrTable.write.avro(s"s3://..../$k/table")
decryptedCurrTableData.write.avro(s"s3://..../$k/tableData")