我的应用程序有一个生产者和一个消费者。我的制作人不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者听队列,当有消息时,接受它并处理这个消息。这个过程可能需要几个小时,如果我的消费者没有完成对当前消息的处理,我不希望我的消费者从队列中获取另一条消息。
我认为 AKKA 和 AWS SQS 可以满足我的需求。通过阅读文档和示例,akka-camel 似乎可以简化我的工作。
我在 github 上找到了这个例子。
我对消费者的配置更感兴趣(Thread.sleep 只是为了模拟我的处理):
class MySqsConsumer extends Consumer {
//The SQS URI is an in-only message exchange (autoAck=true)
override def endpointUri = "aws-sqs://sqs-akka-camel? amazonSQSClient=#client"
override def receive = {
case msg: CamelMessage => {
println("Start received %s" format msg.bodyAs[String])
Thread.sleep(4000)
println("Stop received %s" format msg.bodyAs[String])
}
}
}
堆栈跟踪 :
...
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!]
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!]
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!]
Start received Hello World1!
Stop received Hello World1!
Start received Hello World2!
Stop received Hello World2!
Start received Hello World3!
...
我的问题是 akka-camel 在接收方法完成之前从队列中读取消息。
如何让我的消费者在接收新消息之前等待流程结束?如果我使用了错误的工具/库,你能指导我使用新工具吗?