I get the following error when using the Kafka Connect BigQuery connector with the Strimzi Operator for Kubernetes:

com.google.cloud.bigquery.BigQueryException: Error getting access token for service account
...
java.io.IOException: Error getting access token for service account: oauth2.googleapis.com
...
java.net.UnknownHostException: oauth2.googleapis.com

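Any idea what could be causing this? I am running this behind a corporate proxy, but I have already added the proxy as environment variables.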

I also ran curl oauth2.googleapis.com from inside the Kafka Connect cluster pod, and it returned an HTML response whose body contains a 404 (Not Found) error.
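One detail that may be relevant here: curl honors the lowercase `http_proxy`/`https_proxy` environment variables, but the JVM's built-in HTTP client (the `HttpURLConnection` path visible in the stack trace) generally does not read them and instead looks at the `https.proxyHost`/`https.proxyPort` system properties. A minimal sketch of passing those through Strimzi's `jvmOptions.javaSystemProperties`, assuming the same masked proxy address as in the resource shown here and that the connector's Google client uses the default JVM transport (neither is verified against this setup):

```yaml
# Sketch only: a fragment to merge into the existing KafkaConnect resource.
spec:
  jvmOptions:
    javaSystemProperties:
      # Standard JVM networking properties; the host is the masked proxy address.
      - name: https.proxyHost
        value: "XXX.XXX.XX.XXX"
      - name: https.proxyPort
        value: "3128"
      - name: http.proxyHost
        value: "XXX.XXX.XX.XXX"
      - name: http.proxyPort
        value: "3128"
      # Assumption: in-cluster hosts should bypass the proxy; adjust the patterns as needed.
      - name: http.nonProxyHosts
        value: "localhost|*.svc|*.cluster.local"
```

If the task still fails with UnknownHostException after this, the request is probably still being sent directly rather than through the proxy.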

My Kubernetes setup looks like this:

KafkaConnect:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  namespace: message-bus
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.0.0
  resources:
    requests:
      cpu: 12
      memory: 64Gi
    limits:
      cpu: 12
      memory: 64Gi
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: kafka-cluster-cluster-ca-cert
        certificate: ca.crt
  image: docker.io/alexanghel23/kafka-connect-plugins:v0.2.0
  template:
    connectContainer:
      env:
        - name: https_proxy
          value: http://XXX.XXX.XX.XXX:3128
        - name: http_proxy
          value: http://XXX.XXX.XX.XXX:3128
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /opt/kafka/external-configuration/gcp-credentials/kafka-bq.json
  externalConfiguration:
    volumes:
      - name: gcp-credentials
        secret:
          secretName: kafka-bq
  config:
    config.providers: env
    config.providers.env.class: io.strimzi.kafka.EnvVarConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false

KafkaConnector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: alerts-bq
  namespace: message-bus
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
  tasksMax: 1
  config:
    project: ai4neo-dev
    defaultDataset: ".*=test_kafka"
    topics: alerts
    keySource: FILE
    keyfile: "/opt/kafka/external-configuration/gcp-credentials/kafka-bq.json"
    proxy.url: "http://xxx.xxx.xx.xxx:3128"

Service account:

{
  "type": "service_account",
  "project_id": "ai4neo-dev",
  "private_key_id": "81<omitted>1e",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEv<omitted>qpefw=\n-----END PRIVATE KEY-----\n",
  "client_email": "kafka-bq@ai4neo-dev.iam.gserviceaccount.com",
  "client_id": "10<omitted>21",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/kafka-bq%40ai4neo-dev.iam.gserviceaccount.com"
}

Full error:

2021-12-29 10:48:49,051 ERROR WorkerSinkTask{id=alerts-bq-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error getting access token for service account: oauth2.googleapis.com (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-alerts-bq-0]
com.google.cloud.bigquery.BigQueryException: Error getting access token for service account: oauth2.googleapis.com
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:113)
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:285)
at com.google.cloud.bigquery.BigQueryImpl$17.call(BigQueryImpl.java:678)
at com.google.cloud.bigquery.BigQueryImpl$17.call(BigQueryImpl.java:675)
at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
at com.google.cloud.bigquery.BigQueryImpl.getTable(BigQueryImpl.java:674)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.lambda$retrieveCachedTable$2(BigQuerySinkTask.java:338)
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1133)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.retrieveCachedTable(BigQuerySinkTask.java:338)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordTable(BigQuerySinkTask.java:210)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:245)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Error getting access token for service account: oauth2.googleapis.com
at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:444)
at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:157)
at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:145)
at com.google.auth.oauth2.ServiceAccountCredentials.getRequestMetadata(ServiceAccountCredentials.java:603)
at com.google.auth.http.HttpCredentialsAdapter.initialize(HttpCredentialsAdapter.java:91)
at com.google.cloud.http.HttpTransportOptions$1.initialize(HttpTransportOptions.java:159)
at com.google.api.client.http.HttpRequestFactory.buildRequest(HttpRequestFactory.java:88)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.buildHttpRequest(AbstractGoogleClientRequest.java:422)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:283)
... 22 more
Caused by: java.net.UnknownHostException: oauth2.googleapis.com
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:220)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:299)
at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:474)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:569)
at java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
at java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:373)
at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:203)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:189)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1367)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1342)
at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:246)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:113)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:441) 

Thanks!
