0

我有数据以块的形式到达演员,并希望将这些块作为流返回Play Result。由于获得响应的唯一方法Ok.stream看起来像是理想的候选人,因此如下所示:

Action.async { request =>
  (source ? GetStream()).map {
     case enumerator => Ok.stream(enumerator)
  }
}

我会Enumerator[Array[Byte]]从我的演员那里返回一个,然后在演员内部继续将块推入枚举器,因为消息到达演员。然而:从一个演员返回一个可变的枚举器绝对看起来像是某种违反。

有没有更合适的方法来做到这一点?我想要么akka-streamakka.io将是可能解决问题空间的抽象,但我看不出它们将如何应用。

4

1 回答 1

0

在提出更好的解决方案之前,我已经确定的解决方案是使用 ActorDSL 在我的非参与者调用者的上下文中捕获枚举器:

case GET(p"/stream/$streamId") => Action.async { request =>
    val (enumerator, channel) = Concurrent.broadcast[Array[Byte]]

    actor(new Act {

      storage ! Get(streamId)
      become {
        case DataStart(id, parts, bytes) =>
          sender() ! DataAck(id)
          become {
            case DataPart(_, i, b) =>
              channel.push(b.toArray)
            case DataEnd(_) =>
              channel.eofAndEnd()
          }
      }
    })

    Ok.stream(enumerator).as("text/plain")
  }

反对从演员返回和枚举器的论点是它是可变的和不可序列化的。但是您需要一个参与者来接收一系列消息以提供给枚举器。通过 DSL 创建actor,它被显式地嵌入到调用上下文中,因此枚举器不存在跨越序列化边界泄漏的风险。

于 2015-07-24T16:02:32.793 回答