0

我有一个简单的案例类

case class KafkaContainer(key: String, payload: AnyRef)

然后我想通过制作人将此发送到kafka主题我这样做

val byteArrayStream = new ByteArrayOutputStream()
      val output = AvroOutputStream.binary[KafkaContainer](byteArrayStream)
      output.write(msg)
      output.close()
      val bytes = byteArrayStream.toByteArray
      producer.send(new ProducerRecord("my_topic", msg.key, bytes))

这运作良好

然后我尝试消费这个

Consumer.committableSource(consumerSettings, Subscriptions.topics("my_topic"))
    .map { msg =>
      val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
      val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)
      val result: Option[KafkaContainer] = input.iterator.toSeq.headOption
      input.close()
        ...
    }.runWith(Sink.ignore)

这适用于有效载荷中的任何类。

但!如果是任何参考。消费者代码失败

错误:(38, 96) 找不到类型为 com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer] val 输入的证据参数的隐式值:AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer

错误:(38, 96) 二进制方法的参数不足:(隐含证据 $21:com.sksamuel.avro4s.SchemaFor[test.messages.KafkaContainer],隐含证据 $22:com.sksamuel.avro4s.FromRecord[test.messages. KafkaContainer])com.sksamuel.avro4s.AvroBinaryInputStream[test.messages.KafkaContainer]。未指定值参数证据 $22。val 输入:AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer

如果我声明隐含

implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

它无法编译

错误:(58, 71) 找不到类型为 com.sksamuel.avro4s.FromValue[Object] 隐式值的惰性隐式值 fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

错误:(58, 71) 方法lazyConverter 的参数不足:(隐式 fromValue: shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]])shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]]。未指定值参数 fromValue。隐式验证 fromRecord:FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

如果添加每个隐含的编译器是必需的

lazy implicit val fromValue: FromValue[Object] = FromValue[Object]
implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

编译失败并出现错误

宏扩展期间出现错误:(58, 69) 异常:java.lang.IllegalArgumentException:要求失败:需要一个案例类,但 Object 不在 com.sksamuel.avro4s 的 scala.Predef$.require(Predef.scala:27​​7) 处。 FromRecord$.applyImpl(FromRecord.scala:283) 隐式验证 fromRecordObject: FromRecord[Object] = FromRecord[Object]

但是如果我为某个类替换 AnyRef - 不需要隐式,一切都会再次正常

4

1 回答 1

1

我在使用 Any 数据类型时遇到了类似的问题。您必须指定此成员变量的有效类型,因为 Any 或 AnyRef 可以是任何类型。然后使用 Either 或 shapeless(另见Github 文档)。就我而言,它可以是 String、Long、Double 或 null,所以使用 shapeless 你可以:

case class DataContainer(name: String, value: Option[String:+:Long:+:Double:+:CNil])

这将转换为 AVRO 中的联合类型:

{
    "name" : "value",
    "type" : [ "null", "string", "long", "double" ]
}
于 2019-01-23T11:41:16.813 回答