Akka actor 可以向在远程 JVM 上运行的其他 Akka actor 发送消息。所以......当发送者参与者需要知道预期接收者参与者的地址时。
AkkaUtil (Bahir) 使您能够从接收到的消息中创建火花流ReceiverActor
。但是,从哪里接收消息?嗯......一些遥远的演员。要发送消息,这个远程参与者将需要您ReceiverActor
在 spark-application 中运行的地址。
一般来说,您不能太确定将运行您的 spark 应用程序的 ip。因此,我们将这样做,以便使用 spark 运行的 Actor 将告诉生产者 Actor 它的引用并请求它发送它的东西。
只需确保两个应用程序都使用相同版本的 Scala 编写并运行相同的 JRE。
现在...让我们首先编写将成为数据源的演员,
import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)
class MyActor extends Actor with ActorLogging {
val theListOfMyStrings = List("one", "two", "three")
override def receive: Receive = {
case SendMeYourStringsRequest(requesterRef) => {
theListOfMyStrings.foreach(s => {
requesterRef ! RequestedString(s)
})
}
}
}
object MyApplication extends App {
val config = ConfigFactory.parseString(
"""
|akka{
| actor {
| provider = remote
| }
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| untrusted-mode = off
| netty.tcp {
| hostname="my-ip-address"
| port=18000
| }
| }
|}
""".stripMargin
)
val actorSystem = ActorSystem("my-actor-system", config)
var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")
}
现在... 让我们编写简单的 spark 应用程序,
import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)
class YourStringRequesterActor extends ActorReceiver {
def receive = {
case RequestedString(s) => store(s)
}
override def preStart(): Unit = {
val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@my-ip-address:18000/user/my-actor")
val myActorSelection = context.actorSelection(myActorPath)
myActorSelection ! SendMeYourStringsRequest(self)
}
}
object YourSparkApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ActorWordCount")
if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local[2]")
}
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stringStream = AkkaUtils.createStream[String](
ssc,
Props(classOf[YourStringRequesterActor]),
"your-string-requester-actor"
)
stringStream.foreach(println)
}
}
注意::只要照顾好my-ip-address
. 如果还有其他问题,请在评论中告诉我。