5

我正在构建一个 REST API,它在 Spark 集群中开始一些计算并以结果的分块流进行响应。给定带有计算结果的 Spark 流,我可以使用

dstream.foreachRDD()

从 Spark 发送数据。我正在使用 akka-http 发送分块的 HTTP 响应:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}

为简单起见,我试图先让纯文本工作,稍后再添加 JSON 编组。

但是使用 Spark DStream 作为 Akka 流的源的惯用方式是什么?我想我应该能够通过套接字来做到这一点,但由于 Spark 驱动程序和 REST 端点位于同一个 JVM 上,为此打开一个套接字似乎有点矫枉过正。

4

2 回答 2

8

在提问时不确定 api 的版本。但是现在,有了 akka-stream 2.0.3,我相信你可以这样做:

val source = Source
  .actorRef[T](/* buffer size */ 100, OverflowStrategy.dropHead)
  .mapMaterializedValue[Unit] { actorRef =>
    dstream.foreach(actorRef ! _)
  }
于 2016-02-07T16:33:23.673 回答
3

编辑:此答案仅适用于旧版本的 spark 和 akka。PH88 的答案是最新版本的正确方法。

您可以使用akka.actor.Actor提供 Source 的中间件(类似于此问题)。下面的解决方案不是“反应式”的,因为底层 Actor 需要维护一个 RDD 消息缓冲区,如果下游 http 客户端没有足够快地消耗块,则可能会丢弃这些消息。但是无论实现细节如何,都会出现此问题,因为您无法将 akka 流背压的“节流”连接到 DStream 以减慢数据速度。这是因为 DStream 没有实现org.reactivestreams.Publisher.

基本拓扑为:

DStream --> Actor with buffer --> Source

要构建此拓扑,您必须创建一个类似于此处实现的 Actor :

//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props 

基于 JobManager 创建 ByteStrings(消息)的流 Source。此外,将HttpResponse 所需的转换ByteString为:HttpEntity.ChunkStreamPart

import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString

type Message = ByteString

val messageToChunkPart = 
  Flow[Message].map(HttpEntity.ChunkStreamPart(_))

//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart, Unit] = 
  Source(ActorPublisher[Message](actorRef)) via messageToChunkPart

将 Spark DStream 链接到 Actor,以便将每个传入的 RDD 转换为 ByteString 的 Iterable,然后转发给 Actor:

import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.rdd.RDD

val dstream : DStream = ???

//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???

def sendMessageToActor(message : Message) = actorRef ! message

//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}

向 HttpResponse 提供源:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}

注意:该行和 HttpReponse 之间应该有很少的时间/代码,dstream foreachRDD因为在执行该行之后,Actor 的内部缓冲区将立即开始填充来自 DStream 的 ByteString 消息foreach

于 2015-10-28T13:33:01.953 回答