4

我知道从 Akka 2.4.16 开始,没有 Reactive Streams 的“远程”实现。该规范侧重于在单个 JVM 上运行的流。

但是,考虑到用例涉及另一个 JVM 进行某些处理,同时保持背压。这个想法是让一个主应用程序提供一个运行流的用户界面。例如,这个流有一个阶段执行一些应该在不同机器上运行的繁重计算。我对以分布式方式运行流的方法感兴趣 - 我遇到了一些文章指出了一些想法:

还有哪些其他选择?以上有什么明显的缺点吗?有什么特殊的特征需要考虑吗?

更新:这个问题不限于单个用例。我通常对在分布式环境中使用流的所有可能方式感兴趣。这意味着,例如,它可以只涉及一个将参与者集成在一起的流,.mapAsync或者例如,在通过 Akka HTTP 通信的两台机器上可能有两个单独的流。唯一的要求是必须在所有组件之间强制执行背压。

4

2 回答 2

1

好吧...看来我将不得不为此添加一个示例。您需要了解的一件事是 BackPressure 由 GraphStages 中的 AsyncBoundries 处理。它实际上与其他地方存在的组件无关。另外......它不依赖于动脉,它只不过是新的远程传输。

这是一个可能是最简单的跨 jvm 流的示例,

首次申请,

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}

class MyActor extends Actor with ActorLogging {
  override def receive: Receive = {
    case msg @ _ => {
      log.info(msg.toString)
      sender() ! msg
    }
  }
}

object MyApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="127.0.0.1"
      |      port=18000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("my-actor-system", config)

  var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")

}

第二个应用程序......实际上“运行”了在第一个应用程序中使用演员的流。

import akka.actor.{ActorPath, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.pattern.ask
import com.typesafe.config.ConfigFactory

import scala.language.postfixOps
import scala.concurrent.duration._

object YourApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="127.0.0.1"
      |      port=19000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("your-actor-system", config)

  import actorSystem.dispatcher

  val logger = actorSystem.log

  implicit val implicitActorSystem = actorSystem
  implicit val actorMaterializer = ActorMaterializer()

  val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@127.0.0.1:18000/user/my-actor")

  val myActorSelection = actorSystem.actorSelection(myActorPath)

  val source = Source(1 to 10)

  // here this "mapAsync" wraps the given T => Future[T] function in a GraphStage
  val myRemoteComponent = Flow[Int].mapAsync(2)(i => {
    myActorSelection.resolveOne(1 seconds).flatMap(myActorRef => 
      (myActorRef.ask(i)(1 seconds)).map(x => x.asInstanceOf[Int])
    )
  })

  val sink = Sink.foreach[Int](i => logger.info(i.toString))

  val stream = source.via(myRemoteComponent).toMat(sink)(Keep.right)

  val streamRun = stream.run()

}
于 2017-01-18T14:40:48.677 回答
0

在 Akka 2.5.10 及更高版本中,您现在可以为此使用StreamRefs 。StreamRefs 是为此用例设计的,因此它们特别适用于远程工作队列,因为它们会背压,直到本地附加到它们的流可以接受更多工作。

于 2020-01-31T14:56:02.340 回答