1

这个问题类似于 R 中数据非常大的其他问题,但我找不到如何合并/加入然后对两个 dfs 执行计算的示例(而不是读取大量数据帧并使用 mclapply 来执行计算)。这里的问题不是加载数据(大约需要 20 分钟,但它们确实会加载),而是合并和汇总。

我已经尝试了所有我能找到的 data.table 方法、不同类型的连接和 ff,但我仍然遇到 vecseq 限制 2^31 行的问题。现在我正在尝试使用 multidplyr 并行执行,但无法弄清楚错误来自哪里。

数据框:species_data # df 约 6500 万行,cols <- c("id","species_id") 查找 #df 约 1700 万行,cols <- c("id","cell_id","rgn_id")并非查找中的所有 id 都出现在 species_data

## make sample dataframes:

lookup <- data.frame(id = seq(2001,2500, by = 1), 
                     cell_id = seq(1,500, by = 1), 
                     rgn_id = seq(801,1300, by = 1))

library(stringi)

species_id <- sprintf("%s%s%s", stri_rand_strings(n = 1000, length = 3, pattern = "[A-Z]"),
                      pattern = "-",
                      stri_rand_strings(1000, length = 5, '[1-9]'))

id <- sprintf("%s%s%s", stri_rand_strings(n = 1000, length = 1, pattern = "[2]"),
                    stri_rand_strings(n = 1000, length = 1, pattern = "[0-4]"),
                    stri_rand_strings(n = 1000, length = 1, pattern = "[0-9]"))

species_data <- data.frame(species_id, id)

使用 multidplyr 合并和加入 dfs

library(tidyverse)
install.packages("devtools")
library(devtools)
devtools::install_github("hadley/multidplyr") 
library(multidplyr)
library(parallel)

species_summary <- species_data %>%
  # partition the species data by species id
  partition(species_id, cluster = cluster) %>%
  left_join(species_data, lookup, by = "id") %>%
  dplyr::select(-id) %>%
  group_by(species_id) %>%
  ## total number of cells each species occurs in
  mutate(tot_count_cells = n_distinct(cell_id)) %>%
  ungroup() %>%
  dplyr::select(c(cell_id, species_id, rgn_id, tot_count_cells)) %>%
  group_by(rgn_id, species_id) %>% 
  ## number of cells each species occurs in each region
  summarise(count_cells_eez = n_distinct(cell_id)) %>% 
  collect() %>%
  as_tibble()
## Error in partition(., species_id, cluster = cluster) : unused argument (species_id)

## If I change to:
species_summary <- species_data %>%
  group_by(species_id) %>%
  partition(cluster = cluster) %>% ...
## get, "Error in worker_id(data, cluster) : object 'cluster' not found

这是我第一次尝试并行和大数据,我正在努力诊断错误。

谢谢!

4

1 回答 1

3

首先我加载 dplyr 和 multidplyr

library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(multidplyr)
my_clusters <- new_cluster(3) # I have 4 cores

然后我加载您建议的相同数据

library(stringi)
lookup <- tibble(
  id = as.character(seq(2001, 2500, by = 1)),
  cell_id = seq(1, 500, by = 1),
  rgn_id = sprintf("%s", stri_rand_strings(n = 500, length = 3, pattern = "[0-9]"))
)

species_id <- sprintf(
  "%s%s%s", 
  stri_rand_strings(n = 1000, length = 3, pattern = "[A-Z]"),
  pattern = "-",
  stri_rand_strings(n = 1000, length = 5, "[1-9]")
)
id <- sprintf(
  "%s%s%s", 
  stri_rand_strings(n = 1000, length = 1, pattern = "[2]"),
  stri_rand_strings(n = 1000, length = 1, pattern = "[0-4]"),
  stri_rand_strings(n = 1000, length = 1, pattern = "[0-9]")
)

species_data <- tibble(species_id, id)

检查结果

species_data
#> # A tibble: 1,000 x 2
#>    species_id id   
#>    <chr>      <chr>
#>  1 CUZ-98293  246  
#>  2 XDG-61673  234  
#>  3 WFZ-94338  230  
#>  4 UIH-97549  226  
#>  5 AGE-35257  229  
#>  6 BMD-75361  249  
#>  7 MJB-78799  226  
#>  8 STS-15141  225  
#>  9 RXD-18645  245  
#> 10 SKZ-58666  243  
#> # ... with 990 more rows
lookup
#> # A tibble: 500 x 3
#>    id    cell_id rgn_id
#>    <chr>   <dbl> <chr> 
#>  1 2001        1 649   
#>  2 2002        2 451   
#>  3 2003        3 532   
#>  4 2004        4 339   
#>  5 2005        5 062   
#>  6 2006        6 329   
#>  7 2007        7 953   
#>  8 2008        8 075   
#>  9 2009        9 008   
#> 10 2010       10 465   
#> # ... with 490 more rows

现在我可以使用 multidplyr 方法运行代码。我根据两个group_by(s)把dplyr代码分两步

first_step <- species_data %>% 
  left_join(lookup, by = "id") %>% 
  select(-id) %>% 
  group_by(species_id) %>% 
  partition(my_clusters) %>% 
  mutate(tot_count_cells = n_distinct(cell_id)) %>% 
  collect() %>% 
  ungroup()
first_step
#> # A tibble: 1,000 x 4
#>    species_id cell_id rgn_id tot_count_cells
#>    <chr>        <dbl> <chr>            <int>
#>  1 UIH-97549       NA <NA>                 1
#>  2 BMD-75361       NA <NA>                 1
#>  3 STS-15141       NA <NA>                 1
#>  4 RXD-18645       NA <NA>                 1
#>  5 HFI-78676       NA <NA>                 1
#>  6 KVP-45194       NA <NA>                 1
#>  7 SGW-29988       NA <NA>                 1
#>  8 WBI-79521       NA <NA>                 1
#>  9 MFY-86277       NA <NA>                 1
#> 10 BHO-37621       NA <NA>                 1
#> # ... with 990 more rows

second_step <- first_step %>% 
    group_by(rgn_id, species_id) %>% 
    partition(my_clusters) %>% 
    summarise(count_cells_eez = n_distinct(cell_id)) %>% 
    collect() %>% 
    ungroup()
second_step
#> # A tibble: 1,000 x 3
#>    rgn_id species_id count_cells_eez
#>    <chr>  <chr>                <int>
#>  1 <NA>   ABB-24645                1
#>  2 <NA>   ABY-98559                1
#>  3 <NA>   AEQ-42462                1
#>  4 <NA>   AFO-58569                1
#>  5 <NA>   AKQ-44439                1
#>  6 <NA>   AMF-23978                1
#>  7 <NA>   ANF-49159                1
#>  8 <NA>   APD-85367                1
#>  9 <NA>   AQH-64126                1
#> 10 <NA>   AST-77513                1
#> # ... with 990 more rows

reprex 包(v0.3.0)于 2020-03-21 创建

于 2020-03-21T16:15:06.307 回答