
I want to call a beam.io.Write(beam.io.BigQuerySink(..)) operation from within a ParDo function, in order to generate a separate BigQuery table for each key in the PCollection (I'm using the Python SDK). Here are two similar threads, which unfortunately did not help:

1) https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

2) Dynamic table name when writing to BQ from Dataflow pipelines

When I execute the following code, the rows for the first key get inserted into BigQuery and then the pipeline fails with the error below. Any suggestions on what I'm doing wrong, or on how to fix it, would be much appreciated.

Pipeline code:

rows = p | 'read_bq_table' >> beam.io.Read(beam.io.BigQuerySource(query=query))

class par_upload(beam.DoFn):

    def process(self, context):
        key, value = context.element

        ### This block causes issues ###
        value | 'write_to_bq' >> beam.io.Write(
                        beam.io.BigQuerySink(
                            'PROJECT-NAME:analytics.first_table', #will be replaced by a dynamic name based on the key
                            schema=schema,
                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, 
                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                            )
            )
        ### End block ######
        return [value] 


### Following part works fine ###
filtered = (rows    | 'filter_rows' >> beam.Filter(lambda row: row['topic'] == 'analytics') 
                    | 'apply_projection' >> beam.Map(apply_projection, projection_fields) 
                    | 'group_by_key' >> beam.GroupByKey() 
                    | 'par_upload_to_bigquery' >> beam.ParDo(par_upload())
                    | 'flat_map' >> beam.FlatMap(lambda l: l) #this step is just for testing
                )

### This part works fine if I comment out the 'write_to_bq' block above
filtered | 'write_to_bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            'PROJECT-NAME:analytics.another_table',
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )

Error message:

INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Writing 1 rows to PROJECT-NAME:analytics.first_table table.
INFO:root:Final: Debug counters: {'element_counts': Counter({'CreatePInput0': 1, 'write_to_bq/native_write': 1})}
ERROR:root:Error while visiting par_upload_to_bigquery
Traceback (most recent call last):
  File "split_events.py", line 137, in <module>
    run()
  File "split_events.py", line 132, in run
    p.run()
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 102, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 98, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 180, in run_ParDo
    runner.process(v)
  File "apache_beam/runners/common.py", line 133, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4483)
  File "apache_beam/runners/common.py", line 139, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4311)
  File "apache_beam/runners/common.py", line 150, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:4677)
  File "apache_beam/runners/common.py", line 137, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4245)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 149, in process
    return self.run(self.dofn.process, context, args, kwargs)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 134, in run
    result = method(context, *args, **kwargs)
  File "split_events.py", line 73, in process
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 724, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 445, in __ror__
    return _MaterializePValues(cache).visit(result)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 105, in visit
    return self._pvalue_cache.get_unwindowed_pvalue(node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 262, in get_unwindowed_pvalue
    return [v.value for v in self.get_pvalue(pvalue)]
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 244, in get_pvalue
    value_with_refcount = self._cache[self.key(pvalue)]
KeyError: "(4384177040, None) [while running 'par_upload_to_bigquery']"

EDIT (after the first answer):

I hadn't realised that my value needs to be a PCollection.

I've now changed my code to this (which is probably quite inefficient):

key_pipe = p | 'pipe_' + key >> beam.Create(value)
key_pipe | 'write_' + key >> beam.io.Write(beam.io.BigQuerySink(..))

This now works fine locally, but not with BlockingDataflowPipelineRunner :-(

The pipeline fails with the following error:

JOB_MESSAGE_ERROR: (979394c29490e588): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 474, in do_work
    work_executor.execute()
  File "dataflow_worker/executor.py", line 901, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:24331)
    op.start()
  File "dataflow_worker/executor.py", line 465, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:14193)
    def start(self):
  File "dataflow_worker/executor.py", line 469, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:13499)
    fn, args, kwargs, tags_and_types, window_fn = (
ValueError: too many values to unpack (expected 5)

1 Answer


In the similar threads, the only suggested way to do a BigQuery write inside a ParDo is to use the BigQuery API directly, or to use a client library.

The code you wrote puts the Dataflow ParDo class beam.io.BigQuerySink() into a DoFn function. The ParDo class expects to work on a PCollection, like filtered in the working code example. That is not the case for the non-working code, which applies it to value.

I think the easiest option would be to look at the gcloud-python BigQuery function insert_data() and put that inside your ParDo.
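
For reference, a rough sketch of that suggestion (not code from the answer): a DoFn that streams each key's rows into its own table via the gcloud-python client. The class name, the project/dataset names, and the assumption that each value arrives as a list of tuples in schema order are all hypothetical, and the exact client calls depend on the gcloud-python version installed on the workers.

# Sketch only: per-key streaming insert from inside a DoFn, using the
# gcloud-python client instead of beam.io.BigQuerySink. Names and row
# format are assumptions based on the question, not the answer's code.
from gcloud import bigquery
import apache_beam as beam

class par_upload_with_client(beam.DoFn):  # hypothetical replacement for par_upload
    def __init__(self, project, dataset):
        super(par_upload_with_client, self).__init__()
        self._project = project
        self._dataset = dataset

    def process(self, context):
        key, rows = context.element              # rows: list of tuples in schema order
        client = bigquery.Client(project=self._project)
        table = client.dataset(self._dataset).table(key)  # one table per key
        table.reload()                            # fetch the table's schema
        errors = table.insert_data(rows)          # streaming insert, bypassing Beam I/O
        if errors:
            raise RuntimeError('BigQuery insert failed for %s: %s' % (key, errors))
        return [(key, rows)]

Note that with this approach the per-key tables have to exist already (or be created with table.create() before inserting), since the sink's create_disposition no longer applies.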

answered 2016-09-24T21:14:51.363