编辑:此答案仅适用于旧版本的 spark 和 akka。PH88 的答案是最新版本的正确方法。
您可以使用akka.actor.Actor
提供 Source 的中间件(类似于此问题)。下面的解决方案不是“反应式”的,因为底层 Actor 需要维护一个 RDD 消息缓冲区,如果下游 http 客户端没有足够快地消耗块,则可能会丢弃这些消息。但是无论实现细节如何,都会出现此问题,因为您无法将 akka 流背压的“节流”连接到 DStream 以减慢数据速度。这是因为 DStream 没有实现org.reactivestreams.Publisher
.
基本拓扑为:
DStream --> Actor with buffer --> Source
要构建此拓扑,您必须创建一个类似于此处实现的 Actor :
//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props
基于 JobManager 创建 ByteStrings(消息)的流 Source。此外,将HttpResponse 所需的转换ByteString
为:HttpEntity.ChunkStreamPart
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString
type Message = ByteString
val messageToChunkPart =
Flow[Message].map(HttpEntity.ChunkStreamPart(_))
//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart, Unit] =
Source(ActorPublisher[Message](actorRef)) via messageToChunkPart
将 Spark DStream 链接到 Actor,以便将每个传入的 RDD 转换为 ByteString 的 Iterable,然后转发给 Actor:
import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.rdd.RDD
val dstream : DStream = ???
//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???
def sendMessageToActor(message : Message) = actorRef ! message
//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}
向 HttpResponse 提供源:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
注意:该行和 HttpReponse 之间应该有很少的时间/代码,dstream foreachRDD
因为在执行该行之后,Actor 的内部缓冲区将立即开始填充来自 DStream 的 ByteString 消息foreach
。