
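I'm trying to run an Apache Beam application in Kinesis Data Analytics, which uses Apache Flink as its runtime. The pipeline uses the PubsubIO connector. I'm trying to authenticate to Google Cloud in code, because Kinesis Data Analytics doesn't allow exporting environment variables, so exporting the GOOGLE_APPLICATION_CREDENTIALS environment variable doesn't seem to be an option.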

I'm trying to authenticate with code like the following:

    GoogleCredentials credential = GoogleCredentials
            .fromStream(credentialJsonInputStream)
            .createScoped("https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/pubsub");
    credential.refreshIfExpired();

    options.setGcpCredential(credential);

Here, options is a reference to an options interface that extends PubsubOptions.

But when I run the application, it fails with the following exception:

    Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
    POST https://pubsub.googleapis.com/v1/projects/my-project/topics/my-topic:publish
    {
      "code" : 403,
      "errors" : [ {
        "domain" : "global",
        "message" : "The request is missing a valid API key.",
        "reason" : "forbidden"
      } ],
      "message" : "The request is missing a valid API key.",
      "status" : "PERMISSION_DENIED"
    }
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
        at com.amazonaws.kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:67)

While debugging, I noticed that calling GcpOptions#getGcpCredential on the PubsubOptions reference passed to org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.PubsubJsonClientFactory#newClient returns null.

I'd greatly appreciate any insight into how to authenticate in this scenario.

1 Answer

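The GcpOptions#setGcpCredential option can't be used with the Flink runner. The Flink runner serializes PipelineOptions (to ship them to the workers), but getGcpCredential is annotated with @JsonIgnore, so an explicitly set credential is not serialized and is missing from the deserialized options.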
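To make that failure mode concrete, here is a small JDK-only analogy; it is not Beam code. Beam serializes PipelineOptions as JSON with Jackson, whereas this sketch uses plain java.io serialization and marks a field transient to play the role of @JsonIgnore. The names OptionsLike, SerializationDemo and gcpCredential are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Analogy only: `transient` here stands in for @JsonIgnore on getGcpCredential.
class OptionsLike implements Serializable {
    private static final long serialVersionUID = 1L;

    String project;                 // serialized: survives the round trip
    transient Object gcpCredential; // excluded from serialization, like a @JsonIgnore property

    OptionsLike(String project, Object gcpCredential) {
        this.project = project;
        this.gcpCredential = gcpCredential;
    }
}

class SerializationDemo {
    // Serialize and deserialize, as a runner does when it ships options to another JVM.
    static OptionsLike roundTrip(OptionsLike in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (OptionsLike) ois.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        OptionsLike onWorker = roundTrip(new OptionsLike("my-project", new Object()));
        System.out.println(onWorker.project);       // prints "my-project"
        System.out.println(onWorker.gcpCredential); // prints "null"
    }
}
```

The serialized field comes back intact while the excluded one comes back null, which mirrors the null that getGcpCredential returned in the question.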

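When no credential has been set explicitly via GcpOptions#setGcpCredential, GCP services such as Pub/Sub use a credential built from the currently set GcpOptions#credentialFactoryClass. Unlike the credential itself, the factory class setting survives serialization, so the credential is recreated from it wherever the options are deserialized.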
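The fallback can be modelled with a few lines of reflection. This is a simplified, hypothetical sketch and not Beam's implementation: CredentialFactoryLike stands in for Beam's CredentialFactory, the credential is a plain String, and the options argument is an Object rather than PipelineOptions. What it does share with Beam is the lookup of a public static fromOptions method on the configured factory class, which is why a custom factory must declare one.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for Beam's CredentialFactory interface.
interface CredentialFactoryLike {
    String getCredential();
}

// Hypothetical factory; only its class name has to survive serialization.
class DemoCredentialFactory implements CredentialFactoryLike {
    public static DemoCredentialFactory fromOptions(Object options) {
        return new DemoCredentialFactory();
    }

    @Override
    public String getCredential() {
        return "credential-built-on-worker";
    }
}

class OptionsWithDefault {
    // Plays the role of getGcpCredential: not serialized, so null after deserialization.
    String explicitCredential;
    // Plays the role of credentialFactoryClass: a plain class name, which serializes fine.
    String credentialFactoryClassName = DemoCredentialFactory.class.getName();

    String getCredential() {
        if (explicitCredential != null) {
            return explicitCredential; // an explicitly set value wins
        }
        try {
            // Build the factory through its static fromOptions method, then ask it for a credential.
            Class<?> factoryClass = Class.forName(credentialFactoryClassName);
            Method fromOptions = factoryClass.getMethod("fromOptions", Object.class);
            CredentialFactoryLike factory = (CredentialFactoryLike) fromOptions.invoke(null, this);
            return factory.getCredential();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to obtain credential", e);
        }
    }
}
```

With explicitCredential left null (as after deserialization), getCredential() returns the factory-built value; once explicitCredential is set, that value is returned instead.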

So instead of calling options.setGcpCredential(credential), we can define a custom credential factory class and pass it to GcpOptions#setCredentialFactoryClass:

    options.setCredentialFactoryClass(CustomCredentialFactory.class);

Your application's PipelineOptions interface needs to extend GcpOptions for this method to be available on your options reference. PubsubOptions already extends GcpOptions, so an interface that extends PubsubOptions qualifies.

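    import com.google.auth.Credentials;
    import com.google.auth.oauth2.GoogleCredentials;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
    import org.apache.beam.sdk.options.PipelineOptions;

    /**
     * Builds the GCP credential on whichever JVM deserializes the pipeline options,
     * so no credential object has to travel inside PipelineOptions.
     */
    public class CustomCredentialFactory implements CredentialFactory {

      private CustomCredentialFactory() {}

      /**
       * Required by GcpOptions.GcpUserCredentialsFactory#create(org.apache.beam.sdk.options.PipelineOptions),
       * which looks up this static factory method by name.
       */
      public static CustomCredentialFactory fromOptions(PipelineOptions options) {
        return new CustomCredentialFactory();
      }

      @Override
      public Credentials getCredential() throws IOException {
        // Load the GCP credential file (from S3, the JAR, ...); SomeUtil is a placeholder.
        // The IOException is propagated rather than swallowed: returning null here would
        // only resurface later as the same 403 from Pub/Sub.
        try (InputStream credentialFileInputStream = SomeUtil.getCredentialInputStream()) {
          return GoogleCredentials
              .fromStream(credentialFileInputStream)
              .createScoped(
                  "https://www.googleapis.com/auth/cloud-platform",
                  "https://www.googleapis.com/auth/pubsub");
        }
      }
    }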
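SomeUtil.getCredentialInputStream() above is a placeholder. One possible implementation, assuming the service-account JSON key is bundled in the application JAR under a hypothetical resource name gcp-credentials.json, reads it from the classpath using only JDK calls. Loading from S3 instead would need the AWS SDK and is not shown.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper matching the SomeUtil.getCredentialInputStream() placeholder.
final class SomeUtil {
    // Hypothetical resource name: e.g. src/main/resources/gcp-credentials.json in a Maven build.
    static final String CREDENTIAL_RESOURCE = "gcp-credentials.json";

    private SomeUtil() {}

    static InputStream getCredentialInputStream() throws IOException {
        return getCredentialInputStream(CREDENTIAL_RESOURCE);
    }

    static InputStream getCredentialInputStream(String resourceName) throws IOException {
        // Resolve the name relative to the classpath root of the application JAR.
        InputStream in = SomeUtil.class.getClassLoader().getResourceAsStream(resourceName);
        if (in == null) {
            // Fail loudly instead of handing a null stream to the caller.
            throw new FileNotFoundException("Classpath resource not found: " + resourceName);
        }
        return in;
    }
}
```

Throwing FileNotFoundException gives the caller an explicit error at load time, rather than passing null on to GoogleCredentials.fromStream.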

Answered 2021-08-15T17:51:56.630