2

我需要使用以下接口创建一个函数:

import akka.kafka.scaladsl.Consumer.Control

object ItemConversionFlow {

def build(config: StreamConfig): Flow[Item, OtherItem, Control] = {
    // Implementation goes here
}

我的问题是我不知道如何以适合上述界面的方式定义流程。

当我做这样的事情时

val flow = Flow[Item]
    .map(item => doConversion(item)
    .filter(_.isDefined)
    .map(_.get)

结果类型为 Flow[Item, OtherItem, NotUsed]。到目前为止,我还没有在 Akka 文档中找到任何内容。此外,akka.stream.scaladsl.Flow 上的功能仅提供“未使用”而不是控制。如果有人能指出我正确的方向,那就太好了。

一些背景知识:我需要设置几个仅在转换部分进行区分的管道。这些管道是主流的子流,可能由于某种原因而停止(相应的消息到达某个 kafka 主题)。因此我需要控制部分。我们的想法是创建一个 Graph 模板,我只需在其中插入提到的流作为参数(返回它的工厂)。对于特定情况,我们有一个可行的解决方案。为了概括它,我需要这种流程。

4

1 回答 1

0

你实际上有背压。然而,想想你真正需要什么背压......你没有使用异步阶段来增加你的吞吐量......例如。背压避免快速生产者过度增长订阅者https://doc.akka.io/docs/akka/2.5/stream/stream-rate.html。在您的示例中不要担心,您的流将根据doConversion完成所需的时间向发布者请求新元素。

如果您想获得流的结果,请使用 toMat 或 viaMat。例如,如果您的流发出 Item 并将其转换为 OtherItem:

val str = Source.fromIterator(() => List(Item(Some(1))).toIterator)
  .map(item => doConversion(item))
  .filter(_.isDefined)
  .map(_.get)
  .toMat(Sink.fold(List[OtherItem]())((a, b) => {
      // Examine the result of your stream
      b :: a
    }))(Keep.right)
  .run()

str 将是 Future[List[OtherItem]]。尝试将此推断到您的情况。

或者将 toMat 与 KillSwitches 一起使用,“创建一个 [[FlowShape]] 的新 [[Graph]],它物化到一个外部开关,允许外部完成 * 该独特物化。不同的物化会产生不同的、独立的开关。”

  def build(config: StreamConfig): Flow[Item, OtherItem, UniqueKillSwitch] = {
    Flow[Item]
      .map(item => doConversion(item))
      .filter(_.isDefined)
      .map(_.get)
      .viaMat(KillSwitches.single)(Keep.right)
  }


  val stream = 
    Source.fromIterator(() => List(Item(Some(1))).toIterator)
    .viaMat(build(StreamConfig(1)))(Keep.right)
    .toMat(Sink.ignore)(Keep.both).run

  // This stops the stream
  stream._1.shutdown()

  // When it finishes
  stream._2 onComplete(_ => println("Done"))
于 2018-01-17T09:30:35.570 回答