0

我是使用 akka 流 kafka (和一般的 akka 流)的新手。我正在尝试构建一个图表,以便将消息发布到不同的主题。如何将生产者连接为流以提交已处理的消息?我尝试使用 Producer.flow 但我无法获得commitScaladsl

object TestFoo {
  import akka.kafka.ProducerMessage.Message
  implicit val system = ActorSystem("test-kafka")
  implicit val materializer = ActorMaterializer()
  val evenNumbersTopic = "even_numbers"
  val allNumbersTopic = "all_numbers"
  lazy val consumerSettings = ConsumerSettings(system, new StringDeserializer(), new JsonDeserializer[Int])
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  lazy val source =  Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(evenNumbersTopic, allNumbersTopic)))
  val producerSettings = ProducerSettings(system,  new StringSerializer(), new StringSerializer())
    .withBootstrapServers("localhost:9092")
  val flow: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import akka.stream.scaladsl.GraphDSL.Implicits._
    type TypedMessage =  Message[String, Int,CommittableOffset]
    val bcast = b.add(Broadcast[TypedMessage](2))
    val merge = b.add(Merge[TypedMessage](2))

    val evenFilter = Flow[TypedMessage].filter (  c => c.record.value() % 2 == 0)
    val justEven = Flow[TypedMessage].map{
      case Message(pr, offset) =>
      val r = new ProducerRecord[String, Int]("general", pr.value())
      Message(r, offset)
    }
    val allNumbers = Flow[TypedMessage].map{
      case Message(pr, offset) =>
      val r = new ProducerRecord[String, Int](allNumbersTopic, pr.value())
      Message(r, offset)
    }

    val toMsg = Flow[ConsumerMessage.CommittableMessage[String, Int]].map{ msg =>
      val r = new ProducerRecord[String, Int]("general", msg.record.value())
      Message(r, msg.committableOffset)
    }
    source ~> toMsg ~> bcast

    bcast ~> evenFilter ~> justEven ~> merge
    bcast ~> allNumbers ~> merge
    merge ~> Producer.flow(producerSettings).mapAsync(producerSettings.parallelism) { result =>
      result.message.passThrough.commitScaladsl() //this doesn't compile, cannot get the .commitScaladsl()
    }
    ClosedShape 
  })}
4

1 回答 1

0

因为您使用的是 GraphDSL,所以编译器无法PassThrough从前一阶段推断类型。尝试将类型参数显式传递给Producer.flow函数,例如

merge ~> Producer.flow[K, V, CommittableOffset](producerSettings).mapAsync(producerSettings.parallelism) { result =>
  result.message.passThrough.commitScaladsl()
}

我已经离开了KV作为未绑定的参数,请在其中放置您的 Producer 必须生成的任何键/值类型。如果您希望上面的代码正确连接,您需要将producerSettings类型与来自合并阶段的内容相匹配。你需要类似的东西:

val producerSettings = ProducerSettings(system,  new StringSerializer(), new JsonSerializer[Int])
    .withBootstrapServers("localhost:9092")
于 2017-08-14T12:48:36.333 回答