1

我是 bonobo-etl 的新手,我正在尝试编写一个一次加载多个文件的作业,但我无法让 CsvReader 使用@use_context_processor注释。我的代码片段:

def input_file(self, context):
    yield 'test1.csv'
    yield 'test2.csv'
    yield 'test3.csv'

@use_context_processor(input_file)
def extract(f):
    return bonobo.CsvReader(path=f,delimiter='|')

def load(*args):
    print(*args)

def get_graph(**options):
    graph = bonobo.Graph()
    graph.add_chain(extract,load)
    return graph

当我运行这项工作时,我得到<bonobo.nodes.io.csv.CsvReader object at 0x7f849678dc88>的不是 CSV 的行。

如果我像这样对阅读器进行硬编码graph.add_chain(bonobo.CsvReader(path='test1.csv',delimiter='|'),load),它就可以工作。

任何帮助,将不胜感激。

谢谢你。

4

1 回答 1

1

由于 bonobo.CsvReader 不支持(尚)从输入流中读取文件名,因此您需要使用自定义阅读器。

这是一个适用于我的一组 csvs 的解决方案:

import bonobo
import bonobo.config
import bonobo.util
import glob
import csv

@bonobo.config.use_context
def read_multi_csv(context, name):
    with open(name) as f:
        reader = csv.reader(f, delimiter=';')
        headers = next(reader)
        if not context.output_type:
            context.set_output_fields(headers)
        for row in reader:
            yield tuple(row)

def get_graph(**options):
    graph = bonobo.Graph()

    graph.add_chain(
      glob.glob('prenoms_*.csv'),
      read_multi_csv,
      bonobo.PrettyPrinter(),
    )

    return graph


if __name__ == '__main__':
    with bonobo.parse_args() as options:
        bonobo.run(get_graph(**options))

对此片段的评论很少,按阅读顺序:

  • use_context装饰器将节点执行上下文注入到转换调用中,允许.set_output_fields(...)使用第一个 csv 标头。
  • 其他 csv 标头被忽略,在我的情况下它们都是一样的。对于您自己的案例,您可能需要稍微复杂一点的逻辑。
  • 然后,我们只需在一个bonobo.Graph实例中生成文件名glob.glob(在我的例子中,流将包含:prenoms_2004.csv prenoms_2005.csv ... prenoms_2011.csv prenoms_2012.csv)并将其传递给我们的自定义阅读器,它将被调用一次对于每个文件,打开它,并输出它的行。

希望有帮助!

于 2018-08-07T06:59:55.037 回答