0

我正在使用targets工作流管道。此管道的一部分是监视 csv 文件目录的更新。这个目录有10000多个csv文件,每周都会增加新文件。我希望能够识别新添加的文件并将它们附加到现有的一组*.rds文件中。最简单的方法是重新运行*.rds每周创建 5 个文件子集的过程,但这需要时间。有效的方法是识别新添加的文件,并且只需bind_rows使用正确的rds文件。

dir()我可以通过使用and的典型编程很容易地做到这一点setdiff(),我在其中存储前一天的 csv 文件路径的快照。但我正在努力在targets框架内实现这一目标。

这是一个似乎不起作用的尝试。我想我想监视/_targets目录中的临时结果,但我不确定如何去做。而且,targets文档建议不要tar_load在目标配置本身内部使用。

tar_script({
   list(
      tar_target(csv_directory, "/csv/"),
      tar_target(csv_snapshot, dir(csv_directory)),
      tar_target(append_action, if(length(setdiff(dir(csv_directory), dir(csv_snapshot))) > 0){
                                ...}
})

4

1 回答 1

2

一些可能有帮助的组件:

  1. 文件目标:https ://books.ropensci.org/targets/files.html 。使用tar_target(format = "file"),包监视输入和/或输出文件的更改并重新运行受影响的目标(如果有)。
  2. 替代存储格式:https ://docs.ropensci.org/targets/reference/tar_target.html#storage-formats 。与其将 CSV 文件聚合到外部 RDS 文件中,不如使用诸如自动压缩输出数据并确保您不必担心对文件进行微观管理的方法更tar_target(format = "feather")有效targets
  3. 动态分支:books.ropensci.org/targets/dynamic.html。动态分支是一种在管道运行时定义大量新目标的方法。例如,这使您可以为一个文件或一批现有文件创建一个新目标。
  4. 批处理:https ://books.ropensci.org/targets/dynamic.html#batching 。10000 个目标是很多,而targets这么多目标包可能会减慢速度,因为每个目标都有开销成本。

因此,我建议您将 CSV 文件组织成批次(例如,按周)并动态地分批处理它们。根据您的用例的具体情况,另一种批处理结构可能更合适。

csv/
├── week1/
│   ├── data1.csv
│   ├── data2.csv
│   ├── ...
├── week2/
│   ├── data1.csv
│   ├── data2.csv
│   ├── ...
...

管道示意图:

# _targets.R
process_csv_dir <- function(csv_dir) {...} # custom user-defined function
list(
  tar_target(csv_dir, list.files("csv", full.names = TRUE)),
  tar_target(
    processed_data,
    process_csv_dir(csv_dir),
    pattern = map(csv_dir), # dynamic branching
    format = "feather" # from the arrow package
  )
)
于 2021-09-03T22:39:05.913 回答