我在 Spark Streaming 中使用 Kafka Source 来接收使用 Confluent Cloud 中的 Datagen 生成的记录。我打算使用 Confluent Schema Registry,
目前,这是我面临的例外:*
线程“主”io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException 中的异常:未授权;错误代码:401
融合云的架构注册表需要传递一些我不知道如何输入的身份验证数据:
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=secret: secret
我想我必须将此身份验证数据传递给 CachedSchemaRegistryClient 但我不确定是否如此以及如何。
// Setup the Avro deserialization UDF
schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
kafkaAvroDeserializer.deserialize(bytes)
如果我尝试将身份验证发送到架构注册表
val restService = new RestService(schemaRegistryURL)
val props = Map(
"basic.auth.credentials.source" -> "USER_INFO",
"schema.registry.basic.auth.user.info" -> "secret:secret"
).asJava
var schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)
我明白
Cannot resolve overloaded constructor CachedSchemaRegistryClient
了,似乎只有 2 个参数要发送到 CachedSchemaRegistryClient。
我该如何解决?
我遇到了这篇文章,但在这里他们没有对融合云中的模式注册表应用任何身份验证。