0

我的应用程序有一个生产者和一个消费者。我的制作人不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者听队列,当有消息时,接受它并处理这个消息。这个过程可能需要几个小时,如果我的消费者没有完成对当前消息的处理,我不希望我的消费者从队列中获取另一条消息。

我认为 AKKA 和 AWS SQS 可以满足我的需求。通过阅读文档和示例,akka-camel 似乎可以简化我的工作。

我在 github 上找到了这个例子。

我对消费者的配置更感兴趣(Thread.sleep 只是为了模拟我的处理):

class MySqsConsumer extends Consumer {

  //The SQS URI is an in-only message exchange (autoAck=true)
  override def endpointUri = "aws-sqs://sqs-akka-camel?  amazonSQSClient=#client"

  override def receive = {
    case msg: CamelMessage => {
      println("Start received %s" format msg.bodyAs[String])
      Thread.sleep(4000)
      println("Stop received %s" format msg.bodyAs[String])

    }

  }
 }

堆栈跟踪 :

...
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!]
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!]
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!]
Start received Hello World1!
Stop received Hello World1!
Start received Hello World2!
Stop received Hello World2!
Start received Hello World3!
...

我的问题是 akka-camel 在接收方法完成之前从队列中读取消息。

如何让我的消费者在接收新消息之前等待流程结束?如果我使用了错误的工具/库,你能指导我使用新工具吗?

4

2 回答 2

1

不确定您是否使用 akka 流,但如果是,请将源发送到 Sink.queue

val graph=yourSource.to(Sink.queue)

当物化 ( graph.run) 时,它将返回一个 SinkQueue,您可以从中手动提取项目。它将使用背压机制,因此您不必担心不拉取物品会发生什么。

注意:我故意忽略了泛型!但不要忘记他们。

于 2016-06-17T20:39:12.987 回答
0

据我所知,你做不到。Camel 一直在消费队列中的消息。

您可以设置-在您确认当前override def autoAck = false消息之前,它不会向 发送另一条消息。receive()但是“隐藏”的骆驼演员仍然会消耗。

也许你可以提供一个定制的amazonSQSClient并以某种方式控制它。

于 2016-11-04T23:03:45.843 回答