0

我使用的数据发生了架构更改。对于结果数据帧,它结合了新旧数据,我想转换和过滤的列曾经不存在于旧数据中。它不是由“null”填充的。我想尽可能地对该列进行转换和过滤,即每当列退出时,我想对其进行转换和过滤;对于没有此类列的早期数据,我将保留每一行。

问题是以下代码会导致java.lang.NullPointerException,因为早期的数据没有“ip”列。

val filteredData = sqlContext.sql(
s"SELECT $fieldsString FROM data $filterTerm")
.withColumn("ip",firstIp($"ip"))
.filter("`ip` not in ('30.90.30.90', '70.80.70.80')")
.filter("`ip` not like '10.%'")

上面的“firstIp”函数只是一个从数组中获取第一个IP地址的udf;它由 定义val firstIp = udf[String, String](_.split(",")(0))。我不想按模式将数据分成两部分——那些有“ip”列的和没有的……但是如果不这样分割数据,我的目标可以实现吗?

4

1 回答 1

0

这个答案解决了旧版本的问题,它只要求filter在不同的模式上使用。现在问题已经完全改变了,这真的没有意义

您可以简单地检查该列是否存在:

// let's create some test data
case class SchemaA(host: String)
case class SchemaB(host: String, ip: String) 

val testDataA = sc.parallelize(Seq(
    SchemaA("localhost"),
    SchemaA("other")
)).toDF()
val testDataB = sc.parallelize(Seq(
    SchemaB("localhost", "127.0.0.1"),
    SchemaB("other", "192.168.0.1")
)).toDF()

def doSomething(df: DataFrame) {
  val filtered = if (df.columns.contains("ip")) {
    df.filter("ip  in ('127.0.0.1')")
  } else {
    df
  }
  // do whatever you want after filtering...
  filtered.select($"host").show()
}

doSomething(testDataA)
doSomething(testDataB)
于 2016-08-26T17:52:32.193 回答