我正在尝试在使用 Apache Flink 作为运行时的Kinesis Data Analytics中运行 Apache Beam 应用程序。管道使用PubsubIO连接器。我正在尝试使用 code 向 Google Cloud 进行身份验证,因为 Kinesis Data Analytics 不允许导出环境变量,因此导出GOOGLE_APPLICATION_CREDENTIALS环境变量似乎不是一个选项。
我正在尝试使用如下代码进行身份验证。
GoogleCredentials credential = GoogleCredentials
.fromStream(credentialJsonInputStream)
.createScoped("https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/pubsub");
credential.refreshIfExpired();
options.setGcpCredential(credential);
此处的选项引用继承了 PubsubOptions。
但是在运行应用程序时它会失败并出现以下异常:
线程“主”org.apache.beam.sdk.Pipeline$PipelineExecutionException 中的异常:com.google.api.client.googleapis.json.GoogleJsonResponseException:403 Forbidden POST https://pubsub.googleapis.com/v1/projects/my -项目/主题/我的主题:发布 { "code" : 403, "errors" : [ { "domain" : "global", "message" : "请求缺少有效的 API 密钥。", "reason" : "forbidden" } ], "message" :“请求缺少有效的 API 密钥。”,“状态”:“PERMISSION_DENIED”} at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371) at org.apache.beam .runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339) at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219) at org.apache.beam.runners.direct.DirectRunner .run(DirectRunner.java:67) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308) at com。亚马逊。kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:67)
在调试时,我注意到PubsubOptions
传递给org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.PubsubJsonClientFactory#newClientnull
的引用在调用时返回GcpOptions#getGcpCredential
我非常感谢有关如何在这种情况下进行身份验证的任何见解。