4

我有一个正在运行的流数据流来读取 PUB/SUB 订阅。

经过一段时间或可能在处理一定数量的数据后,我希望管道自行停止。我不希望我的计算引擎实例无限期地运行。

当我通过数据流控制台取消作业时,它显示为失败的作业。

有没有办法做到这一点?我错过了什么吗?或者 API 中缺少该功能。

4

2 回答 2

5

你能做这样的事情吗?

Pipeline pipeline = ...;
... (construct the streaming pipeline) ...
final DataflowPipelineJob job =
    DataflowPipelineRunner.fromOptions(pipelineOptions)
                          .run(pipeline);
Thread.sleep(your timeout);
job.cancel();
于 2015-06-16T15:49:30.910 回答
0

我能够使用 Rest API 在数据流上耗尽(取消作业而不丢失数据)正在运行的流式作业。

看我的回答

使用 Rest Update方法,这个 body :

{ "requestedState": "JOB_STATE_DRAINING" }

于 2018-06-12T15:15:56.517 回答