0

我有一个来自 web 套接字的 akka 流,比如akka 流使用 web 套接字,并且想构建一个可重用的图形阶段(inlet:流,FlowShape:向 JSON 中添加一个附加字段,指定来源,即

{
...,
"origin":"blockchain.info"
}

outlet卡夫卡。

我面临以下3个问题:

  • 无法Inlet从 Web 套接字流中创建自定义
  • 无法将 kafka 直接集成到流中(参见下面的代码)
  • 不确定是否需要转换器添加附加字段来反序列化 json 以添加来源

示例项目(仅限流程)如下所示:

import system.dispatcher
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val incoming: Sink[Message, Future[Done]] =
    Flow[Message].mapAsync(4) {
      case message: TextMessage.Strict =>
        println(message.text)
        Future.successful(Done)
      case message: TextMessage.Streamed =>
        message.textStream.runForeach(println)
      case message: BinaryMessage =>
        message.dataStream.runWith(Sink.ignore)
    }.toMat(Sink.last)(Keep.right)

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right)

val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv"))

val ((completionPromise, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .toMat(incoming)(Keep.both)
      // TODO not working integrating kafka here
      // .map(_.toString)
      //    .map { elem =>
      //      println(s"PlainSinkProducer produce: ${elem}")
      //      new ProducerRecord[Array[Byte], String]("topic1", elem)
      //    }
      //    .runWith(Producer.plainSink(producerSettings))
      .run()

val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      system.terminate
    }
  }

// kafka that works / writes dummy data
val done1 = Source(1 to 100)
    .map(_.toString)
    .map { elem =>
      println(s"PlainSinkProducer produce: ${elem}")
      new ProducerRecord[Array[Byte], String]("topic1", elem)
    }
    .runWith(Producer.plainSink(producerSettings))
4

1 回答 1

2

一个问题是关于incoming舞台的,它被建模为Sink. 它应该被建模为Flow. 随后将消息输入Kafka。

因为传入的短信可以Streamed。您可以flatMapMerge按如下方式使用组合器来避免将整个(可能很大)消息存储在内存中:

  val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) {
    case msg: BinaryMessage =>
      msg.dataStream.runWith(Sink.ignore)
      Future.successful(None)
    case TextMessage.Streamed(src) =>
      src.runFold("")(_ + _).map { msg => Some(msg) }
  }.collect {
    case Some(msg) => msg
  }

此时你得到了一些可以产生字符串的东西,并且可以连接到 Kafka:

  val addOrigin: Flow[String, String, NotUsed] = ???

  val ((completionPromise, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .via(incoming)
      .via(addOrigin)
      .map { elem =>
        println(s"PlainSinkProducer produce: ${elem}")
        new ProducerRecord[Array[Byte], String]("topic1", elem)
      }
      .toMat(Producer.plainSink(producerSettings))(Keep.both)
      .run()
于 2017-05-03T07:48:41.233 回答