1

我编写了代码来将 CSV 文件中的数据注入 Google 的 BigQuery。我使用 apache 梁作为管道。

这是管道代码:

list_of_data = open_file()
DistrictAllocationAndListStore_data = (p 
                                | 'CreateDictData from DistrictAllocationAndListStore File' >> beam.Create(list_of_data)
                                | 'RenameDictKey DistrictAllocationAndListStore' >> beam.Map(rename_key)
                                | 'ChangeDataType DistrictAllocationAndListStore' >> beam.Map(convert_types_DistrictAllocationAndListStore)
                                | 'Write DistrictAllocationAndListStore' >> WriteToText('output/data-branchessap', '.txt')
                                )


# Write to BQ
DistrictAllocationAndListStore_data | 'Write to BQ DistrictAllocationAndListStore' >> beam.io.WriteToBigQuery(
                table=table_id_tender,
                dataset=dataset_id,
                project=project_id,
                schema=schema_tenders_master,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                # batch_size=int(100)
                )

这是 convert_types_DistrictAllocationAndListStore 方法:

def convert_types_DistrictAllocationAndListStore(data):
    """Converts string values to their appropriate type."""

    date_format = '%Y-%m-%d'

    data['site_code'] = str(data['site_code']) if 'site_code' in data else None
    data['store_name'] = str(data['store_name']) if 'store_name' in data else None
    data['city'] = str(data['city']) if 'city' in data else None
    data['type'] = str(data['type']) if 'type' in data else None
    data['region_no'] = str(data['region_no']) if 'region_no' in data else None


    if data.get("opening_date") != "":
        date = datetime.datetime.strptime(data.get("opening_date"), date_format)
        data['opening_date'] = str(date.date())
        data['opening_date_year'] = str(date.year)
        data['opening_date_month'] = str(date.month)
        data['opening_date_day'] = str(date.day)
        data['opening_date_dayname'] = str(date.strftime("%A"))
        data['opening_date_weeks'] = str(date.strftime("%W"))
    else:
        data['opening_date'] = None
        data['opening_date_year'] = ""
        data['opening_date_month'] = ""
        data['opening_date_day'] = ""
        data['opening_date_dayname'] = ""
        data['opening_date_weeks'] = ""

    return data

但是,当我注释掉 Write To BQ 代码并写入本地(使用本地运行器)时,代码成功运行而没有错误。但是当我尝试将其写入 BQ(使用 DataFlow 运行程序运行)时,出现错误:

    Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 993, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 351, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/media/arroganthooman/DATA/Fikri/UI/Magang/Script/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1570, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "sb-fin-district_allocation_list_store.py", line 104, in convert_types_DistrictAllocationAndListStore
NameError: name 'datetime' is not defined [while running 'ChangeDataType DistrictAllocationAndListStore-ptransform-570']

似乎没有导入日期时间,但我已在代码顶部导入它。有什么解决办法吗?

4

3 回答 3

1

您可以尝试将 import datetime 放入函数中。

于 2021-08-03T03:48:11.217 回答
0

import dateime在你的剧本的顶部吗?

于 2021-08-03T06:56:09.250 回答
0

使用管道选项--save_main_session=True解决了这个错误。无需在函数内导入。

参考资料:https ://cloud.google.com/dataflow/docs/resources/faq#programming_with_the_cloud_dataflow_sdk_for_python

于 2021-10-01T10:21:03.960 回答