1

我有 1000 个传感器,我需要对数据进行分区(即每天每个传感器),然后将每个数据点列表提交给 R 算法)。使用 Spark,简化示例如下所示:

//Spark
val rddData = List(
 ("1:3", List(1,1,456,1,1,2,480,0,1,3,425,0)), 
 ("1:4", List(1,4,437,1,1,5,490,0)),
 ("1:6", List(1,6,500,0,1,7,515,1,1,8,517,0,1,9,522,0,1,10,525,0)),
 ("1:11", List(1,11,610,1))
)

case class DataPoint(
  key:  String,
  value:    List[Int])  // 4 value pattern, sensorID:seq#, seq#, value, state

我转换为镶木地板文件,保存它。在 SparkR 中加载镶木地板,没问题,架构说:

#SparkR
df <- read.df(sqlContext, filespec, "parquet")
schema(df)
StructType
|-name = "key", type = "StringType", nullable = TRUE
|-name = "value", type = "ArrayType(IntegerType,true)", nullable = TRUE

所以在 SparkR 中,我有一个数据框,其中每条记录都有我想要的所有数据(df$value)。我想将该数组提取到 R 可以使用的东西中,然后用一个包含结果数组的新列来改变我的原始数据帧(df)。逻辑上类似于结果 = 函数(df$value)。然后我需要将结果(所有行)返回到 SparkR 数据帧中以进行输出。

如何从 SparkR 数据帧中提取一个数组,然后根据结果进行变异?

4

2 回答 2

0

我也有这个问题。我解决它的方法是在 spark DataFrame 中添加一个行索引,然后explode在 select 语句中使用。确保在选择语句中选择索引,然后选择您想要的行。这会给你一个“长”的数据框。如果 DataFrame 列中的每个嵌套列表都包含相同数量的信息(例如,如果您正在分解 x,y 坐标的列表列),您会期望长 DataFrame 中的每个行索引出现两次。

完成上述操作后,我通常groupBy(index)对分解后的 DataFrame 进行操作,过滤n()每个索引的等于列表中预期项目数的位置,然后在 Spark 上进行额外的 groupBy、merge、join、filter 等操作数据框。

Urban Institute 的 GitHub 页面上有一些优秀的指南。祝你好运。-先天

于 2017-06-02T20:46:10.013 回答
0

设 spark 数据框,dfR 数据框df_r 要将 sparkR df 转换为 R df,使用代码

df_r <- collect(df)

使用 R 数据框df_r,您可以在 R 中进行所有想要执行的计算。假设您在列中有结果df_r$result

Then for converting back to SparkR data frame use code,
#this is a new SparkR data frame, df_1
df_1 <- createDataFrame(sqlContext, df_r) 

For adding the result back to SparkR data frame `df` use code
#this adds the df_1$result to a new column df$result 
#note that number of rows should be same in df and `df_1`, if not use `join` operation
df$result <- df_1$result 

希望这能解决你的问题

于 2016-01-13T06:39:03.887 回答