I recently wrote a CSV file source for Apache Beam and added it to the beam_utils
PyPI package. You can use it as follows:
- Install beam_utils:
pip install beam_utils
- Import it:
from beam_utils.sources import CsvFileSource
- Use it as a source:
beam.io.Read(CsvFileSource(input_file))
By default, CsvFileSource returns dictionaries keyed by the column names in the header row, but check the documentation to choose the option that fits your use case.
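To make "dictionaries keyed by the header" concrete, here is a minimal sketch using Python's standard csv.DictReader on a small in-memory sample. The SAMPLE data and the rows_as_dicts helper are hypothetical, and this only illustrates the shape of such elements; it is not beam_utils' internal code. With csv.DictReader every value comes back as a string, so any type conversion is up to you.

```python
import csv
import io

# Hypothetical sample contents; a real pipeline would read input_file instead.
SAMPLE = "name,age\nalice,30\nbob,25\n"


def rows_as_dicts(text):
    """Parse CSV text into one dict per data row, keyed by the header row."""
    return [dict(row) for row in csv.DictReader(io.StringIO(text))]


rows = rows_as_dicts(SAMPLE)
```

Note that the header line itself is consumed as the keys and is not emitted as a row.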
As an extra, if you want to implement your own custom CsvFileSource, subclass Beam's FileBasedSource and override read_records:
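```python
import csv
import io
from apache_beam.io.filebasedsource import FileBasedSource

class CsvFileSource(FileBasedSource):
    def __init__(self, file_pattern):
        # CSV must be parsed sequentially, so don't let Beam split a file.
        super().__init__(file_pattern, splittable=False)

    def read_records(self, file_name, range_tracker):
        # open_file() returns a binary stream; decode it (assuming UTF-8) for csv.
        with self.open_file(file_name) as f:
            for rec in csv.reader(io.TextIOWrapper(f, encoding='utf-8', newline='')):
                yield rec
```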
You can extend this logic to parse headers or add other custom behavior.
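As one possible way to add header parsing, the sketch below puts the logic in a standalone generator, read_csv_dicts (a hypothetical helper, not part of Beam or beam_utils). It takes the binary stream that open_file returns, decodes it (assuming UTF-8), and uses csv.DictReader to key each row by the header. Keeping it free of Beam means it can be exercised with io.BytesIO.

```python
import csv
import io
from typing import BinaryIO, Dict, Iterator


def read_csv_dicts(binary_stream: BinaryIO, encoding: str = "utf-8") -> Iterator[Dict[str, str]]:
    """Yield one dict per data row, keyed by the header (first) row."""
    # Decode the binary stream; newline="" lets csv handle line endings,
    # including line breaks inside quoted fields, on its own.
    text_stream = io.TextIOWrapper(binary_stream, encoding=encoding, newline="")
    for row in csv.DictReader(text_stream):
        yield dict(row)


# Usage with an in-memory stand-in for the stream returned by open_file().
sample = io.BytesIO(b"id,name\n1,alice\n2,bob\n")
records = list(read_csv_dicts(sample))
```

Inside read_records, the body of the with block would then reduce to yield from read_csv_dicts(f).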
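Also note that this source cannot be split within a file, because CSV has to be parsed sequentially, so a single large file may become a bottleneck when processing data (though that may be acceptable). If the file pattern matches several files, Beam can still read different files in parallel.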
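One reason a CSV file cannot simply be split at an arbitrary byte offset is that a quoted field may contain line breaks, so "skip to the next newline" does not reliably land on a record boundary. The sketch below contrasts a sequential parse with a hypothetical naive splitter, parse_from_offset (an illustration only, not how Beam splits sources), on made-up data.

```python
import csv
import io

# Hypothetical file contents: record 1 has a line break inside a quoted field.
DATA = 'id,comment\n1,"line one\nline two"\n2,plain\n'


def parse_sequentially(text):
    """Parse from the start of the file, as a non-splittable source does."""
    return list(csv.reader(io.StringIO(text)))


def parse_from_offset(text, offset):
    """Hypothetical naive split: jump to `offset`, skip to the next newline,
    and assume a new record starts there (true for plain text, not for CSV)."""
    start = text.index("\n", offset) + 1
    return list(csv.reader(io.StringIO(text[start:])))


good = parse_sequentially(DATA)
bad = parse_from_offset(DATA, DATA.index("line one"))
```

Here good keeps "line one\nline two" as a single field, while bad begins with the fragment 'line two"', a row that never existed in the file.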