我已经使用 Google Cloud Dataflow SDK 编写了一个流式传输管道,但我想在本地测试我的管道。我的管道从 Google Pub/Sub 获取输入数据。
是否可以使用 DirectPipelineRunner(本地执行,不在 Google Cloud 中)运行访问 Pub/Sub (pubsubIO) 的作业?
以我的普通用户帐户登录时遇到权限问题。我是项目的所有者,我尝试访问的 pub/sub 主题。
我已经使用 Google Cloud Dataflow SDK 编写了一个流式传输管道,但我想在本地测试我的管道。我的管道从 Google Pub/Sub 获取输入数据。
是否可以使用 DirectPipelineRunner(本地执行,不在 Google Cloud 中)运行访问 Pub/Sub (pubsubIO) 的作业?
以我的普通用户帐户登录时遇到权限问题。我是项目的所有者,我尝试访问的 pub/sub 主题。
InProcessPipelineRunner是Dataflow SDK for Java 1.6.0中引入的 DirectPipelineRunner的新版本,包括对无界 PCollection 的支持。
(注意:在 Apache Beam 中,此功能已添加到 DirectRunner,但在 Dataflow SDK for Java 中,我们要到 2.0 才能做到这一点,因为它更好地检查模型可能会导致额外的测试失败,我们认为这是向后不兼容的更改。因此暂时添加了配套的 InProcessPipelineRunner。)
还有一些很棒的新支持来测试迟到和乱序的数据。
DirectPipelineRunner 目前不支持 PubsubIO。在本地使用时,您将收到一条错误消息,指出“没有为 PubsubIO.Read 注册评估器”。
您的权限问题很可能来自其他来源。
只是为了帮助任何会搜索这个的人,
使用最新版本,您可以执行此操作。如果要在本地运行管道,请使用“DirectRunner”在本地运行。使用“DataflowRunner”在云中运行它。
如下图所示设置暂存位置和跑步者。
streamingOption.setStagingLocation(PipelineConstants.PUBSUB_STAGING_LOCATION);
streamingOption.setRunner(DataflowRunner.class);
或将其作为参数传递。
您能否详细说明您所面临的许可问题?
这实际上是可能的,但 DirectPipelineRunner 不支持无界数据源。因此,您必须设置 amaxReadTime
或maxNumRecords
类似:
PubsubIO.Read.topic("projects/<project-id>/topics/<topic>").maxNumRecords(1000);
来自PubSub 文档:
一个 PTransform,它不断地从 Cloud Pub/Sub 流中读取数据并返回包含流中项目的字符串的 PCollection。当使用仅支持有界 PCollection(例如 DirectPipelineRunner)的 PipelineRunner 运行时,只能处理输入 Pub/Sub 流的有界部分。因此,必须设置 PubsubIO.Read.Bound.maxNumRecords(int) 或 PubsubIO.Read.Bound.maxReadTime(Duration)。