5

我在 Scala 中使用Akka Streams使用AWS Java SDK从AWS SQS队列进行轮询。我创建了一个ActorPublisher,它以两秒的间隔将消息出列:

class SQSSubscriber(name: String) extends ActorPublisher[Message] {
  implicit val materializer = ActorMaterializer()

  val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")

  val client = new AmazonSQSClient()
  client.setRegion(RegionUtils.getRegion("us-east-1"))
  val url = client.getQueueUrl(name).getQueueUrl

  val MaxBufferSize = 100
  var buf = Vector.empty[Message]

  override def receive: Receive = {
    case "dequeue" =>
      val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList
      messages.foreach(self ! _)
    case message: Message if buf.size == MaxBufferSize =>
      log.error("The buffer is full")
    case message: Message =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(message)
      else {
        buf :+= message
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

在我的应用程序中,我也尝试以 2 秒的间隔运行流程:

val system = ActorSystem("system")
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
  .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
  .to(Sink.ignore)

system.scheduler.schedule(0 seconds, 2 seconds) {
  flow.runWith(sqsSource)(ActorMaterializer()(system))
}

但是,当我运行我的应用程序时,我会收到java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]ActorMaterializer.

是否有推荐的方法来持续实现 Akka 流?

4

1 回答 1

8

我认为您不需要ActorPublisher每 2 秒创建一个新的。这似乎是多余的和浪费的内存。另外,我不认为 ActorPublisher 是必要的。根据我对代码的了解,您的实现将拥有越来越多的 Streams,它们都在查询相同的数据。来自客户端的每个都Message将由 N 个不同的 akka Streams 处理,更糟糕的是,N 会随着时间的推移而增长。

无限循环查询的迭代器

您可以使用 scala 从 ActorPublisher 获得相同的行为Iterator。可以创建一个不断查询客户端的迭代器:

//setup the client
val client = {
  val sqsClient = new AmazonSQSClient()
  sqsClient setRegion (RegionUtils getRegion "us-east-1")
  sqsClient
}

val url = client.getQueueUrl(name).getQueueUrl

//single query
def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable {
  client receiveMessage (new ReceiveMessageRequest(url).getMessages)
}

def messageListIteartor : Iterator[Iterable[Message]] = 
  Iterator continually messageListStream

//messages one-at-a-time "on demand", no timer pushing you around
def messageIterator() : Iterator[Message] = messageListIterator flatMap identity

此实现仅在所有先前的 Messages 已被消耗时才查询客户端,因此是真正的响应式。无需跟踪固定大小的缓冲区。您的解决方案需要一个缓冲区,因为消息的创建(通过计时器)与消息的消耗(通过 println)分离。在我的实现中,创建和消费通过背压紧密耦合。

Akka 流源

然后,您可以使用此迭代器生成器函数来提供 akka 流源:

def messageSource : Source[Message, _] = Source fromIterator messageIterator

流动形成

最后,您可以使用此 Source 来执行println(附带说明:您的flow值实际上是 a Sinksince Flow + Sink = Sink)。使用flow问题中的值:

messageSource runWith flow

一个 akka Stream 处理所有消息。

于 2015-11-23T14:03:50.390 回答