1

我正在尝试实现一个具有背压功能的基于参与者的系统。作为要求,主进程接收 JSON 格式的流数据。然而,每个 JSON 事件都有几个字段,例如 {ip: '123.43.12.1', country: 'US', ... etc}。JSON的结构是事先知道的。

现在,我必须以某种方式将 JSON 结构扁平化为 (key, value) 对。例如,上面的数据可以被展平成(ip, freq), (country, freq),其中freq是一个ip(比如'123.43.12.1')在数据流中出现的次数。

一种非常自然的方法是将每个(键,值)对转发给相应的子/远程参与者以进行进一步评估。例如 ('123.43.12.1', 1) 发送给 IP-Actor;('US', 1) 被发送到 Country-Actor 等等。

我想确保整个系统都处于背压状态。在这种情况下事情会更加复杂,因为事件 {ip: '123.43.12.1', country: 'US'} 只有在 IP-Actor 和 Country-Actor 都完成了扁平化对 ('123.43. 12.1', 1), ('美国', 1)。每个参与者可能有不同的处理速度(例如,IP-Actor 比 Country-Actor 快得多)。在这种情况下,我希望接收流的主进程将等待/阻塞,直到有需求信号(当两个参与者完成处理其邮箱中的现有数据时发生)。否则,某些参与者可能会在邮箱中充满消息(国家参与者 - 慢一个),但由于其他参与者邮箱是空的(IP-参与者 - 较快的一个),消息仍然会进来。

任何人都可以建议反应流规范是否提供这样的功能。如果没有,无论如何以最有效的方式实现功能。

谢谢。

4

1 回答 1

0

您描述的 Actor 之间的同步类型正是您希望在 Actor 模型中避免的。任何“等待/阻塞”都与反应式编程相反。我建议使用单个流Flow进行更新。

您首先需要处理json数据:

import akka.stream.scaladsl._

//your original source of json strings
val jsonSrc : Source[String, NotUsed] = ???

case class JsonObject(ip : String, country : String)

//use your favorite json parser
def jsonParser(jsonStr : String) : JsonObject = ???

val parserFlow = Flow[String] map jsonParser

接下来定义计数器逻辑并用于Flow.scan生成具有递增值的计数器:

type IPCounter = Map[String,Int]
val emptyIPCounter = Map.empty[String,Int] withDefaultValue 0

type CountryCounter = Map[String, Int]
val emptyCountryCounter = Map.empty[String,Int] withDefaultValue 0

type Counters = Tuple2[IPCounter, CountryCounter]
val emptyCounters = (emptyIPCounter, emptyCountryCounter)

def updateCounters(counters : Counters, jsonObj : JsonObject) : Counters = {
  (counters._1.updated(jsonObj.ip, counters._1(jsonObj.ip) + 1),
   counters._2.updated(jsonObj.country, counters._2(jsonObj.country) + 1))
}

val counterFlow = Flow[JsonObject].scan(emptyCounters)(updateCounters)

最后,将所有内容组合在一起:

val counterSource : Source[Counters, NotUsed] = jsonSrc via parserFlow via counterFlow

结果正是您所要求的:只有在所有计数器都已更新时才转发计数器值的背压流。

于 2016-07-20T11:40:15.723 回答