0

我一直在使用 apache_beam sdk for python 来处理我的数据工程。我用的是2.24版本。我在将 apache_beam 版本升级到 2.31 时创建的自定义编码器类存在一些问题。自定义编码器类名称是 IgnoreUnicode。所以,这是我的管道代码:

branchessap_data = (p | 'ReadData branchessap' >> beam.io.ReadFromText(branchessap, skip_header_lines =1, coder=IgnoreUnicode())
                            | 'SplitData branchessap' >> beam.Map(lambda x: x.split('|'))
                            | 'FormatToDict branchessap' >> beam.Map(lambda x: {"branch_id": x[0], "branch_sap": x[1], "branch_name": x[2], "branch_profile": x[3]})
                            | 'ChangeDataType branchessap' >> beam.Map(convert_types_branchessap)
                            | 'DELETE UNWANTED DATA BRANCHESSAP' >> beam.Map(del_unwanted_cols_branchessap)
    )

这是我用来覆盖 apache_beam 的默认编码器的 IgnoreUnicode 类:

# CLASS CHANGE FRENCH CHARACTERS
class IgnoreUnicode(Coder):
    def encode(self, value):
        return value.encode('utf-8','ignore')

    def decode(self, value):
        return value.decode('utf-8','ignore')

    def is_deterministic(self):
        return True

这些代码适用于 apache_beam 2.24 版。但是,如果我将它升级到 2.24 以上的版本,它会给我这样的错误(在这种情况下,我使用的是 2.31 版本):

在此处输入图像描述

是否有任何替代解决方案如何在 2.24 以上版本中实现自定义编码器?

4

1 回答 1

2

看起来这是重组源的方式和在__main__. 我建议两种解决方法之一:

(1) 将 的定义移动IgnoreUnicode到导入的适当模块而不是__main__, 或

(2) 使用 BytesCoder 读取文件,然后使用

`beam.Map(lambda line: line.decode('utf-8','ignore'))`.

(就个人而言,我更喜欢后者,因为最好不要让编码器改变数据。)

于 2021-08-06T22:52:30.847 回答