我使用DataFrame API。
我有现有的 DataFrame 和一个 List 对象(也可以使用 Array)。如何将此列表作为新列添加到现有 DataFrame 中?我应该为此使用Column类吗?
我使用DataFrame API。
我有现有的 DataFrame 和一个 List 对象(也可以使用 Array)。如何将此列表作为新列添加到现有 DataFrame 中?我应该为此使用Column类吗?
您可能应该将您的 List 转换为单个 Column RDD 并由您在 critetia pickeg 上应用 join。简单的 DataFrame 转换:
val df1 = sparkContext.makeRDD(yourList).toDF("newColumn")
如果您需要创建额外的列来执行连接,您可以添加更多列,映射您的列表:
val df1 = sparkContext.makeRDD(yourList).map(i => (i, fun(i)).toDF("newColumn", "joinOnThisColumn")
我不熟悉 Java 版本,但您应该尝试使用JavaSparkContext.parallelize(yourList)
和应用基于此文档的类似映射操作。
这是一个示例,其中我们有一个列日期,并希望添加另一个带有月份的列。
Dataset<Row> newData = data.withColumn("month", month((unix_timestamp(col("date"), "MM/dd/yyyy")).cast("timestamp")));
希望这会有所帮助!
干杯!
对不起,这是我的错,我已经找到了withColumn(String colName, Column col)
应该解决我问题的功能
这个线程有点旧,但我在使用 Java 时遇到了类似的情况。我认为最重要的是,对于我应该如何解决这个问题存在概念上的误解。
为了解决我的问题,我创建了一个简单的 POJO 来协助数据集的新列(而不是尝试在现有列的基础上构建)。我认为从概念上讲,我不明白最好在需要添加附加列的初始读取期间生成数据集。我希望这对将来的某人有所帮助。
考虑以下:
JavaRDD<MyPojo> myRdd = dao.getSession().read().jdbc("jdbcurl","mytable",someObject.getProperties()).javaRDD().map( new Function<Row,MyPojo>() {
private static final long serialVersionUID = 1L;
@Override
public MyPojo call(Row row) throws Exception {
Integer curDos = calculateStuff(row); //manipulate my data
MyPojo pojoInst = new MyPojo();
pojoInst.setBaseValue(row.getAs("BASE_VALUE_COLUMN"));
pojoInst.setKey(row.getAs("KEY_COLUMN"));
pojoInst.setCalculatedValue(curDos);
return pojoInst;
}
});
Dataset<Row> myRddRFF = dao.getSession().createDataFrame(myRdd, MyPojo.class);
//continue load or other operation here...