我编写了代码来将 CSV 文件中的数据注入 Google 的 BigQuery。我使用 apache 梁作为管道。
这是管道代码:
list_of_data = open_file()
DistrictAllocationAndListStore_data = (p
| 'CreateDictData from DistrictAllocationAndListStore File' >> beam.Create(list_of_data)
| 'RenameDictKey DistrictAllocationAndListStore' >> beam.Map(rename_key)
| 'ChangeDataType DistrictAllocationAndListStore' >> beam.Map(convert_types_DistrictAllocationAndListStore)
| 'Write DistrictAllocationAndListStore' >> WriteToText('output/data-branchessap', '.txt')
)
# Write to BQ
DistrictAllocationAndListStore_data | 'Write to BQ DistrictAllocationAndListStore' >> beam.io.WriteToBigQuery(
table=table_id_tender,
dataset=dataset_id,
project=project_id,
schema=schema_tenders_master,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# batch_size=int(100)
)
这是 convert_types_DistrictAllocationAndListStore 方法:
def convert_types_DistrictAllocationAndListStore(data):
"""Converts string values to their appropriate type."""
date_format = '%Y-%m-%d'
data['site_code'] = str(data['site_code']) if 'site_code' in data else None
data['store_name'] = str(data['store_name']) if 'store_name' in data else None
data['city'] = str(data['city']) if 'city' in data else None
data['type'] = str(data['type']) if 'type' in data else None
data['region_no'] = str(data['region_no']) if 'region_no' in data else None
if data.get("opening_date") != "":
date = datetime.datetime.strptime(data.get("opening_date"), date_format)
data['opening_date'] = str(date.date())
data['opening_date_year'] = str(date.year)
data['opening_date_month'] = str(date.month)
data['opening_date_day'] = str(date.day)
data['opening_date_dayname'] = str(date.strftime("%A"))
data['opening_date_weeks'] = str(date.strftime("%W"))
else:
data['opening_date'] = None
data['opening_date_year'] = ""
data['opening_date_month'] = ""
data['opening_date_day'] = ""
data['opening_date_dayname'] = ""
data['opening_date_weeks'] = ""
return data
但是,当我注释掉 Write To BQ 代码并写入本地(使用本地运行器)时,代码成功运行而没有错误。但是当我尝试将其写入 BQ(使用 DataFlow 运行程序运行)时,出现错误:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
response = task()
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 993, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 351, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/media/arroganthooman/DATA/Fikri/UI/Magang/Script/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1570, in <lambda>
wrapper = lambda x: [fn(x)]
File "sb-fin-district_allocation_list_store.py", line 104, in convert_types_DistrictAllocationAndListStore
NameError: name 'datetime' is not defined [while running 'ChangeDataType DistrictAllocationAndListStore-ptransform-570']
似乎没有导入日期时间,但我已在代码顶部导入它。有什么解决办法吗?