2

我正在尝试在 Spark 2.0.0 上查找错误的来源,我有一个将表名作为键和数据框作为值的映射,我循环遍历它,最后使用 spark-avro (3.0.0 -preview2) 将所有内容写入 S3 目录。它在本地运行完美(当然使用本地路径而不是 s3 路径),但是当我在 Amazon 的 EMR 上运行它时,它运行了一段时间,然后它说文件夹已经存在并终止(这意味着相同的键值不止一次在那个 for 循环中使用,对吧?)。这可能是线程的问题吗?

for ((k, v) <- tableMap) {
  val currTable: DataFrame = tableMap(k)
  val decryptedCurrTable = currTable.withColumn("data", decryptUDF(currTable("data")))
  val decryptedCurrTableData = sparkSession.sqlContext.read.json(decryptedCurrTable.select("data").rdd.map(row => row.toString()))
  decryptedCurrTable.write.avro(s"s3://..../$k/table")
  decryptedCurrTableData.write.avro(s"s3://..../$k/tableData")
4

1 回答 1

3

我认为这是一个并发问题,我将代码更改为:

decryptedCurrTable.write.mode("append").avro(s"s3://..../$k/table")
decryptedCurrTableData.write.mode("append").avro(s"s3://..../$k/tableData")  

一切正常。

于 2016-08-03T19:28:43.910 回答