
 |-- label: string (nullable = true)
 |-- features: struct (nullable = true)
 |    |-- feat1: string (nullable = true)
 |    |-- feat2: string (nullable = true)
 |    |-- feat3: string (nullable = true)


  val data = rawData
     .filter( !(rawData("features.feat1") <=> "100") )


  val data = rawData





10 回答 10



import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try

/**
  * Wraps a DataFrame to allow dropping fields from one of its top-level struct columns.
  *
  * @param df the DataFrame to operate on
  */
case class DFWithDropFrom(df: DataFrame) {
  /** Finds the top-level schema field named `source`; a Failure when no such field exists. */
  def getSourceField(source: String): Try[StructField] = {
    Try(df.schema.fields.filter(_.name == source).head)
  }

  /** Extracts the struct type of `sourceField`; a Failure when the field is not a struct. */
  def getType(sourceField: StructField): Try[StructType] = {
    Try(sourceField.dataType.asInstanceOf[StructType])
  }

  /** Builds a struct column from the sub-fields `names` of column `source`, each keeping its own name. */
  def genOutputCol(names: Array[String], source: String): Column = {
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
  }

  /**
    * Returns the DataFrame with the fields `toDrop` removed from struct column `source`.
    *
    * The DataFrame is returned unchanged when `source` does not exist or is not a struct;
    * names in `toDrop` that are not fields of `source` are ignored.
    */
  def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    getSourceField(source)
      .flatMap(getType)
      .map(_.fieldNames.diff(toDrop))  // keep every field that is not listed in toDrop
      .map(genOutputCol(_, source))
      .map(df.withColumn(source, _))   // replace the struct column with the reduced struct
      .getOrElse(df)                   // any Failure above means "nothing to drop"
  }
}


scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features

scala> case class record(label: String, features: features)
defined class record

scala> val df = sc.parallelize(Seq(record("a_label",  features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]

scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
|  label|features|
|a_label| [f2,f3]|

scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
|  label|  features|

scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
|  label|  features|


于 2015-09-25T23:44:48.597 回答


import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, DataType}

/** Various Spark utilities and extensions of DataFrame */
object DataFrameUtils {

  /**
    * Rebuilds `col` without the nested field `dropColName`.
    *
    * @param col         column expression for the field currently being visited
    * @param colType     data type of that field
    * @param fullColName dot-separated path of the field from the DataFrame root
    * @param dropColName dot-separated path of the field to remove
    * @return None when this field is the one to drop, otherwise the (possibly rebuilt) column
    */
  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else {
      colType match {
        case colType: StructType =>
          if (dropColName.startsWith(s"${fullColName}.")) {
            // The target lives somewhere below this struct: rebuild it from the surviving children.
            Some(struct(
              colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName)
                    .map(_.alias(f.name)))
                : _*))
          } else {
            // Struct that is not on the path to the target: keep as is.
            Some(col)
          }
        case other => Some(col)
      }
    }
  }

  /**
    * Drops the column `colName` from `df`; `colName` may be a dot-separated path into nested structs.
    */
  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          // Only top-level columns that contain the target need to be rebuilt.
          dropSubColumn(col(f.name), f.dataType, f.name, colName).map(x => (f.name, x))
        } else {
          None
        }
      })
      // df.drop handles the case where colName is itself a top-level column.
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
    * Extended version of DataFrame that allows to operate on nested fields
    */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
      * Drops nested field from DataFrame
      *
      * @param colName Dot-separated nested field name
      */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}


import DataFrameUtils._
于 2016-10-09T12:49:00.980 回答

对 spektom 的答案进行扩展,增加了对数组类型的支持:

object DataFrameUtils {

  /**
    * Rebuilds `col` without the nested field `dropColName`, descending into structs and
    * into arrays whose elements are structs.
    *
    * @param col         column expression for the field currently being visited
    * @param colType     data type of that field
    * @param fullColName dot-separated path of the field from the DataFrame root
    * @param dropColName dot-separated path of the field to remove
    * @return None when this field is the one to drop, otherwise the (possibly rebuilt) column
    */
  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    import org.apache.spark.sql.functions.struct

    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
              .flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName)
                  .map(_.alias(f.name)))
              : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              // NOTE(review): getField on an array-of-struct column yields one array per field, so
              // this presumably produces a struct of arrays rather than an array of structs
              // (see SPARK-31779) - confirm the resulting schema is acceptable for your use.
              Some(struct(innerType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName)
                    .map(_.alias(f.name)))
                : _*))
            // Arrays of non-struct elements have no nested fields to drop; keep them unchanged
            // instead of failing with a MatchError.
            case _ => Some(col)
          }

        case other => Some(col)
      }
    } else {
      Some(col)
    }
  }

  /**
    * Drops the column `colName` from `df`; `colName` may be a dot-separated path into nested fields.
    */
  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          // Only top-level columns that contain the target need to be rebuilt.
          dropSubColumn(col(f.name), f.dataType, f.name, colName).map(x => (f.name, x))
        } else {
          None
        }
      })
      // df.drop handles the case where colName is itself a top-level column.
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
    * Extended version of DataFrame that allows to operate on nested fields
    */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
      * Drops nested field from DataFrame
      *
      * @param colName Dot-separated nested field name
      */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}

于 2017-09-06T22:03:54.927 回答

我将在这里扩展 mmendez.semantic 的答案,并处理其评论子线程中描述的问题。

  /**
    * Rebuilds `col` without the nested field `dropColName`, descending into structs and
    * into arrays whose elements are structs.
    *
    * @param col         column expression for the field currently being visited
    * @param colType     data type of that field
    * @param fullColName dot-separated path of the field from the DataFrame root
    * @param dropColName dot-separated path of the field to remove
    * @return None when this field is the one to drop, otherwise the (possibly rebuilt) column
    */
  def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    import org.apache.spark.sql.functions.{arrays_zip, struct}

    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              // we are potentially dropping a column from within a struct, that is itself inside an array
              // Spark has some very strange behavior in this case, which they insist is not a bug
              // see https://issues.apache.org/jira/browse/SPARK-31779 and associated comments
              // and also the thread here: https://stackoverflow.com/a/39943812/375670
              // this is a workaround for that behavior

              // first, get all struct fields
              val innerFields = innerType.fields
              // next, create a new type for all the struct fields EXCEPT the column that is to be dropped
              // we will need this later
              val preserveNamesStruct = ArrayType(StructType(
                innerFields.filterNot(f => s"$fullColName.${f.name}".equals(dropColName))
              ))
              // next, apply dropSubColumn recursively to build up the new values after dropping the column
              val filteredInnerFields = innerFields.flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                }
              )
              // finally, use arrays_zip to unwrap the arrays that were introduced by building up the new, filtered
              // struct in this way (see comments in SPARK-31779), and then cast to the StructType we created earlier
              // to get the original names back
              Some(arrays_zip(filteredInnerFields: _*).cast(preserveNamesStruct))
            // arrays of non-struct elements have no nested fields to drop; keep them unchanged
            // instead of failing with a MatchError
            case _ => Some(col)
          }

        case _ => Some(col)
      }
    } else {
      Some(col)
    }
  }

  /**
    * Drops the column `colName` from `df`; `colName` may be a dot-separated path into nested fields.
    *
    * @param df      the DataFrame to modify
    * @param colName top-level column name or dot-separated nested field path
    * @return a DataFrame without the named column
    */
  def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields.flatMap(f => {
      if (colName.startsWith(s"${f.name}.")) {
        // only top-level columns that contain the target need to be rebuilt
        dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
          case Some(x) => Some((f.name, x))
          case None => None
        }
      } else {
        None
      }
    // df.drop handles the case where colName is itself a top-level column
    }).foldLeft(df.drop(colName)) {
      case (df, (colName, column)) => df.withColumn(colName, column)
    }
  }


// if defining the functions above in your spark-shell session, you first need imports
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// now you can paste the function definitions

// create a deeply nested and complex JSON structure    
// child2 and child4 are arrays of structs, child3 is an array of plain strings
val jsonData = """{
      "foo": "bar",
      "top": {
        "child1": 5,
        "child2": [
          {
            "child2First": "one",
            "child2Second": 2,
            "child2Third": -19.51
          }
        ],
        "child3": ["foo", "bar", "baz"],
        "child4": [
          {
            "child2First": "two",
            "child2Second": 3,
            "child2Third": 16.78
          }
        ]
      }
    }"""

// read it into a DataFrame
val df = spark.read.option("multiline", "true").json(Seq(jsonData).toDS())

// remove a sub-column
val modifiedDf = dropColumn(df, "top.child2.child2First")

 |-- foo: string (nullable = true)
 |-- top: struct (nullable = false)
 |    |-- child1: long (nullable = true)
 |    |-- child2: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- child2Second: long (nullable = true)
 |    |    |    |-- child2Third: double (nullable = true)
 |    |-- child3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- child4: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- child2First: string (nullable = true)
 |    |    |    |-- child2Second: long (nullable = true)
 |    |    |    |-- child2Third: double (nullable = true)

|foo|top                                                   |
|bar|[5, [[2, -19.51]], [foo, bar, baz], [[two, 3, 16.78]]]|
于 2020-05-29T20:40:22.480 回答

参照 spektom 的 Scala 代码片段,我用 Java 写了一个类似的实现。由于 Java 8 没有 foldLeft,我改用了 forEachOrdered。此代码适用于 Spark 2.x(我使用的是 2.1)。另外我注意到,先删除一列再用 withColumn 添加同名列的做法不起作用,所以我只是直接替换该列,这样似乎可行。


public class DataFrameUtils {

public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) {
    final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame);
        .flatMap( f -> {
           if (columnName.startsWith(f.name() + ".")) {
               final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName);
               if (column.isPresent()) {
                   return Stream.of(new Tuple2<>(f.name(), column));
               } else {
                   return Stream.empty();
           } else {
               return Stream.empty();
        }).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple));

    return dataFrameFolder.getDF();

private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
    Optional<Column> column = Optional.empty();
    if (!fullColumnName.equals(dropColumnName)) {
        if (colType instanceof StructType) {
            if (dropColumnName.startsWith(fullColumnName + ".")) {
                column = Optional.of(struct(getColumns(col, (StructType)colType, fullColumnName, dropColumnName)));
        } else {
            column = Optional.of(col);

    return column;

private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
    return Arrays.stream(colType.fields())
        .flatMap(f -> {
                    final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
                            fullColumnName + "." + f.name(), dropColumnName);
                    if (column.isPresent()) {
                        return Stream.of(column.get().alias(f.name()));
                    } else {
                        return Stream.empty();


private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> {
    private Dataset<Row> df;

    public DataFrameFolder(Dataset<Row> df) {
        this.df = df;

    public Dataset<Row> getDF() {
        return df;

    public void accept(Tuple2<String, Optional<Column>> colTuple) {
        if (!colTuple._2().isPresent()) {
            df = df.drop(colTuple._1());
        } else {
            df = df.withColumn(colTuple._1(), colTuple._2().get());


private class Pojo {
    private String str;
    private Integer number;
    private List<String> strList;
    private Pojo2 pojo2;

    public String getStr() {
        return str;

    public Integer getNumber() {
        return number;

    public List<String> getStrList() {
        return strList;

    public Pojo2 getPojo2() {
        return pojo2;


private class Pojo2 {
    private String str;
    private Integer number;
    private List<String> strList;

    public String getStr() {
        return str;

    public Integer getNumber() {
        return number;

    public List<String> getStrList() {
        return strList;


SQLContext context = new SQLContext(new SparkContext("local[1]", "test"));
Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class);
Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str");


 |-- number: integer (nullable = true)
 |-- pojo2: struct (nullable = true)
 |    |-- number: integer (nullable = true)
 |    |-- str: string (nullable = true)
 |    |-- strList: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- str: string (nullable = true)
 |-- strList: array (nullable = true)
 |    |-- element: string (containsNull = true)


 |-- number: integer (nullable = true)
 |-- pojo2: struct (nullable = false)
 |    |-- number: integer (nullable = true)
 |    |-- strList: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- str: string (nullable = true)
 |-- strList: array (nullable = true)
 |    |-- element: string (containsNull = true)
于 2017-01-18T15:46:47.243 回答


from pyspark.sql.functions import col, arrays_zip

        .withColumn("features", arrays_zip("features.feat2", "features.feat3"))
        .withColumn("features", col("features").cast(schema))


from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
      StructField('feat2', StringType(), True), 
      StructField('feat3', StringType(), True), 
于 2020-01-22T09:58:53.400 回答

PySpark 实现

import pyspark.sql.functions as sf

def _drop_nested_field(
    schema: StructType,
    field_to_drop: str,
    parents: List[str] = None,
) -> Column:
    """Build a struct column equal to the struct described by `schema` minus `field_to_drop`.

    :param schema: type of the struct currently being rebuilt
    :param field_to_drop: dot-separated path of the field to drop, relative to `schema`
    :param parents: names of the enclosing columns, outermost first
    :return: a struct column containing every field except the dropped one
    """
    parents = list() if parents is None else parents
    # back-tick quote each path segment so that names containing dots stay intact
    src_col = lambda field_names: sf.col('.'.join(f'`{c}`' for c in field_names))

    if '.' in field_to_drop:
        root, subfield = field_to_drop.split('.', maxsplit=1)
        field_to_drop_from = next(f for f in schema.fields if f.name == root)

        # keep the siblings of `root` as they are and rebuild `root` recursively
        return sf.struct(
            *[src_col(parents + [f.name]) for f in schema.fields if f.name != root],
            _drop_nested_field(
                schema=field_to_drop_from.dataType,
                field_to_drop=subfield,
                parents=parents + [root]
            ).alias(root)
        )

    else:
        # select all columns except the one to drop
        return sf.struct(
            *[src_col(parents + [f.name])for f in schema.fields if f.name != field_to_drop],
        )

def drop_nested_field(
    df: DataFrame,
    field_to_drop: str,
) -> DataFrame:
    """Return `df` without `field_to_drop`.

    :param df: the DataFrame to modify
    :param field_to_drop: top-level column name, or dot-separated path of a nested struct field
    :return: a DataFrame without the named field
    """
    if '.' in field_to_drop:
        root, subfield = field_to_drop.split('.', maxsplit=1)
        field_to_drop_from = next(f for f in df.schema.fields if f.name == root)

        # replace the top-level column with a rebuilt struct that omits the field
        return df.withColumn(root, _drop_nested_field(
            schema=field_to_drop_from.dataType,
            field_to_drop=subfield,
            parents=[root]
        ))
    else:
        return df.drop(field_to_drop)

df = drop_nested_field(df, 'a.b.c.d')
于 2020-03-14T04:11:21.907 回答

为此添加 java 版本解决方案。

实用程序类(传递您的数据集和必须删除的嵌套列到 dropNestedColumn 函数)。

(Lior Chaga 的答案中存在一些错误,我在尝试使用他的答案时已对其进行了纠正)。

public class NestedColumnActions {
dataset : dataset in which we want to drop columns
columnName : nested column that needs to be deleted
public static Dataset<?> dropNestedColumn(Dataset<?> dataset, String columnName) {

    //Special case of top level column deletion
        return dataset.drop(columnName);

    final DataSetModifier dataFrameFolder = new DataSetModifier(dataset);
            .flatMap(f -> {
                //If the column name to be deleted starts with current top level column
                if (columnName.startsWith(f.name() + DOT)) {
                    //Get new column structure under f , expected after deleting the required column
                    final Optional<Column> column = dropSubColumn(functions.col(f.name()), f.dataType(), f.name(), columnName);
                    if (column.isPresent()) {
                        return Stream.of(new Tuple2<>(f.name(), column));
                    } else {
                        return Stream.empty();
                } else {
                    return Stream.empty();
            //Call accept function with Tuples of (top level column name, new column structure under it)
            .forEach(colTuple -> dataFrameFolder.accept(colTuple));

    return dataFrameFolder.getDataset();

private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
    Optional<Column> column = Optional.empty();
    if (!fullColumnName.equals(dropColumnName)) {
        if (colType instanceof StructType) {
            if (dropColumnName.startsWith(fullColumnName + DOT)) {
                column = Optional.of(functions.struct(getColumns(col, (StructType) colType, fullColumnName, dropColumnName)));
            else {
                column = Optional.of(col);
        } else {
            column = Optional.of(col);

    return column;

private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
    return Arrays.stream(colType.fields())
            .flatMap(f -> {
                        final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
                                fullColumnName + "." + f.name(), dropColumnName);
                        if (column.isPresent()) {
                            return Stream.of(column.get().alias(f.name()));
                        } else {
                            return Stream.empty();


private static class DataSetModifier implements Consumer<Tuple2<String, Optional<Column>>> {
    private Dataset<?> df;

    public DataSetModifier(Dataset<?> df) {
        this.df = df;

    public Dataset<?> getDataset() {
        return df;

    colTuple[0]:top level column name
    colTuple[1]:new column structure under it
    public void accept(Tuple2<String, Optional<Column>> colTuple) {
        if (!colTuple._2().isPresent()) {
            df = df.drop(colTuple._1());
        } else {
            df = df.withColumn(colTuple._1(), colTuple._2().get());


于 2020-03-17T03:37:27.637 回答

Make Structs Easy * 库使得在嵌套数据结构中执行诸如添加、删除和重命名字段等操作变得容易。该库在 Scala 和 Python 中都可用。


import org.apache.spark.sql.functions._

case class Features(feat1: String, feat2: String, feat3: String)
case class Record(features: Features, arrayOfFeatures: Seq[Features])

// Single-row sample DataFrame: one struct column plus one array-of-struct column
val df = Seq(
   Record(Features("hello", "world", "!"), Seq(Features("red", "orange", "yellow"), Features("green", "blue", "indigo")))
).toDF


// root
//  |-- features: struct (nullable = true)
//  |    |-- feat1: string (nullable = true)
//  |    |-- feat2: string (nullable = true)
//  |    |-- feat3: string (nullable = true)
//  |-- arrayOfFeatures: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- feat1: string (nullable = true)
//  |    |    |-- feat2: string (nullable = true)
//  |    |    |-- feat3: string (nullable = true)


// +-----------------+----------------------------------------------+
// |features         |arrayOfFeatures                               |
// +-----------------+----------------------------------------------+
// |[hello, world, !]|[[red, orange, yellow], [green, blue, indigo]]|
// +-----------------+----------------------------------------------+


import com.github.fqaiser94.mse.methods._

// drop feat2 from features
df.withColumn("features", $"features".dropFields("feat2")).show(false)

// +----------+----------------------------------------------+
// |features  |arrayOfFeatures                               |
// +----------+----------------------------------------------+
// |[hello, !]|[[red, orange, yellow], [green, blue, indigo]]|
// +----------+----------------------------------------------+

我注意到其他解决方案下有很多后续评论,询问是否有办法删除嵌套在数组内的结构体中的列。这可以通过将 Make Structs Easy 库提供的函数与 spark-hofs 库提供的函数结合使用来完成,如下所示:

import za.co.absa.spark.hofs._

// drop feat2 in each element of arrayOfFeatures
df.withColumn("arrayOfFeatures", transform($"arrayOfFeatures", features => features.dropFields("feat2"))).show(false)

// +-----------------+--------------------------------+
// |features         |arrayOfFeatures                 |
// +-----------------+--------------------------------+
// |[hello, world, !]|[[red, yellow], [green, indigo]]|
// +-----------------+--------------------------------+

*完全披露:我是本答案中引用的Make Structs Easy库的作者。

于 2020-03-20T00:07:07.537 回答

对于 Spark 3.1+,您可以在结构类型列上使用 dropFields 方法:

按名称删除 StructType 中的字段的表达式。如果架构不包含字段名称,则这是一个空操作。

val df = sql("SELECT named_struct('feat1', 1, 'feat2', 2, 'feat3', 3) features")

val df1 = df.withColumn("features", $"features".dropFields("feat1"))
于 2022-01-26T20:16:49.480 回答