我正在进行的一个项目需要从 SQS 读取消息,因此我决定使用 Akka 来分发这些消息的处理。
由于 Camel 支持 SQS,并且在 Consumer 类中内置了用于 Akka 的功能,我认为最好以这种方式实现端点并读取消息,尽管我没有看到很多人这样做的例子。
我的问题是我无法足够快地轮询队列以保持队列为空或接近空。我最初的想法是,我可以让消费者以 X/s 的速率通过 Camel 从 SQS 接收消息。从那里,我可以简单地创建更多的消费者来达到我需要处理消息的速度。
我的消费者:
import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
如图所示,我已经设置delay=1
并&maxMessagesPerPoll=10
提高了消息的速率,但我无法使用相同的端点生成多个消费者。
我在文档中读到,By default endpoints are assumed not to support multiple consumers.
我相信这也适用于 SQS 端点,因为产生多个消费者只会给我一个消费者,在运行系统一分钟后,输出消息Count for actor: x
而不是输出的其他消息Count for actor: 0
。
如果这有用的话;我可以在单个消费者上使用当前实现读取大约 33 条消息/秒。
这是从 Akka 的 SQS 队列中读取消息的正确方法吗?如果是这样,有没有办法让它向外扩展,以便我可以将消息消耗率提高到接近 900 条消息/秒?