
I'm trying to set up a simple process with Spark Streaming, using Apache Bahir to connect to Akka. I tried to follow their example together with this older one. I have a simple forwarder actor

class ForwarderActor extends ActorReceiver {
  def receive = {
    case data: MyData => store(data)
  }
}

and I create a stream with

val stream = AkkaUtils.createStream[RSVP](ssc, Props[ForwarderActor], actorName)

The configuration looks like this:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "localhost"
      port = 7777
    }
  }
}

My question is: how do I send messages to the Forwarder actor? Maybe I don't understand how Akka Remote is used in this case. When the app starts, I see the log

[akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://test@localhost:7777]

and later on I see

[akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://streaming-actor-system-0@192.168.192.7:52369]

which seems to recall the description in the ScalaDoc:

 /**
   * A default ActorSystem creator. It will use a unique system name
   * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
   * communication.
   */

All in all, I'm not sure how I'm supposed to send messages to the Forwarder actor. Thanks for your help!


1 Answer


Akka actors can send messages to other Akka actors running on a remote JVM. So... the sending actor needs to know the address of the intended receiving actor.

AkkaUtils (Bahir) enables you to create a Spark stream from the messages received by a ReceiverActor. But where will it receive the messages from? Well... some remote actor. And to send messages, that remote actor will need the address of your ReceiverActor running inside the Spark application.

In general, you can't be too sure about the IP on which your Spark application will run. So, we will arrange things so that the actor running with Spark tells the producer actor its own reference and asks it to send its things.

Just make sure that both applications are written in the same version of Scala and are running the same JRE.
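Since mismatched Scala or Akka versions are the most common reason the two processes fail to talk to each other, it can help to pin them explicitly in the build. Here is a minimal build.sbt sketch, assuming Spark 2.1 / Bahir 2.1 / Scala 2.11; the exact versions are illustrative, not taken from the answer, so check that the Akka version matches what spark-streaming-akka pulls in:

// Illustrative build.sbt sketch - versions are assumptions, pick ones that match your cluster.
// The important part is that the producer app and the Spark app agree on Scala and Akka.
scalaVersion := "2.11.8"   // same Scala major version on both sides

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-streaming"      % "2.1.0" % "provided",
  "org.apache.bahir"  %% "spark-streaming-akka" % "2.1.0",
  "com.typesafe.akka" %% "akka-remote"          % "2.4.16"
)

The producer application only needs the Akka dependencies; the Spark and Bahir ones matter on the streaming side.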

Now... let us first write the actor that will be the source of the data,

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)

class MyActor extends Actor with ActorLogging {

  val theListOfMyStrings = List("one", "two", "three")

  override def receive: Receive = {
    // When a requester introduces itself, push every string to its reference.
    case SendMeYourStringsRequest(requesterRef) =>
      theListOfMyStrings.foreach { s =>
        requesterRef ! RequestedString(s)
      }
  }
}

object MyApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka {
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname = "my-ip-address"
      |      port = 18000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("my-actor-system", config)

  val myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")

}
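Before wiring the Spark side in, it can be useful to check that the producer actor is actually reachable over remoting. Below is a minimal, standalone probe sketch (not part of the answer's code) that assumes the system name, host and port from MyApplication above; Identify/ActorIdentity is the standard Akka way to resolve an actor selection:

import akka.actor.{Actor, ActorIdentity, ActorLogging, ActorSystem, Identify, Props}
import com.typesafe.config.ConfigFactory

// Hypothetical probe actor, only for verifying that the remote producer is reachable.
class ReachabilityProbe extends Actor with ActorLogging {

  override def preStart(): Unit = {
    // Ask the remote actor to identify itself; it answers with an ActorIdentity.
    context.actorSelection("akka.tcp://my-actor-system@my-ip-address:18000/user/my-actor") ! Identify("probe")
  }

  override def receive: Receive = {
    case ActorIdentity("probe", Some(ref)) => log.info("Remote actor found: {}", ref)
    case ActorIdentity("probe", None)      => log.warning("Remote actor not found - check system name, host and port")
  }
}

object ProbeApp extends App {
  val config = ConfigFactory.parseString(
    """
      |akka.actor.provider = remote
      |akka.remote.netty.tcp.port = 0   // let Akka pick a free port for the probe
    """.stripMargin
  )
  val system = ActorSystem("probe-system", config)
  system.actorOf(Props[ReachabilityProbe], "probe")
}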

Now... let us write the simple Spark application,

import akka.actor.{ActorPath, ActorRef, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)

class YourStringRequesterActor extends ActorReceiver {

  def receive = {
    // Every string pushed by the remote producer goes into the Spark stream.
    case RequestedString(s) => store(s)
  }

  override def preStart(): Unit = {
    // Look up the remote producer and hand it our own reference,
    // so it knows where to send its strings.
    val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@my-ip-address:18000/user/my-actor")
    val myActorSelection = context.actorSelection(myActorPath)

    myActorSelection ! SendMeYourStringsRequest(self)
  }
}

object YourSparkApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ActorWordCount")

    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[2]")
    }

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val stringStream = AkkaUtils.createStream[String](
        ssc,
        Props(classOf[YourStringRequesterActor]),
        "your-string-requester-actor"
    )

    stringStream.print()

    // Don't forget to actually start the streaming context.
    ssc.start()
    ssc.awaitTermination()
  }
}
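If you want to do more than print the values, the received stream is an ordinary DStream[String]. As an illustration only (a fragment you could drop into main before ssc.start(), not part of the original answer), here is a small word count over the incoming strings:

// Illustrative fragment: count words across the strings pushed by the producer.
val wordCounts = stringStream
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

wordCounts.print()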

Note: just take care of my-ip-address. If there are any other problems, please let me know in the comments.

answered 2017-01-28 at 22:01:41