0

我正在尝试使用以下Scala代码在脚本编辑中以CVS格式过滤到S3后编写数据帧。

当前状态:

  • 运行后不显示任何错误,只是不写入 S3。

  • 日志屏幕打印开始,但是看不到打印结束。

  • 没有指示问题的特定错误消息。

  • 在 temp.count 处停止。

环境条件:我拥有所有 S3 的管理员权限。

import com.amazonaws.services.glue.GlueContext
import <others>

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    val datasource0 = glueContext.getCatalogSource(database = "db", tableName = "table", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
    val appymapping1 = datasource0.appyMapping(mapping=........)

    val temp=appymapping1.toDF.filter(some filtering rules)
    print("start")
    if (temp.count() <= 0) {
    temp.write.format("csv").option("sep", ",").save("s3://directory/error.csv")
  }
    print("End")
     
4

1 回答 1

1

您正在使用 if 条件将 Dataframe 写入 S3(If 条件是检查数据帧是否有一行或多行),但您的 If 条件是反转的。仅当数据框具有 0(或更少)行时才适用。所以改变它。

高级:Spark 总是将文件保存为“part-”名称。所以将 S3 路径更改为s3://directory/。并添加.mode("overwrite")

所以你写的df查询应该是

temp.write.format("csv").option("sep", ",").mode("overwrite").save("s3://directory")

于 2020-10-18T17:17:53.070 回答