3

我有几个这样的 CSV 文件:

site,run,id,payload,dir
1,1,1,528,1
1,1,1,540,2
1,1,3,532,1
# ... thousands more rows ...

(在我使用的实际情况下,共有三个文件,总共 1,408,378 行。)对于绘图,我想将它们重新调整为这种格式:

label,stream,dir,i,payload
A,1,1,1,586
A,1,1,2,586
A,1,1,3,586
# etc

其中“标签”源自 CSV 文件的名称;“stream”是一个序列号,分配给一个文件中“site”、“run”和“id”的每个组合(因此,仅在“label”中是唯一的);'i' 是每个 'stream' 中的行号;'dir' 和 'payload' 直接取自原始文件。我还想丢弃每个流的前 20 行以外的所有行。我事先知道 CSV 文件中的每个单元格(标题除外)都是一个正整数,并且“dir”只取值 1 和 2。

我用 终止了我最初的尝试plyr,因为经过一个多小时的计算,它已经运行了高达 6GB 工作集的 R 进程,而且看不到尽头。foreach最新的对并行性的闪亮新支持plyr并没有帮助:八个进程每个运行 10 分钟的 CPU 时间,然后它又回到一个进程,又持续了一个小时,是的,再次炸毁了我的 RAM。

于是我用 Python 给自己写了一个帮助脚本,用它我更流利:

import sys
def processOne(fname):
    clusters = {}
    nextCluster = 1
    with open(fname + ".csv", "r") as f:
        for line in f:
            line = line.strip()
            if line == "site,run,id,payload,dir": continue
            (site, run, id, payload, dir) = line.split(',')
            clind = ",".join((site,run,id))

            clust = clusters.setdefault(clind,
                                        { "i":nextCluster, "1":0, "2":0 })
            if clust["i"] == nextCluster:
                nextCluster += 1

            clust[dir] += 1
            if clust[dir] > 20: continue

            sys.stdout.write("{label},{i},{dir},{j},{payload}\n"
                             .format(label=fname,
                                     i=clust["i"],
                                     dir=dir,
                                     j=clust[dir],
                                     payload=payload))

sys.stdout.write("label,stream,dir,i,payload\n")
for fn in sys.argv[1:]: processOne(fn)

并从 R 脚本中调用它:

all <- read.csv(pipe("python preprocess.py A B C", open="r"))

五秒钟内完成。

所以问题是:在 R 中执行此操作的正确方法是什么?不是这个特定的任务,而是这类问题。在分析数据之前,我几乎总是需要对一堆数据进行洗牌,而且在其他语言中它几乎总是更容易——无论是我编写代码还是计算机执行它。这让我觉得我只是将 R 用作一个接口,如果我改为ggplot2学习的话,从长远来看,也许我会节省自己的时间。matplotlib

4

2 回答 2

8

完成所需步骤的 R 代码:

--"其中'label'来自CSV文件的名称;"

filvec <- list.files(<path>)
for (fil in filvec) {  #all the statements will be in the loop body
  dat <- read.csv(fil)
  dat$label <- fil   # recycling will make all the elements the same character value

--" 'stream' 是分配给一个文件中 'site'、'run' 和 'id' 的每个组合的序列号(因此,仅在 'label' 中是唯一的);"

 dat$stream <- as.numeric( with(dat, interaction(site, run, id) ) )

--" 'i' 是每个 'stream' 中的行号;"

dat$i <- ave(dat$site,     # could be any column since we are not using its values
             dat$stream,   # 'ave' passes grouped vectors, returns same length vector
             FUN= function(x) 1:length(x) )

--" 和 'dir' 和 'payload' 直接取自原始文件。"

 # you can refer to them by name or column number

--“我还想丢弃每个流的前 20 行以外的所有行。”

 out <- dat[dat$i <= 20,     # logical test for the "first 20"
             c('label','stream','dir','i','payload') ]  # chooses columns desired
     }  # end of loop

实际上目前这将覆盖三个“dat”文件。(因此主要用于进行速度检查的一次性测试运行。)您可以进行最后一次调用,例如:

  assign(paste(fil, "out", sep="_"), dat[dat$i <= 20,
                                          c('label','stream','dir','i','payload') ] )
于 2012-05-05T19:37:29.593 回答
6

data.table软件包通常会加快大型到大型 data.frames 上的操作。

例如,下面的代码将三个 500,000 行的 data.frames 作为输入,并在我不太强大的笔记本电脑上在大约 2 秒内执行您描述的所有转换。

library(data.table)

## Create a list of three 500000 row data.frames
df <- expand.grid(site=1:2, run=1:2, id=1:2)
df <- data.frame(df, payload=1:1000, dir=rep(1, 5e5))
dfList <- list(df, df, df)
dfNames <- c("firstCSV", "secondCSV", "thirdCSV")

## Manipulate the data with data.table, and time the calculations
system.time({
outputList <-
    lapply(1:3, FUN = function(ii) {
        label <- dfNames[ii]
        df <- dfList[[ii]]
        dt <- data.table(df, key=c("site", "run", "id"))
        groups <- unique(dt[,key(dt), with=FALSE])
        groups[, stream := seq_len(nrow(groups))]
        dt <- dt[groups]
        # Note: The following line only keeps the first 3 (rather than 20) rows
        dt <- dt[, head(cbind(.SD, i=seq_len(.N)), 3), by=stream]
        dt <- cbind(label, dt[,c("stream", "dir", "i", "payload")])
        df <- as.data.frame(dt)
        return(df)
    })
output <- do.call(rbind, outputList)
})
##    user  system elapsed 
##    1.25    0.18    1.44 

## Have a look at the output
rbind(head(output,4), tail(output,4))

编辑:在 2012 年 5 月 8 日,我通过替换这一行将上述运行时间缩短了约 25%:

dt <- dt[, head(cbind(.SD, i=seq_len(.N)), 3), by=stream]

对于这两个:

dt <- cbind(dt, i = dt[, list(i=seq_len(.N)), by=stream][[2]])
dt <- dt[i<=3,]  # Note: This only keeps the 1st 3 (rather than 20) rows
于 2012-05-05T22:23:47.013 回答