2

这个问题是这个线程的后续问题

我想在磁盘框架上执行三个操作

  1. 计算按两列分组的字段的不同值id(key_a 和 key_b)
  2. id计算由两列中的第一列(key_a)分组的字段的不同值
  3. 添加一列,其中包含第一列的不同值/两列的不同值

这是我的代码

      my_df <-
        data.frame(
          key_a = rep(letters, 384),
          key_b = rep(rev(letters), 384),
          id = sample(1:10^6, 9984)
        )
      
      my_df %>% 
        select(key_a, key_b, id) %>% 
        chunk_group_by(key_a, key_b) %>% 
        # stage one
        chunk_summarize(count = n_distinct(id)) %>% 
        collect %>% 
        group_by(key_a, key_b) %>% 
        # stage two
        mutate(count_summed = sum(count)) %>%
        group_by(key_a) %>% 
        mutate(count_all = sum(count)) %>% 
        ungroup() %>% 
        mutate(percent_of_total = count_summed / count_all)

我的数据是磁盘框格式,不是数据框,有100M行8列。

我正在遵循本文档中描述的两步说明

我担心这collect会使我的机器崩溃,因为它将所有东西都带到了内存中

我必须使用collect才能在磁盘框架中使用 dplyr group bys 吗?

4

1 回答 1

2

您应该始终使用srckeep仅将您需要的那些列加载到内存中。

my_df %>% 
        srckeep(c("key_a", "key_b", "id")) %>%
        # select(key_a, key_b, id) %>% # no need if you use srckeep
        chunk_group_by(key_a, key_b) %>% 
        # stage one
        chunk_summarize(count = n_distinct(id)) %>% 
        collect %>% 
        group_by(key_a, key_b) %>% 
        # stage two
        mutate(count_summed = sum(count)) %>%
        group_by(key_a) %>% 
        mutate(count_all = sum(count)) %>% 
        ungroup() %>% 
        mutate(percent_of_total = count_summed / count_all)

collect只会将计算结果chunk_group_by带入chunk_summarizeRAM。它不应该让你的机器崩溃。

您必须collect像 Spark 等其他系统一样使用。

但是,如果您正在计算n_distinct,那无论如何都可以在一个阶段完成

 my_df %>% 
        srckeep(c("key_a", "key_b", "id")) %>%
        #select(key_a, key_b, id) %>% 
        group_by(key_a, key_b) %>% 
        # stage one
        summarize(count = n_distinct(id)) %>% 
        collect

如果你真的关心 RAM 的使用,你可以将 worker 的数量减少到 1

setup_disk.frame(workers=1)
my_df %>% 
        srckeep(c("key_a", "key_b", "id")) %>%
        #select(key_a, key_b, id) %>% 
        group_by(key_a, key_b) %>% 
        # stage one
        summarize(count = n_distinct(id)) %>% 
        collect

setup_disk.frame()
于 2020-09-21T06:24:02.373 回答