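I am trying to add columns when they are found in the data, but I do not want to add them when they are not present in the XML schema. This is what I am doing; I think I am doing something wrong in the condition check.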
val temp = tempNew1
.withColumn("BookMark", when($"AsReportedItem.fs:BookMark".isNotNull or $"AsReportedItem.fs:BookMark" =!= "", 0))
.withColumn("DocByteOffset", when($"AsReportedItem.fs:DocByteOffset".isNotNull or $"AsReportedItem.fs:DocByteOffset" =!= "", 0))
.withColumn("DocByteLength", when($"AsReportedItem.fs:DocByteLength".isNotNull or $"AsReportedItem.fs:DocByteLength" =!= "", 0))
.withColumn("EditedDescription", when($"AsReportedItem.fs:EditedDescription".isNotNull or $"AsReportedItem.fs:EditedDescription" =!= "", 0))
.withColumn("EditedDescription", when($"AsReportedItem.fs:EditedDescription._VALUE".isNotNull or $"AsReportedItem.fs:EditedDescription._VALUE" =!= "", 0))
.withColumn("EditedDescription_languageId", when($"AsReportedItem.fs:EditedDescription._languageId".isNotNull or $"AsReportedItem.fs:EditedDescription._languageId" =!= "", 0))
.withColumn("ReportedDescription", when($"AsReportedItem.fs:ReportedDescription._VALUE".isNotNull or $"AsReportedItem.fs:ReportedDescription._VALUE" =!= "", 0))
.withColumn("ReportedDescription_languageId", when($"AsReportedItem.fs:ReportedDescription._languageId".isNotNull or $"AsReportedItem.fs:ReportedDescription._languageId" =!= "", 0))
.withColumn("FinancialAsReportedLineItemName_languageId", when($"FinancialAsReportedLineItemName._languageId".isNotNull or $"FinancialAsReportedLineItemName._languageId" =!= "", 0))
.withColumn("FinancialAsReportedLineItemName", when($"FinancialAsReportedLineItemName._VALUE".isNotNull or $"FinancialAsReportedLineItemName._VALUE" =!= "", 0))
.withColumn("PeriodPermId_objectTypeId", when($"PeriodPermId._objectTypeId".isNotNull or $"PeriodPermId._objectTypeId" =!= "", 0))
.withColumn("PeriodPermId", when($"PeriodPermId._VALUE".isNotNull or $"PeriodPermId._VALUE" =!= "", 0))
.drop($"AsReportedItem")
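This works fine when the column is found, but when the column does not exist in tempNew1 I get an error.
Basically, if the tag is not found in the schema, I do not want to call withColumn at all.
I cannot see what I am missing here. Please help me identify the problem.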
The error I get is below:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`AsReportedItem.fs:BookMark`' given input columns: [IsAsReportedCurrencySetManually,
This is what I also tried:
def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess
val temp = tempNew1.withColumn("BookMark", when(hasColumn(tempNew1,"AsReportedItem.fs:BookMark") == true, $"AsReportedItem.fs:BookMark"))
But I could not get it to work fully.
The following works, but how can I write it so that it handles every column individually?
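A note on why this attempt probably fails: `hasColumn(...) == true` is a plain Scala Boolean, while `when` expects a Column as its condition, and Spark resolves `$"AsReportedItem.fs:BookMark"` when it analyses the plan regardless of what the condition evaluates to. A minimal sketch of making the decision in Scala before any column expression is built is shown here; the helper name `colOrNull` is hypothetical and only for illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.lit
import scala.util.Try

// Same check as above: true when `path` resolves against df's schema.
def hasColumn(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess

// Hypothetical helper: the existing column if it resolves, otherwise a null literal.
def colOrNull(df: DataFrame, path: String): Column =
  if (hasColumn(df, path)) df(path) else lit(null)

// Usage, assuming tempNew1 is in scope:
// val temp = tempNew1.withColumn("BookMark", colOrNull(tempNew1, "AsReportedItem.fs:BookMark"))
```

Because the missing path is never turned into a column expression, the analyser has nothing unresolved to complain about.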
val temp = if (hasColumn(tempNew1, "AsReportedItem")) {
tempNew1
.withColumn("BookMark", $"AsReportedItem.fs:BookMark")
.withColumn("DocByteOffset", $"AsReportedItem.fs:DocByteOffset")
.withColumn("DocByteLength", $"AsReportedItem.fs:DocByteLength")
.withColumn("EditedDescription", $"AsReportedItem.fs:EditedDescription")
.withColumn("EditedDescription", $"AsReportedItem.fs:EditedDescription._VALUE")
.withColumn("EditedDescription_languageId", $"AsReportedItem.fs:EditedDescription._languageId")
.withColumn("ReportedDescription", $"AsReportedItem.fs:ReportedDescription._VALUE")
.withColumn("ReportedDescription_languageId", $"AsReportedItem.fs:ReportedDescription._languageId")
.withColumn("FinancialAsReportedLineItemName_languageId", $"FinancialAsReportedLineItemName._languageId")
.withColumn("FinancialAsReportedLineItemName", $"FinancialAsReportedLineItemName._VALUE")
.withColumn("PeriodPermId_objectTypeId", $"PeriodPermId._objectTypeId")
.withColumn("PeriodPermId", $"PeriodPermId._VALUE")
.drop($"AsReportedItem")
} else {
tempNew1
.withColumn("BookMark", lit(null))
.withColumn("DocByteOffset", lit(null))
.withColumn("DocByteLength", lit(null))
.withColumn("EditedDescription", lit(null))
.withColumn("EditedDescription", lit(null))
.withColumn("EditedDescription_languageId", lit(null))
.withColumn("ReportedDescription", lit(null))
.withColumn("ReportedDescription_languageId", lit(null))
.withColumn("FinancialAsReportedLineItemName_languageId", $"FinancialAsReportedLineItemName._languageId")
.withColumn("FinancialAsReportedLineItemName", $"FinancialAsReportedLineItemName._VALUE")
.withColumn("PeriodPermId_objectTypeId", $"PeriodPermId._objectTypeId")
.withColumn("PeriodPermId", $"PeriodPermId._VALUE")
.drop($"AsReportedItem")
}
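One way the version above might be generalised so that each path is checked on its own is to fold over a list of (output name, source path) pairs. This is only a sketch under assumptions: `mappings`, `flattenOptional` and `fillMissing` are hypothetical names, and the redundant first assignment of `EditedDescription` is left out because the `_VALUE` assignment overwrites it.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import scala.util.Try

def hasColumn(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess

// (output column, source path), in the same order as the chain above.
// Order matters: a struct's child must be read before the struct's name is overwritten.
val mappings: Seq[(String, String)] = Seq(
  "BookMark" -> "AsReportedItem.fs:BookMark",
  "DocByteOffset" -> "AsReportedItem.fs:DocByteOffset",
  "DocByteLength" -> "AsReportedItem.fs:DocByteLength",
  "EditedDescription" -> "AsReportedItem.fs:EditedDescription._VALUE",
  "EditedDescription_languageId" -> "AsReportedItem.fs:EditedDescription._languageId",
  "ReportedDescription" -> "AsReportedItem.fs:ReportedDescription._VALUE",
  "ReportedDescription_languageId" -> "AsReportedItem.fs:ReportedDescription._languageId",
  "FinancialAsReportedLineItemName_languageId" -> "FinancialAsReportedLineItemName._languageId",
  "FinancialAsReportedLineItemName" -> "FinancialAsReportedLineItemName._VALUE",
  "PeriodPermId_objectTypeId" -> "PeriodPermId._objectTypeId",
  "PeriodPermId" -> "PeriodPermId._VALUE"
)

// Adds each column only if its path resolves. When fillMissing is true,
// a null literal is added instead of skipping the column.
def flattenOptional(df: DataFrame, mappings: Seq[(String, String)], fillMissing: Boolean): DataFrame =
  mappings.foldLeft(df) { case (acc, (name, path)) =>
    if (hasColumn(acc, path)) acc.withColumn(name, acc(path))
    else if (fillMissing) acc.withColumn(name, lit(null))
    else acc
  }

// Usage, assuming tempNew1 is in scope:
// val temp = flattenOptional(tempNew1, mappings, fillMissing = false).drop("AsReportedItem")
```

With `fillMissing = false` no `withColumn` call is made for a tag that is absent from the schema; with `fillMissing = true` the output keeps a fixed set of columns, as in the `else` branch above. `drop` with a column name does nothing when the column does not exist, so it is safe in both cases.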
Adding the schema of the main DataFrame:
root
|-- DataPartition: string (nullable = true)
|-- TimeStamp: string (nullable = true)
|-- PeriodId: long (nullable = true)
|-- SourceId: long (nullable = true)
|-- FinancialStatementLineItem_lineItemId: long (nullable = true)
|-- FinancialStatementLineItem_lineItemInstanceKey: long (nullable = true)
|-- StatementCurrencyId: long (nullable = true)
|-- StatementTypeCode: string (nullable = true)
|-- uniqueFundamentalSet: long (nullable = true)
|-- AuditID: string (nullable = true)
|-- EstimateMethodCode: string (nullable = true)
|-- EstimateMethodId: long (nullable = true)
|-- FinancialAsReportedLineItemName: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _languageId: long (nullable = true)
|-- FinancialStatementLineItemSequence: long (nullable = true)
|-- FinancialStatementLineItemValue: double (nullable = true)
|-- FiscalYear: long (nullable = true)
|-- IsAnnual: boolean (nullable = true)
|-- IsAsReportedCurrencySetManually: boolean (nullable = true)
|-- IsCombinedItem: boolean (nullable = true)
|-- IsDerived: boolean (nullable = true)
|-- IsExcludedFromStandardization: boolean (nullable = true)
|-- IsFinal: boolean (nullable = true)
|-- IsTotal: boolean (nullable = true)
|-- PeriodEndDate: string (nullable = true)
|-- PeriodPermId: struct (nullable = true)
| |-- _VALUE: long (nullable = true)
| |-- _objectTypeId: long (nullable = true)
|-- ReportedCurrencyId: long (nullable = true)
|-- StatementSectionCode: string (nullable = true)
|-- StatementSectionId: long (nullable = true)
|-- StatementSectionIsCredit: boolean (nullable = true)
|-- SystemDerivedTypeCode: string (nullable = true)
|-- SystemDerivedTypeCodeId: long (nullable = true)
|-- Unit: double (nullable = true)
|-- UnitEnumerationId: long (nullable = true)
|-- FFAction|!|: string (nullable = true)
|-- PartitionYear: long (nullable = true)
|-- PartitionStatement: string (nullable = true)
Adding the schema for the case where the column is present:
|-- uniqueFundamentalSet: long (nullable = true)
|-- AsReportedItem: struct (nullable = true)
| |-- fs:BookMark: string (nullable = true)
| |-- fs:DocByteLength: long (nullable = true)
| |-- fs:DocByteOffset: long (nullable = true)
| |-- fs:EditedDescription: struct (nullable = true)
| | |-- _VALUE: string (nullable = true)
| | |-- _languageId: long (nullable = true)
| |-- fs:ItemDisplayedNegativeFlag: boolean (nullable = true)
| |-- fs:ItemDisplayedValue: double (nullable = true)
| |-- fs:ItemScalingFactor: long (nullable = true)
| |-- fs:ReportedDescription: struct (nullable = true)
| | |-- _VALUE: string (nullable = true)
| | |-- _languageId: long (nullable = true)
| |-- fs:ReportedValue: double (nullable = true)
|-- EstimateMethodCode: string (nullable = true)
|-- EstimateMethodId: long (nullable = true)
|-- FinancialAsReportedLineItemName: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _languageId: long (nullable = true)
|-- FinancialLineItemSource: long (nullable = true)