我使用以下代码来重新塑造数据框,使用它的行进行此重塑。
数据框包含产品更改其 ID 的日期,但为了将其与包含交易的大量其他数据框连接起来,我需要一个定义有效 ID 范围的新列。
例如,如果产品 A 在 01/01 生效日更改为产品 B,然后更改为产品 C 在 03/01 生效,我将需要在同一行中的开始日期和结束日期,以便我可以将它与巨大的按产品有效为 B(或 C)的日期过滤交易数据框,因此我可以正确地将产品重命名为其有效的真实 ID。
另一条信息,df_MPC 大约 800 行,不会增长太多。
所以我正在尝试的方法(在开发环境中运行时有效)是 foldleft。
MPC 数据框的总结版本为:
Product | Date | NewProd
A | 01/01/2018| B
B | 03/01/2018| C
客观的:
Product | Date | NewProd | OriginalProd | EndDate
A | 01/01/2018| B | A | 03/01
B | 03/01/2018| C | A | 31/12-9999
(OriginalProd 列是与事务数据框的最终连接所必需的)
导致stackoverflow的代码如下:
var rowList = new ListBuffer[Row]()
val it = df_MPC_SOURCE.toLocalIterator()
while (it.hasNext) { rowList += it.next()}
val df_MPC_TRANSFORMED = rowList.reverse
.foldLeft(df_MPC_pre_edit_source: DataFrame)((acc, elem) => acc
.withColumn("EndDate",
when((col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE"))
&& col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
&& (col("N_PRODUCT_ID_NEW") === elem.getAs("N_PRODUCT_ID")),
elem.getAs("D_EFFECTIVE_CHANGE"))
.otherwise(col("EndDate")))
.withColumn("OriginalProd",
when((col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE"))
&& col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
&& (col("MPC_original") === elem.getAs("N_PRODUCT_ID_NEW")),
elem.getAs("N_PRODUCT_ID"))
.otherwise(col("OriginalProd")))
)
此代码将源数据帧(上面提供的示例)转换为目标数据帧(上面的示例)。
它通过以排序方式(按日期)迭代其所有 800 行并针对其每一行来做到这一点:
- 更改与给定行匹配的所有产品的有效日期
- 如果我们找到中间产品,请更新原始产品 ID。例如,如果我们有一个产品从 ID“A”交换到“B”,然后从“B”交换到“C”,我们将需要具有原始产品 ID 的列(在本例中为“A”)因为能够将我们的结果与仅包含产品 ID“A”的原始交易表连接起来。
并且在集群中使用这段代码时抛出的错误:
Exception in thread "main" java.lang.StackOverflowError
at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
at scala.collection.AbstractSet.apply(Set.scala:47)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:334)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
如何使此代码在集群中以与在本地正常工作相同的方式工作?谢谢!