1

下面的函数获取一个包含 CSV 文件的文件夹(每个文件都是具有日期时间、开盘价、最高价、最低价、收盘价列的金融时间序列)并为每个开盘价、最高价、最低价、收盘价创建一个 XTS 对象,其中每个XTS 列是个人证券。对于我的用例,这种表示允许更方便和更快的处理(与每个文件的单个 XTS 相比)。

require(quantmod)

LoadUniverseToEnv <- function(srcDir, env) {
  fileList <- list.files(srcDir)
  if (length(fileList) == 0)
    stop("No files found!")

  env$op <- NULL
  env$hi <- NULL
  env$lo <- NULL
  env$cl <- NULL
  cols <- NULL

  for (file in fileList) {
    filePath <- sprintf("%s/%s", srcDir, file)
    if (file.info(filePath)$isdir == FALSE) {
      x <- as.xts(read.zoo(filePath, header=TRUE, sep=",", tz=""))
      cols <- c(sub("_.*", "", file), cols)
      # do outer join
      env$op <- merge(Op(x), env$op)
      env$hi <- merge(Hi(x), env$hi)
      env$lo <- merge(Lo(x), env$lo)
      env$cl <- merge(Cl(x), env$cl)
      cat(sprintf("%s : added: %s from: %s to: %s\n", as.character(Sys.time()), file, start(x), end(x)))
    }
  }
  colnames(env$op) <- cols
  colnames(env$hi) <- cols
  colnames(env$lo) <- cols
  colnames(env$cl) <- cols
}

对于有限数量的文件,性能很好,但随着 XTS 对象的宽度线性变慢,因此成为大型数据集的问题。合并期间的瓶颈是 CPU,当一个新列被附加到四个对象中的每一个时(例如,100 毫秒最初减慢 1 毫秒/列)

由于它受 CPU 限制,我的第一个想法是通过合并 n 批文件然后合并结果来并行化,但我想知道是否有更好的方法。

4

1 回答 1

0

我为此找到的最佳解决方案是合并“块”。例如,假设有 100 列,合并成 10 个 XTS 对象,每个对象有 10 列,然后合并这 10 个对象可以显着提高性能。

下面的示例显示了将 2000 个 xts 对象与每个 1000 行和相同索引合并时的 1500% 改进。

例子:

require(xts)
require(foreach)

nCols <- 2000
nRows <- 1000

x <- xts(runif(nRows), order.by=as.Date(seq(1:nRows)))
xList <- list()
for (i in 1:nCols)
  xList[[i]] <- x

testA <- function() {
  merged <- NULL
  for (x in xList)  
    merged <- merge(x, merged)
  colnames(merged) <- 1:length(xList)
  merged
}

testB <- function() {
  nChunks <- floor(sqrt(length(xList)))
  idx <- split(1:n, sort(1:n %% nChunks))

  merged <- foreach (chunk = 1:nChunks, .combine = "merge") %do% {
    merged <- foreach (i = idx[[chunk]], .combine = "merge") %do% {
      xList[[i]]
    }
    merged
  }
  colnames(merged) <- 1:length(xList)
  merged
}

print("Test A")
print(system.time(resultA <- testA()))
print("Test B")
print(system.time(resultB <- testB()))
print(sprintf("Identical : %s", identical(resultA, resultB)))
print(sprintf("Dimensions: %dx%d", ncol(resultA), nrow(resultA)))

输出:

[1] "Test A"
   user  system elapsed
  33.12    3.18   36.30
[1] "Test B"
   user  system elapsed
   2.28    0.01    2.31
[1] "Identical : TRUE"
[1] "Dimensions: 2000x1000"

请注意,foreach它不是并行运行的。

于 2013-06-20T10:40:51.857 回答