30

我正在尝试使用Source.actorRef方法来创建akka.stream.scaladsl.Source对象。某种形式的东西

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...

我的问题是:如何将数据发送到基于 ActorRef 的 Source 对象

我认为向 Source 发送消息是某种形式

//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)

weatherSource没有!运算符或tell方法。

文档对如何使用 Source.actorRef 没有太多描述性,它只是说您可以...

提前感谢您的评论和回复。

4

3 回答 3

25

你需要一个Flow

  import akka.stream.OverflowStrategy.fail
  import akka.stream.scaladsl.Source
  import akka.stream.scaladsl.{Sink, Flow}

  case class Weather(zip : String, temp : Double, raining : Boolean)

  val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

  val sunnySource = weatherSource.filter(!_.raining)

  val ref = Flow[Weather]
    .to(Sink.ignore)
    .runWith(sunnySource)

  ref ! Weather("02139", 32.0, true)

请记住,这都是实验性的,可能会改变!

于 2015-06-11T18:17:44.837 回答
8

正如@Noah 指出 akka-streams 的实验性质,他的回答可能不适用于 1.0 版本。我必须遵循这个例子给出的例子

implicit val materializer = ActorMaterializer()
val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run()
actorRef ! TweetInfo(...)
val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)
于 2015-09-13T19:52:34.147 回答
8

的实例ActorRef,与所有“物化值”一样,只有在整个流被物化时才可访问,或者换句话说,当 RunnableGraph 正在运行时。

// RunnableGraph[ActorRef] means that you get ActorRef when you run the graph
val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println))

// You get ActorRef instance as a materialized value
val actorRef1: ActorRef = rg1.run()

// Or even more correct way: to materialize both ActorRef and future to completion 
// of the stream, so that we know when we are done:

// RunnableGraph[(ActorRef, Future[Done])] means that you get tuple
// (ActorRef, Future[Done]) when you run the graph
val rg2: RunnableGraph[(ActorRef, Future[Done])] =
  sunnySource.toMat(Sink.foreach(println))(Keep.both)

// You get both ActorRef and Future[Done] instances as materialized values
val (actorRef2, future) = rg2.run()

actorRef2 ! Weather("90210", 72.0, false)
actorRef2 ! Weather("02139", 32.0, true)
actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream
future onComplete { /* ... */ }
于 2016-05-29T18:57:55.700 回答