0

我正在尝试来自谷歌官方网站的以下示例。

import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

此示例在 DataFlowRunner 中工作没有任何问题,因为当我尝试在本地运行器中运行相同的示例时它不起作用。消息是从 pubsub 队列中读取的,但数据不会写入 gcs 文件。我正在使用 apache 梁 2.31.0 版本。

4

1 回答 1

0

我能够重现这种情况,没有问题。我认为您应该注意以下几点:

  • 对于本地运行,您必须使用DirectRunner.
  • 确保您的主题有消息。您必须等待几分钟才能让您的消息填写您的主题,并且您可以在主题上看到它。
  • 检查您的消息的生存时间,我设置为 1 天进行测试。
  • 检查您的订阅消息,请耐心等待。(需要3-8分钟)
  • 检查您在主run命令中使用的路径。我使用以下参数:
mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=project-id \
    --region=us-central1 \
    --inputTopic=projects/project-id/topics/test-topic \
    --output=gs://project-bucket-id/data \
    --gcpTempLocation=gs://project-bucket-id/tmp \
    --runner=DirectRunner \
    --windowSize=2"

有关可用跑步者的完整列表,请查看此链接

于 2022-02-03T15:51:19.277 回答