3

我已经使用 Google Cloud Dataflow SDK 编写了一个流式传输管道,但我想在本地测试我的管道。我的管道从 Google Pub/Sub 获取输入数据。

是否可以使用 DirectPipelineRunner(本地执行,不在 Google Cloud 中)运行访问 Pub/Sub (pubsubIO) 的作业?

以我的普通用户帐户登录时遇到权限问题。我是项目的所有者,我尝试访问的 pub/sub 主题。

4

4 回答 4

3

InProcessPipelineRunnerDataflow SDK for Java 1.6.0中引入的 DirectPipelineRunner的新版本,包括对无界 PCollection 的支持。

(注意:在 Apache Beam 中,此功能已添加到 DirectRunner,但在 Dataflow SDK for Java 中,我们要到 2.0 才能做到这一点,因为它更好地检查模型可能会导致额外的测试失败,我们认为这是向后不兼容的更改。因此暂时添加了配套的 InProcessPipelineRunner。)

还有一些很棒的新支持来测试迟到和乱序的数据。

于 2016-10-27T16:58:43.397 回答
2

DirectPipelineRunner 目前不支持 PubsubIO。在本地使用时,您将收到一条错误消息,指出“没有为 PubsubIO.Read 注册评估器”。

您的权限问题很可能来自其他来源。

于 2015-04-08T00:39:32.343 回答
0

只是为了帮助任何会搜索这个的人,

使用最新版本,您可以执行此操作。如果要在本地运行管道,请使用“DirectRunner”在本地运行。使用“DataflowRunner”在云中运行它。

如下图所示设置暂存位置和跑步者。

streamingOption.setStagingLocation(PipelineConstants.PUBSUB_STAGING_LOCATION);

streamingOption.setRunner(DataflowRunner.class);

或将其作为参数传递。

您能否详细说明您所面临的许可问题?

于 2017-11-13T11:49:59.067 回答
-1

这实际上是可能的,但 DirectPipelineRunner 不支持无界数据源。因此,您必须设置 amaxReadTimemaxNumRecords类似:

PubsubIO.Read.topic("projects/<project-id>/topics/<topic>").maxNumRecords(1000);

来自PubSub 文档

一个 PTransform,它不断地从 Cloud Pub/Sub 流中读取数据并返回包含流中项目的字符串的 PCollection。当使用仅支持有界 PCollection(例如 DirectPipelineRunner)的 PipelineRunner 运行时,只能处理输入 Pub/Sub 流的有界部分。因此,必须设置 PubsubIO.Read.Bound.maxNumRecords(int) 或 PubsubIO.Read.Bound.maxReadTime(Duration)。

于 2016-10-12T21:33:03.610 回答