2

尝试时遇到错误pubsub io > splittable dofn

RuntimeError: Transform node 
AppliedPTransform(ParDo(TestDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, 
_GroupByKeyOnly) was not replaced as expected.

有人可以帮我检查代码中我可能做错的任何事情吗

代码:

"""
python examples/test_restriction_unbounded.py --project mk2 --topic projects/mk2/topics/testing
"""

# pytype: skip-file

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import csv
import logging
import sys
import time
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.restriction_trackers import OffsetRestrictionTracker, OffsetRange
from apache_beam.transforms.core import RestrictionProvider

class TestProvider(RestrictionProvider):
  def initial_restriction(self, element):
    return OffsetRange(0, 1)

  def create_tracker(self, restriction):
    return OffsetRestrictionTracker(restriction)

  def restriction_size(self, element, restriction):
    return restriction.size()


class TestDoFn(beam.DoFn):
    def process(
        self,
        element,
        restriction_tracker=beam.DoFn.RestrictionParam(
            TestProvider())):
        import pdb; pdb.set_trace()
        cur = restriction_tracker.current_restriction().start
        while restriction_tracker.try_claim(cur):
          return element

def run(argv=None, save_main_session=True):
  parser = argparse.ArgumentParser()
  parser.add_argument('--topic', type=str, help='Pub/Sub topic to read from')
  args, pipeline_args = parser.parse_known_args(argv)

  options = PipelineOptions(pipeline_args)
  options.view_as(StandardOptions).streaming = True

  with beam.Pipeline(options=options) as p:
    # data = ['abc', 'defghijklmno', 'pqrstuv', 'wxyz']
    # actual = (p | beam.Create(data) | beam.ParDo(ExpandingStringsDoFn()))
    scores = p | beam.io.ReadFromPubSub(topic=args.topic) | beam.ParDo(TestDoFn())

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

4

2 回答 2

1

您正在通过蒸汽从 pub/sub 摄取数据。然后你必须在应用这种转换之前按窗口创建批次:(ParDo(TestDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly)

带有窗口的 Pub/Sub 示例:https ://cloud.google.com/pubsub/docs/pubsub-dataflow

尝试这样做:

class GroupWindowsIntoBatches(beam.PTransform):
"""A composite transform that groups Pub/Sub messages
"""

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
        )


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument('--topic', type=str, help='Pub/Sub topic to read from')
    args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(StandardOptions).streaming = True
    window_size = 1.0

    with beam.Pipeline(options=options) as p:

    scores = (p
        | beam.io.ReadFromPubSub(topic=args.topic)
        | "WindowInto" >> GroupWindowsIntoBatches(window_size)
        | beam.ParDo(TestDoFn())
    )
于 2021-03-22T21:42:07.287 回答
-2

我有同样的错误。删除流媒体选项为我解决了这个问题。

于 2020-10-16T19:48:50.147 回答