36

我正在手动创建一个数据框进行一些测试。创建它的代码是:

case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
  .createDataFrame(List(input(1110,0,1001,-10.00),
    input(1111,1,1001,10.00),
    input(1111,0,1002,10.00)))

所以架构看起来像这样:

root
 |-- id: long (nullable = false)
 |-- var1: integer (nullable = false)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

我想为这些变量中的每一个设置“nullable = true”。如何从一开始就声明它或在创建后将其切换到新的数据框中?

4

7 回答 7

49

回答

随着进口

import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

您可以使用

/**
 * Set nullable property of column.
 * @param df source DataFrame
 * @param cn is the column name to change
 * @param nullable is the flag to set, such that the column is  either nullable or not
 */
def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

  // get schema
  val schema = df.schema
  // modify [[StructField] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
    case y: StructField => y
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

直接地。

您还可以通过“pimp my library”库模式使该方法可用(请参阅我的 SO 帖子What is the best way to define custom methods on a DataFrame?),这样您就可以调用

val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )

编辑

替代解决方案 1

使用稍微修改过的版本setNullableStateOfColumn

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  // get schema
  val schema = df.schema
  // modify [[StructField] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

替代解决方案 2

明确定义架构。(使用反射创建更通用的解决方案)

configuredUnitTest("Stackoverflow.") { sparkContext =>

  case class Input(id:Long, var1:Int, var2:Int, var3:Double)

  val sqlContext = new SQLContext(sparkContext)
  import sqlContext.implicits._


  // use this to set the schema explicitly or
  // use refelection on the case class member to construct the schema
  val schema = StructType( Seq (
    StructField( "id", LongType, true),
    StructField( "var1", IntegerType, true),
    StructField( "var2", IntegerType, true),
    StructField( "var3", DoubleType, true)
  ))

  val is: List[Input] = List(
    Input(1110, 0, 1001,-10.00),
    Input(1111, 1, 1001, 10.00),
    Input(1111, 0, 1002, 10.00)
  )

  val rdd: RDD[Input] =  sparkContext.parallelize( is )
  val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
  val inputDF = sqlContext.createDataFrame( rowRDD, schema ) 

  inputDF.printSchema
  inputDF.show()
}
于 2015-10-18T07:43:44.857 回答
34

另一种选择,如果您需要就地更改数据框,并且无法重新创建,您可以执行以下操作:

.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))

然后 Spark 将认为该列可能包含null,并且可空性将设置为true。此外,您可以使用udf, 将您的值包装在Option. 即使对于流媒体案例也能正常工作。

于 2017-09-08T15:10:19.857 回答
17

这是一个迟到的答案,但想为来到这里的人提供一个替代解决方案。您可以DataFrame Column通过对代码进行以下修改,从一开始就自动将其设为可空:

case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double)
val inputDF = sqlContext
  .createDataFrame(List(input(Some(1110),Some(0),1001,-10.00),
    input(Some(1111),Some(1),1001,10.00),
    input(Some(1111),Some(0),1002,10.00)))
inputDF.printSchema

这将产生:

root
 |-- id: long (nullable = true)
 |-- var1: integer (nullable = true)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

defined class input
inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]

本质上,如果您Option通过使用Some([element])None作为实际输入将字段声明为,则该字段可以为空。否则,该字段将不能为空。我希望这有帮助!

于 2016-06-22T17:12:23.087 回答
10

设置所有列可空参数的更紧凑版本

而不是case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)一个可以使用_.copy(nullable = nullable). 那么整个函数可以写成:

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable))))
}
于 2016-07-13T13:37:10.650 回答
3

只需在您的案例类中使用 java.lang.Integer 而不是 scala.Int 。

case class input(id:Long, var1:java.lang.Integer , var2:java.lang.Integer , var3:java.lang.Double)
于 2016-06-23T09:17:34.730 回答
2

谢谢马丁·森内。只是一点点补充。在内部结构类型的情况下,您可能需要递归设置可空值,如下所示:

def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
    def set(st: StructType): StructType = {
      StructType(st.map {
        case StructField(name, dataType, _, metadata) =>
          val newDataType = dataType match {
            case t: StructType => set(t)
            case _ => dataType
          }
          StructField(name, newDataType, nullable = nullable, metadata)
      })
    }

    df.sqlContext.createDataFrame(df.rdd, set(df.schema))
  }
于 2019-05-04T13:17:32.903 回答
-1

当您想要删除一列并在 spark 数据框中创建一个新列时,您可以创建一个可为空的列,例如。

  1. df.withColumn("Employee_Name", when(lit('') == '', '').otherwise(lit(None)))

注意:如果您想创建字符串类型的列并使其可为空,则上述代码有效

  1. df.withColumn("Employee_Name", when(lit('') == '', 0).otherwise(lit(None)))

注意:如果您想创建一个整数类型的列并使其可以为空,则上述代码有效

于 2022-02-21T09:44:56.613 回答