0

我对R有点陌生,所以请原谅这里的新手...

我正在用 R 编写代码以在脚本中加载 1000 个保存的数据帧(文件),该脚本对每个文件中的数据运行一个函数并将结果值存储在一个向量中。我必须用不同的功能一遍又一遍地做这件事,目前这需要很长时间。

我正在尝试使用多核 mclapply 并行化该过程,但不幸的是,从 2 到 8 个内核的任何东西似乎都比仅在一个内核上运行它需要更长的时间。

由于磁盘 I/O 限制,这个想法从根本上来说是不合理的吗?多核,甚至 R,不是正确的解决方案吗?以 Python 之类的方式打开文件然后在内容上运行 R 函数会是比 R 更好的路径吗?

对此的任何指导或想法将不胜感激 -

为清楚起见添加了代码:

    library(multicore)

    project.path = "/pathtodata/"

    #This function reads the file location and name, then loads it and runs a simple statistic
    running_station_stats <- function(nsrdb_stations)
    {
      varname <- "column_name"
      load(file = paste(project.path, "data/",data_set_list[1], sep = ""))
      tempobj <- as.data.frame(coredata(get(data_set_list[2])))
      mean(tempobj[[varname]],na.rm=TRUE)
    }



    options(cores = 2)

    #This file has a list of R data files data_set_list[1] and the names they were created with data_set_list[2]
    load(file = paste(project.path, "data/data_set_list.RData", sep = ""))

    thelist <- list()

    thelist[[1]] <- data_set_list[1:50,]

    thelist[[2]] <- data_set_list[51:100,]

    thelist[[3]] <- data_set_list[101:150,]

    thelist[[4]] <- data_set_list[151:200,]


    #All three of these are about the same speed to run regardless of the num of cores
    system.time(
    {
      apply(nsrdb_stations[which(nsrdb_stations$org_stations==TRUE),][1:200,],1,running_station_stats)
    })

    system.time(
      lapply(thelist, apply, 1, running_station_stats)
     )

    system.time(
      mclapply(thelist, apply, 1, running_station_stats)
    )
4

2 回答 2

1

Python 和 R 都将尝试使用多个核心来处理数字运算之类的事情。它对读取大量文件没有帮助。多线程也不是答案(re python GIL)。

一些可能的解决方案(都不是简单的)是:

  • 在可以(某些)文件io async的地方使用twisted之类的东西。硬编程,它不是很友好。
  • 使用芹菜或其他一些自制的主从解决方案。很多滚动你自己的行动。
  • 使用 Ipython (w/ ipcluster ) 生成多个进程,python 将为您重新组合 (BEST 解决方案 IMO)
于 2012-06-21T23:45:22.290 回答
0

我会首先在 Python 中尝试良好的老式多处理。上面的选项也是可能的。这是使用多处理模块执行批处理作业的示例。


import multiprocessing as mp
import time

def worker(x): time.sleep(0.2) print "x= %s, x squared = %s" % (x, x*x) return x*x

def apply_async(): pool = mp.Pool() for i in range(100): pool.apply_async(worker, args = (i, )) pool.close() pool.join()

if name == 'main': apply_async()

输出如下所示:


x= 0, x squared = 0
x= 1, x squared = 1
x= 2, x squared = 4
x= 3, x squared = 9
x= 4, x squared = 16
x= 6, x squared = 36
x= 5, x squared = 25
x= 7, x squared = 49
x= 8, x squared = 64
x= 10, x squared = 100
x= 11, x squared = 121
x= 9, x squared = 81
x= 12, x squared = 144

如您所见,这些数字不是按顺序排列的,因为它们是异步执行的。只需更改上面的 worker() 函数来进行处理,并可能使用 mp.Pool(10) 或 mp.Pool(15) 或其他方式更改并发进程的数量。像这样的事情应该是相对困难的。. .

于 2012-06-29T19:25:56.487 回答