1

我有这 2 个 Spark 表:

simx
x0: num 1.00 2.00 3.00 ...
x1: num 2.00 3.00 4.00 ...
...
x788: num 2.00 3.00 4.00 ...

simy
y0: num 1.00 2.00 3.00 ...

在这两个表中,每列具有相同数量的值。表xy都分别保存到句柄simX_tblsimY_tbl中。实际数据量很大,可能达到40GB。

我想计算其中每一列的相关系数simxsimy比如说像cor(x0, y0, 'pearson'))。

我到处搜索,我认为没有任何现成的cor功能,所以我正在考虑使用相关公式本身(就像这里提到的那样)。

基于我上一个问题中的一个很好的解释,我认为使用mutate_allormutate_each不是很有效,并且给出了C stack error更大的数据大小,所以我考虑改用直接invoke调用函数Spark

到目前为止,我设法到达这里:

exprs <- as.list(paste0("sum(", colnames(simX_tbl),")"))

corr_result <- simX_tbl%>%  
  spark_dataframe() %>% 
  invoke("selectExpr", exprs) %>% 
  invoke("toDF", as.list(colnames(simX_tbl))) %>% 
  sdf_register("corr_result")

计算. sum_ simx但是后来,我意识到我还需要计算simy表格,而且我不知道如何将两个表格交互在一起(例如,simy在操作时访问simx)。

有什么方法可以更好地计算相关性?或者也许只是如何与其他 Spark 表交互。

我的 Spark 版本是 1.6.0

编辑:我尝试使用以下combine功能dplyr

xy_df <- simX_tbl %>% 
  as.data.frame %>%
  combine(as.data.frame(simY_tbl)) %>%
  # convert both table to dataframe, then combine. 
  # It will become list, so need to convert to dataframe again
  as.data.frame 

xydata <- copy_to(sc, xy_df, "xydata") #copy the dataframe into Spark table

但我不确定这是否是一个好的解决方案,因为:

  1. 需要加载到 R 内部的数据框中,我认为这对于大数据不实用
  2. 尝试head处理xydata时,列名变为所有值的连接

    xydata %>% head
    Source:   query [6 x 790]
    Database: spark connection master=yarn-client app=sparklyr local=FALSE
    

    c_1_67027262134984_2_44919662134984_1_85728542134984_1_49317262134984_
    1 1.670273
    2 2.449197
    3 1.857285
    4 1.493173
    5
    -5671.5768557.6

4

1 回答 1

3

就我个人而言,我会通过返回输入数据集来解决它。只是为了记录输入数据已使用 CSV 阅读器加载:

df <- spark_read_csv(
  sc, path = path, name = "simData", delimiter = " ", 
  header = "false", infer_schema = "false"
) %>% rename(y = `_c0`, xs = `_c1`)

看起来或多或少像这样:

      y                                                   xs
  <chr>                                                <chr>
1 21.66     2.643227,1.2698358,2.6338573,1.8812188,3.8708665
2 35.15 3.422151,-0.59515584,2.4994135,-0.19701914,4.0771823
3 15.22  2.8302398,1.9080592,-0.68780196,3.1878228,4.6600842

现在,让我们一起处理这两个部分,而不是将数据拆分为多个表:

exprs <- lapply(
 0:(n - 1), 
 function(i) paste("CAST(xs[", i, "] AS double) AS x", i, sep=""))

df %>% 
  # Convert to native Spark
  spark_dataframe() %>%
  # Split and select xs, but retain y
  invoke("selectExpr", list("y", "split(xs, ',') AS  xs")) %>%
  invoke("selectExpr", c("CAST(y AS DOUBLE)", exprs)) %>%
  # Register table so we can access it from dplyr
  invoke("registerTempTable", "exploded_df")

并申请summarize_each

tbl(sc, "exploded_df") %>% summarize_each(funs(corr(., y)), starts_with("x"))
Source:   query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE

         x0         x1        x2         x3         x4
      <dbl>      <dbl>     <dbl>      <dbl>      <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591

快速健全性检查(yx0y和之间的相关性x4):

cor(c(21.66, 35.15, 15.22), c(2.643227, 3.422151, 2.8302398))
[1] 0.8503358
cor(c(21.66, 35.15, 15.22), c(3.8708665, 4.0771823, 4.6600842))
[1] -0.5571591

您当然可以先将数据居中:

exploded <- tbl(sc, "exploded_df")

avgs <- summarize_all(exploded, funs(mean)) %>% as.data.frame()
center_exprs <- as.list(paste(colnames(exploded ),"-", avgs))

transmute_(exploded, .dots = setNames(center_exprs, colnames(exploded))) %>% 
  summarize_each(funs(corr(., y)), starts_with("x"))

不影响结果

Source:   query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE

         x0         x1        x2         x3         x4
      <dbl>      <dbl>     <dbl>      <dbl>      <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591

如果transmute_和都summarize_each导致一些问题,我们可以将居中和相关性直接推送到 Spark 中:

#Centering
center_exprs <- as.list(paste(colnames(exploded ),"-", avgs))

exploded %>%  
  spark_dataframe() %>% 
  invoke("selectExpr", center_exprs) %>% 
  invoke("toDF", as.list(colnames(exploded))) %>%
  invoke("registerTempTable", "centered")

centered <- tbl(sc, "centered")

#Correlation
corr_exprs <- lapply(
  0:(n - 1), 
  function(i) paste("corr(y, x", i, ") AS x", i, sep=""))

centered %>% 
  spark_dataframe() %>% 
  invoke("selectExpr", corr_exprs) %>% 
  invoke("registerTempTable", "corrs")

 tbl(sc, "corrs")
Source:   query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE

         x0         x1        x2         x3         x4
      <dbl>      <dbl>     <dbl>      <dbl>      <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591

中间表当然不是必需的,这可以在我们从数组中提取数据的同时应用。

于 2017-04-26T15:24:59.687 回答