
I have the following schema:

root
 |-- DataPartition: long (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- _action: string (nullable = true)
 |-- env:Data: struct (nullable = true)
 |    |-- _type: string (nullable = true)
 |    |-- al:FundamentalAnalytic: struct (nullable = true)
 |    |    |-- _analyticItemInstanceKey: long (nullable = true)
 |    |    |-- _financialPeriodEndDate: string (nullable = true)
 |    |    |-- _financialPeriodType: string (nullable = true)
 |    |    |-- _isYearToDate: boolean (nullable = true)
 |    |    |-- _lineItemId: long (nullable = true)
 |    |    |-- al:AnalyticConceptCode: string (nullable = true)
 |    |    |-- al:AnalyticConceptId: long (nullable = true)
 |    |    |-- al:AnalyticIsEstimated: boolean (nullable = true)
 |    |    |-- al:AnalyticValue: struct (nullable = true)
 |    |    |    |-- _VALUE: double (nullable = true)
 |    |    |    |-- _currencyId: long (nullable = true)
 |    |    |-- al:AuditID: string (nullable = true)
 |    |    |-- al:FinancialPeriodTypeId: long (nullable = true)
 |    |    |-- al:FundamentalSeriesId: struct (nullable = true)
 |    |    |    |-- _VALUE: long (nullable = true)
 |    |    |    |-- _objectType: string (nullable = true)
 |    |    |    |-- _objectTypeId: long (nullable = true)
 |    |    |-- al:InstrumentId: long (nullable = true)
 |    |    |-- al:IsAnnual: boolean (nullable = true)
 |    |    |-- al:TaxonomyId: long (nullable = true)

Now, this XML file changes frequently. I only want to process records that contain env:Data.sr:Source.*, and to do that I wrote the code below:

val dfType = dfContentItem.
    select(getDataPartition($"DataPartition").
        as("DataPartition"), 
        $"TimeStamp".as("TimeStamp"), 
        $"env:Data.sr:Source.*", 
        getFFActionParent($"_action")
        .as("FFAction|!|")
    ).filter($"env:Data.sr:Source._organizationId".isNotNull)
dfType.show(false)

But this only works when sr:Source is found in the schema; otherwise I get an exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: No such struct field sr:Source in _type, cr:TRFCoraxData, fun:Fundamental, md:Identifier, md:Relationship;

Never mind the null check I have for sr:Source; it does not work for me. With that check in place I still get the same error.

Basically, what I need is: when env:Data.sr:Source.* is null, I want to skip processing and move on so that processing of the next tag can start.
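One common guard for this situation is to inspect the DataFrame's schema before running the select, and skip the file when sr:Source is absent. This is a sketch; the helper name `hasSrSource` is my own, not from the original code:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Returns true only when env:Data exists in the schema
// and has an sr:Source child field.
def hasSrSource(df: DataFrame): Boolean =
  df.schema.fields.find(_.name == "env:Data").exists { field =>
    field.dataType match {
      case struct: StructType => struct.fieldNames.contains("sr:Source")
      case _                  => false
    }
  }
```

With such a check, `if (hasSrSource(dfContentItem)) { ... }` lets processing fall through to the next tag when the field is missing, without ever triggering the AnalysisException.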


1 Answer


org.apache.spark.sql.AnalysisException is usually thrown when something is wrong with the query itself, so I am fairly sure this happens because you are trying to filter on a null in those cases.

Error handling in Scala is usually done with Option; there is a good article on this.

def handleNulls(organizationId: String): Option[Boolean] = {
     // Return None immediately when the value is null
     val orgId = Option(organizationId).getOrElse(return None)
     Some(true) // Some() without a value does not compile; return a concrete value
}
// The UDF's input type must match the function parameter (String, not Integer)
val betterNullsUdf = udf[Option[Boolean], String](handleNulls)

val dfType = dfContentItem.
    select(getDataPartition($"DataPartition").
        as("DataPartition"), 
        $"TimeStamp".as("TimeStamp"), 
        // a UDF takes concrete columns, not a star expansion
        betterNullsUdf($"env:Data.sr:Source._organizationId"), 
        getFFActionParent($"_action")
        .as("FFAction|!|")
    ).filter($"env:Data.sr:Source._organizationId".isNotNull)
dfType.show(false)
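Note that the select above will still fail at analysis time when sr:Source is missing from the schema entirely. An alternative that handles that case without inspecting the schema by hand (a sketch, reusing dfContentItem from the question) is to wrap the select in scala.util.Try and treat an AnalysisException as "skip this file":

```scala
import scala.util.{Failure, Success, Try}

// select() throws AnalysisException during analysis when sr:Source
// is absent, so Try around the transformation detects the missing field.
Try(dfContentItem.select($"env:Data.sr:Source.*")) match {
  case Success(sourceDf) => sourceDf.show(false) // sr:Source present: process it
  case Failure(_)        => ()                   // sr:Source absent: move on to the next tag
}
```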
answered 2018-04-10T04:12:05.007