2

使用spring-cloud-stream-binder-kafka. 可能有人可以看到什么是错的?

当我使用https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/中的示例时, 它工作正常,我可以在 Confluent Cloud 上看到消息

但是,当使用配置添加相同的连接详细信息时spring-cloud-stream-binder-kafka,它会返回未经授权的错误。

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"MySchema","namespace":"org.test","fields":[{"name":"value","type":"double"}]}

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

我的下面的配置给出了上述错误。不知道出了什么问题?

  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers: myinstance.us-east1.gcp.confluent.cloud:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: https://myinstance.us-central1.gcp.confluent.cloud
            basic.auth.credentials.source: USER_INFO
            schema.registry.basic.auth.user.info: mySchemaKey:mySchemaSecret
          configuration:
            ssl.endpoint.identification.algorithm: https
            sasl.mechanism: PLAIN
            request.timeout.ms: 20000
            retry.backoff.ms: 500
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="myKey" password="MySecret";
            security.protocol: SASL_SSL
      bindings:
        normals-out:
          destination: normals
          contentType: application/*+avro

Confluent 中运行良好的示例:

  kafka:
    bootstrap-servers:
      - myinstance.us-east1.gcp.confluent.cloud:9092
    properties:
      ssl.endpoint.identification.algorithm: https
      sasl.mechanism: PLAIN
      request.timeout.ms: 20000
      retry.backoff.ms: 500
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="myKey" password="MySecret";
      security.protocol: SASL_SSL
      schema.registry.url: https://myinstance.us-central1.gcp.confluent.cloud
      basic.auth.credentials.source: USER_INFO
      schema.registry.basic.auth.user.info: mySchemaKey:mySchemaSecret
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    template:
      default-topic:
logging:
  level:
    root: info
4

1 回答 1

0

我的问题只是我在我的 pom.xml 中缺少一个依赖项。

我应该删除我的问题,但我把它留在这里作为配置确实像上面那样工作的参考。

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client</artifactId>
  <version>5.3.0</version>
</dependency>
于 2020-05-27T17:05:28.467 回答