0

我正在尝试从我的 python 管道中的默认命名空间读取多个数据存储类型,并希望对其进行处理。我编写的函数在本地使用 DirectRunner 运行良好,但是当我使用 DataflowRunner 在云上运行管道时,其中一种(包含 1500 条记录)读取速度非常快,而另一种(包含数百万条记录)读取速度非常慢.

作为参考,当我试图在管道中读取一种(包含数百万条记录)时,它花了 10 分钟,但是当它们一起执行时,它花了将近 1 个小时,但它仍然只处理了 1/10 的记录。

我无法弄清楚问题所在。

这是我的代码

def read_from_datastore(project,user_options, pipeline_options):
  p = beam.Pipeline(options=pipeline_options)
  query = query_pb2.Query()
  query.kind.add().name = user_options.kind   #reading 1st kind this is the one with million records

  students = p | 'ReadFromDatastore' >> ReadFromDatastore(project=project,query=query)

  query = query_pb2.Query()
  query.kind.add().name = user_options.kind2   #reading 2nd kind this is the one with 1500 records

  courses = p | 'ReadFromDatastore2' >> ReadFromDatastore(project=project,query=query)

  open_courses = courses | 'closed' >> beam.FlatMap(filter_closed_courses)
  enrolled_students = students | beam.ParDo(ProfileDataDumpDataFlow(),AsIter(open_courses))

让我知道是否有人知道为什么会发生这种情况。

4

1 回答 1

0

我看到你正在做两种连接操作。为此目的,如果您将实体导出到存储桶然后将其加载到BigQuery会更合适、更快。在 BigQuery 中进行所需的联接操作。

它不是读取实体在您的工作中花费时间,而是连接操作。

于 2018-08-23T15:45:47.503 回答