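Is there a way to read a multi-line CSV file using the ReadFromText transform in Python? I have a file that contains a single row, and I am trying to get Apache Beam to read the input as one line, but I cannot get it to work.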

import apache_beam

def print_each_line(line):
    print(line)

path = './input/testfile.csv'
# Here are the contents of testfile.csv
# foo,bar,"blah blah
# more blah blah",baz

p = apache_beam.Pipeline()

(p
 | 'ReadFromFile' >> apache_beam.io.ReadFromText(path)
 | 'PrintEachLine' >> apache_beam.FlatMap(lambda line: print_each_line(line))
 )

# Here is the output:
# foo,bar,"blah blah
# more blah blah",baz

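The code above parses the input as two lines, even though the standard for multi-line CSV files is to enclose multi-line fields in double quotes.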
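To make the difference concrete, here is a minimal sketch using only the standard library (no Beam involved). It assumes the file content is exactly what is shown in the comments above; the variable names are illustrative. Splitting on newlines, which is roughly what a line-based reader does, yields two fragments, while csv.reader honors the quotes and yields one record.

```python
import csv
import io

# Same content as testfile.csv in the question: one record with a quoted newline.
data = 'foo,bar,"blah blah\nmore blah blah",baz\n'

# Splitting on newlines breaks the quoted field into two fragments.
naive_lines = data.splitlines()

# csv.reader tracks quoting, so the embedded newline stays inside one field.
rows = list(csv.reader(io.StringIO(data, newline='')))

print(len(naive_lines))  # number of line fragments
print(len(rows))         # number of CSV records
```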


3 Answers


Beam's ReadFromText does not parse CSV files. However, you can use Python's csv.reader. Here is an example:

import csv
import io

import apache_beam

def print_each_line(line):
  print(line)

p = apache_beam.Pipeline()

(p
 | apache_beam.Create(["test.csv"])
 # FileSystems.open returns a binary stream; csv.reader needs text in Python 3.
 | apache_beam.FlatMap(lambda filename:
     csv.reader(io.TextIOWrapper(
         apache_beam.io.filesystems.FileSystems.open(filename), newline='')))
 | apache_beam.Map(print_each_line))

p.run()

Output:

['foo', 'bar', 'blah blah\nmore blah blah', 'baz']
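If the file handle is a binary stream, the decoding step can be factored into a small generator. The sketch below is an assumption-laden illustration, not part of Beam: read_csv_rows is a hypothetical helper name, UTF-8 is an assumed encoding, and io.BytesIO stands in for whatever binary file object the pipeline opens.

```python
import csv
import io

def read_csv_rows(binary_stream, encoding='utf-8'):
    """Decode a binary file-like object and yield one list of fields per CSV record."""
    # newline='' leaves newline handling to the csv module, which keeps
    # newlines inside quoted fields intact.
    with io.TextIOWrapper(binary_stream, encoding=encoding, newline='') as text_stream:
        yield from csv.reader(text_stream)

# io.BytesIO stands in for a binary file handle here.
raw = io.BytesIO(b'foo,bar,"blah blah\nmore blah blah",baz\n')
rows = list(read_csv_rows(raw))
print(rows)
```

Because the wrapper is used as a context manager, the underlying stream is closed once all rows have been consumed.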
answered 2018-04-20T23:21:57.690

None of the answers worked for me, but this did:

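import csv
import io

import apache_beam as beam
from apache_beam.io import WriteToText

# p (the Pipeline) and known_args (parsed arguments) are defined elsewhere.
(
  p
  | beam.Create([known_args.input])
  | beam.FlatMap(lambda filename:
    csv.reader(io.TextIOWrapper(beam.io.filesystems.FileSystems.open(filename))))
  | "Take only name" >> beam.Map(lambda x: x[0])
  | WriteToText(known_args.output)
)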
answered 2020-11-08T05:17:34.123

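ReadFromText parses a text file into newline-delimited elements, so it treats the two lines as two elements. If you want the contents of the file as a single element, you can do the following: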

import apache_beam as beam

# Read the whole file up front so it becomes a single pipeline element.
with open(path) as f:
    contents = [f.read()]
p = beam.Pipeline()
p | beam.Create(contents)
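Once the whole file is a single string element, it still has to be split into records. A minimal sketch of that step, assuming the element is the file's full text: parse_csv_text is a hypothetical helper name, and the sample string mirrors the test file from the question.

```python
import csv
import io

def parse_csv_text(text):
    """Yield one list of fields per CSV record found in the given text."""
    # StringIO with newline='' passes embedded newlines through untouched,
    # so csv.reader can keep them inside quoted fields.
    yield from csv.reader(io.StringIO(text, newline=''))

# Stand-in for the single element holding the whole file.
contents = 'foo,bar,"blah blah\nmore blah blah",baz\n'
rows = list(parse_csv_text(contents))
print(rows)
```

In a pipeline, a generator like this could be applied with FlatMap so that each parsed record becomes its own element. Note that this approach reads the entire file into memory, so it suits small files only.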
answered 2018-04-22T05:24:46.323