6

我正在进行的一个项目需要从 SQS 读取消息,因此我决定使用 Akka 来分发这些消息的处理。

由于 Camel 支持 SQS,并且在 Consumer 类中内置了用于 Akka 的功能,我认为最好以这种方式实现端点并读取消息,尽管我没有看到很多人这样做的例子。

我的问题是我无法足够快地轮询队列以保持队列为空或接近空。我最初的想法是,我可以让消费者以 X/s 的速率通过 Camel 从 SQS 接收消息。从那里,我可以简单地创建更多的消费者来达到我需要处理消息的速度。

我的消费者:

import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}

class MyConsumer() extends Consumer {
  def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
  var count = 0

  def receive = {
    case msg: CamelMessage => {
      count += 1
    }
    case _ => {
      println("Got something else")
    }
  }

  override def postStop(){
    println("Count for actor: " + count)
  }
}

如图所示,我已经设置delay=1&maxMessagesPerPoll=10提高了消息的速率,但我无法使用相同的端点生成多个消费者。

我在文档中读到,By default endpoints are assumed not to support multiple consumers.我相信这也适用于 SQS 端点,因为产生多个消费者只会给我一个消费者,在运行系统一分钟后,输出消息Count for actor: x而不是输出的其他消息Count for actor: 0

如果这有用的话;我可以在单个消费者上使用当前实现读取大约 33 条消息/秒。

这是从 Akka 的 SQS 队列中读取消息的正确方法吗?如果是这样,有没有办法让它向外扩展,以便我可以将消息消耗率提高到接近 900 条消息/秒?

4

2 回答 2

5

遗憾的是,Camel 目前不支持在 SQS 上并行消费消息。

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

为了解决这个问题,我编写了自己的 Actor 来使用 aws-java-sdk 轮询批处理消息 SQS。

  def receive = {
    case BeginPolling => {
      // re-queue sending asynchronously
      self ! BeginPolling
      // traverse the response
      val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry]
      val messages = sqs.receiveMessage(receiveMessageRequest).getMessages
      messages.toList.foreach {
        node => {
          deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle))
          //log.info("Node body: {}", node.getBody)
          filterSupervisor ! node.getBody
        }
      }
      if(deleteEntryList.size() > 0){
        val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList)
        sqs.deleteMessageBatch(deleteMessageBatchRequest)
      }
    }

    case _ => {
      log.warning("Unknown message")
    }
  }

虽然我不确定这是否是最好的实现,当然可以对其进行改进,以使请求不会经常遇到空队列,但它确实适合我目前能够从同一个队列轮询消息的需求。

用这个从 SQS 获得大约 133(消息/秒)/演员。

于 2013-11-06T16:20:20.243 回答
1

Camel 2.15 支持 concurrentConsumers,但不确定这有多大用处,因为我不知道 akka camel 是否支持 2.15,而且我不知道即使有多个消费者,有一个消费者参与者是否会有所作为。

于 2015-06-18T10:44:36.333 回答