0

我有以下案例类:

case class Alpakka(id:Int,name:String,animal_type:String)

我正在尝试使用以下代码将这些案例类的列表连接到 kafka 中的生产者:

  def connectEntriesToProducer(seq: Seq[Alpakka]) = {


    val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

seq.map(alpakka => new ProducerRecord[String, String]("alpakkas", alpakka.asJson.noSpaces))   
      .runWith(Producer.plainSink(producerSettings))
  }

我正在使用 circe 将案例类转换为 json。但是我不断收到一个编译器错误说:

Error:(87, 34) type mismatch;
 found   : akka.stream.scaladsl.Sink[org.apache.kafka.clients.producer.ProducerRecord[String,String],scala.concurrent.Future[akka.Done]]
 required: org.apache.kafka.clients.producer.ProducerRecord[String,String] => ?
      .runWith(Producer.plainSink(producerSettings))

我不确定发生了什么事!

4

1 回答 1

0

您正在尝试Graph从 aSeq而不是 a 构建 a Source

你的方法connectEntriesToProducer应该看起来像

def connectEntriesToProducer(seq: Source[Alpakka]) = {

注意,Source而不是Seq.

或者,您可以从 a 构建源代码Seq,但您必须使用,immutable.Seq因为Source.apply它只会采用不可变的可迭代对象。

def connectEntriesToProducer(seq: scala.collection.immutable.Seq[Alpakka]) = {
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")

Source(seq).
  map(alpakka => new ProducerRecord[String, String]("alpakkas", alpakka.asJson.noSpaces))
  .runWith(Producer.plainSink(producerSettings))
}
于 2019-04-08T22:11:22.973 回答