0

使用 Kafka 发送大文件时,是否可以跨分区分发,然后使用 Akka-Stream 重新组装?如本演示文稿所述:

http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297

4

1 回答 1

2

“分块”方面,即生产者,很容易使用响应式 kafka之类的东西编写:

case class LargeMessage(bytes : Seq[Byte], topic : String)

def messageToKafka(message : LargeMessage, maxMessageSize : Int) = 
  Source.fromIterator(() => message.bytes.toIterator)
        .via(Flow[Byte].grouped(maxMessageSize))
        .via(Flow[Seq[Byte]].map(seq => new ProducerRecord(message.topic, seq)))
        .runWith(Producer.plainSink(producerSettings)

“重新组装”,即消费者,可以以类似于文档的方式实现:

   val messageFut : Future[LargeMessage] = 
     for {
       bytes <- Consumer.map(_._1).runWith(Sink.seq[Byte])
     } yield LargeMessage(bytes, topic)
于 2016-10-17T17:05:59.120 回答