我有一个简单的案例类
case class KafkaContainer(key: String, payload: AnyRef)
然后我想通过制作人将此发送到kafka主题我这样做
val byteArrayStream = new ByteArrayOutputStream()
val output = AvroOutputStream.binary[KafkaContainer](byteArrayStream)
output.write(msg)
output.close()
val bytes = byteArrayStream.toByteArray
producer.send(new ProducerRecord("my_topic", msg.key, bytes))
这运作良好
然后我尝试消费这个
Consumer.committableSource(consumerSettings, Subscriptions.topics("my_topic"))
.map { msg =>
val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)
val result: Option[KafkaContainer] = input.iterator.toSeq.headOption
input.close()
...
}.runWith(Sink.ignore)
这适用于有效载荷中的任何类。
但!如果是任何参考。消费者代码失败
错误:(38, 96) 找不到类型为 com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer] val 输入的证据参数的隐式值:AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer
错误:(38, 96) 二进制方法的参数不足:(隐含证据 $21:com.sksamuel.avro4s.SchemaFor[test.messages.KafkaContainer],隐含证据 $22:com.sksamuel.avro4s.FromRecord[test.messages. KafkaContainer])com.sksamuel.avro4s.AvroBinaryInputStream[test.messages.KafkaContainer]。未指定值参数证据 $22。val 输入:AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer
如果我声明隐含
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
它无法编译
错误:(58, 71) 找不到类型为 com.sksamuel.avro4s.FromValue[Object] 隐式值的惰性隐式值 fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
错误:(58, 71) 方法lazyConverter 的参数不足:(隐式 fromValue: shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]])shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]]。未指定值参数 fromValue。隐式验证 fromRecord:FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
如果添加每个隐含的编译器是必需的
lazy implicit val fromValue: FromValue[Object] = FromValue[Object]
implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
编译失败并出现错误
宏扩展期间出现错误:(58, 69) 异常:java.lang.IllegalArgumentException:要求失败:需要一个案例类,但 Object 不在 com.sksamuel.avro4s 的 scala.Predef$.require(Predef.scala:277) 处。 FromRecord$.applyImpl(FromRecord.scala:283) 隐式验证 fromRecordObject: FromRecord[Object] = FromRecord[Object]
但是如果我为某个类替换 AnyRef - 不需要隐式,一切都会再次正常