5

我使用DataFrame API

我有现有的 DataFrame 和一个 List 对象(也可以使用 Array)。如何将此列表作为新列添加到现有 DataFrame 中?我应该为此使用Column类吗?

4

4 回答 4

5

您可能应该将您的 List 转换为单个 Column RDD 并由您在 critetia pickeg 上应用 join。简单的 DataFrame 转换:

 val df1 = sparkContext.makeRDD(yourList).toDF("newColumn")

如果您需要创建额外的列来执行连接,您可以添加更多列,映射您的列表:

val df1 = sparkContext.makeRDD(yourList).map(i => (i, fun(i)).toDF("newColumn", "joinOnThisColumn")

我不熟悉 Java 版本,但您应该尝试使用JavaSparkContext.parallelize(yourList)和应用基于此文档的类似映射操作。

于 2015-08-21T09:02:06.927 回答
2

这是一个示例,其中我们有一个列日期,并希望添加另一个带有月份的列。

Dataset<Row> newData = data.withColumn("month", month((unix_timestamp(col("date"), "MM/dd/yyyy")).cast("timestamp")));

希望这会有所帮助!

干杯!

于 2017-05-23T15:57:14.903 回答
2

对不起,这是我的错,我已经找到了withColumn(String colName, Column col)应该解决我问题的功能

于 2015-08-21T09:01:23.200 回答
1

这个线程有点旧,但我在使用 Java 时遇到了类似的情况。我认为最重要的是,对于我应该如何解决这个问题存在概念上的误解。

为了解决我的问题,我创建了一个简单的 POJO 来协助数据集的新列(而不是尝试在现有列的基础上构建)。我认为从概念上讲,我不明白最好在需要添加附加列的初始读取期间生成数据集。我希望这对将来的某人有所帮助。

考虑以下:

        JavaRDD<MyPojo> myRdd = dao.getSession().read().jdbc("jdbcurl","mytable",someObject.getProperties()).javaRDD().map( new Function<Row,MyPojo>() {

                       private static final long serialVersionUID = 1L;

                       @Override
                       public MyPojo call(Row row) throws Exception {
                       Integer curDos = calculateStuff(row);   //manipulate my data

                       MyPojo pojoInst = new MyPojo();

                       pojoInst.setBaseValue(row.getAs("BASE_VALUE_COLUMN"));
                       pojoInst.setKey(row.getAs("KEY_COLUMN"));
                       pojoInst.setCalculatedValue(curDos);

                       return pojoInst;
                      }
                    });

         Dataset<Row> myRddRFF = dao.getSession().createDataFrame(myRdd, MyPojo.class);

//continue load or other operation here... 
于 2017-01-27T22:10:34.503 回答