
I am getting the error below. I tried changing the Schema Registry port from **8081 to 18081**, and I also added dependencies such as avro-tools and kafka-schema-registry.
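Changing the port on the client side only helps if the registry itself is actually listening there. As a sketch of what that configuration might look like (the file path and property names below follow the default Confluent distribution layout and are an assumption; adjust to your install):

```
# etc/schema-registry/schema-registry.properties (path is an assumption)
# The registry must bind to the same port the producer's
# schema.registry.url points at:
listeners=http://0.0.0.0:18081
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
```

After restarting the registry, it should answer plain HTTP on that port.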

{"first_name": "John", "last_name": "Doe", "age": 34, "height": 178.0, "weight": 75.0, "automated_email": false}
21/07/18 19:26:56 INFO clients.Metadata: Cluster ID: Gv4kdRbATLK6YtRxsf94vw
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.SocketException: Unexpected end of file from server
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:866)
    at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:689)
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:863)
    at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:689)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1610)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1515)
    at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:212)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:807)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
    at com.example.Producerexample$.main(Producerexample.scala:50)
    at com.example.Producerexample.main(Producerexample.scala)

Process finished with exit code 1


1 Answer


Here is the code that produces the above error:

package com.example

import java.util.Properties

import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, Producer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object Producerexample {

  def main(args: Array[String]): Unit = {
    val properties: Properties = new Properties()
    // normal producer
    properties.setProperty("bootstrap.servers", "localhost:9092") // host:port only, no http:// scheme
    properties.setProperty("acks", "all")
    properties.setProperty("retries", "10")
    // avro part
    properties.setProperty("key.serializer", classOf[StringSerializer].getName)
    properties.setProperty("value.serializer", classOf[KafkaAvroSerializer].getName)
    properties.setProperty("schema.registry.url", "http://localhost:18081")

    val producer: Producer[String, Customer] =
      new KafkaProducer[String, Customer](properties)
    val topic: String = "customer-avro2"

    // copied from avro examples
    val customer: Customer = Customer.newBuilder()
      .setFirstName("John")
      .setLastName("Doe")
      .setAge(34)
      .setHeight(178f)
      .setWeight(75f)
      .setAutomatedEmail(false)
      .build()

    val producerRecord: ProducerRecord[String, Customer] =
      new ProducerRecord[String, Customer](topic, customer)
    println(customer)

    producer.send(producerRecord, new Callback() {
      def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception == null) println(metadata)
        else exception.printStackTrace()
      }
    })
    producer.flush()
    producer.close()
  }
}
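A "Unexpected end of file from server" SocketException during schema registration usually means the serializer connected to a socket that is not speaking HTTP (for example, a Kafka broker port entered by mistake) or the registry closed the connection before sending a response. As a minimal diagnostic sketch, one can probe the configured `schema.registry.url` with a plain HTTP GET against the registry's `/subjects` endpoint before producing (the object and method names here are illustrative, not part of any library):

```scala
import java.net.{HttpURLConnection, URL}
import scala.util.{Failure, Success, Try}

object RegistryProbe {
  // Returns Right(httpStatus) if the URL answers HTTP,
  // Left(errorDescription) if the connection fails or is not HTTP.
  def probe(baseUrl: String): Either[String, Int] =
    Try {
      val conn = new URL(baseUrl + "/subjects")
        .openConnection()
        .asInstanceOf[HttpURLConnection]
      conn.setConnectTimeout(2000) // fail fast on unreachable ports
      conn.setReadTimeout(2000)
      conn.setRequestMethod("GET")
      val code = conn.getResponseCode
      conn.disconnect()
      code
    } match {
      case Success(code) => Right(code)
      case Failure(e)    => Left(e.toString)
    }

  def main(args: Array[String]): Unit =
    // Same URL as the producer's schema.registry.url setting
    println(RegistryProbe.probe("http://localhost:18081"))
}
```

If the probe returns `Right(200)`, the registry is reachable and the problem lies elsewhere; a `Left` with a connection or protocol error points at the URL/port mismatch described above.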
answered 2021-07-18T13:43:31.727