1

我正在尝试设置一个将每 x 分钟/秒/毫秒/任何时间运行一次的作业,并轮询 Amazon SQS 队列以处理要处理的消息。我的问题是最好的方法是什么。我是否应该创建一个具有 x 个线程的 ScheduledThreadPoolExecutor 并使用 scheduleAtFixedRate 方法安排一个任务并经常运行它(如 10 毫秒),以便在需要时使用多个线程,或者,正如我向同事提议的那样,创建一个ScheduledThreadPoolExecutor 具有 x 个线程,然后以稍微偏移的间隔创建多个计划任务,但运行频率较低。在我看来,这听起来像是 STPE 的用途。

通常,我将 Spring/Quartz 用于这种类型的东西,但此时已经过时了。

所以你的想法是什么?

4

3 回答 3

1

我建议您在 SQS 上使用长轮询,这使您的ReceiveMessage调用行为更像take对 a 的调用BlockingQueue(这意味着您不需要使用计划任务从队列中轮询 - 您只需要一个轮询的线程无限循环,如果连接超时重试)

于 2013-07-23T16:58:13.783 回答
0

长轮询就像一个阻塞队列,只有20 seconds在调用返回的最大值之后。如果这是轮询周期之间所需的最大延迟,那么长轮询就足够了。除此之外,您将需要一个预定的执行器。

线程数实际上取决于您处理接收到的消息的速度。如果您可以非常快速地处理消息,则只需要一个线程。我有如下设置

  1. SingleThreadScheduledExecutorwithscheduleWithFixedDelay在上一次完成后 5 分钟执行
  2. 在每次执行中,从 SQS 批量检索消息,直到没有更多消息要处理(记住每个批次最多接收 10 条消息)。
  3. 消息被处理,然后从队列中删除。

对于我的场景,单线程就足够了。如果积压在增加(例如,每条可能涉及等待的消息都需要网络操作),您可能希望使用多个线程。如果一个处理节点变得资源受限,您总是可以启动另一个实例(可能是 EC2)来增加更多容量。

于 2013-08-15T01:09:30.243 回答
0

好吧,这取决于任务的频率。如果您只需要按时间间隔轮询并且间隔不是很小,那么ScheduledThreadPoolExecutorwithscheduleAtFixedRate是一个不错的选择。

否则我会推荐使用 netty 的HashedWheelTimer. 在繁重的任务下,它提供了最佳性能。Akka 和 play 使用它来调度。这是因为STPE对于每个任务添加都需要O(log(n))where as HWTtakes O(1)

如果你必须使用STPE,我会推荐一个任务,否则会导致资源过剩。

于 2013-07-23T17:03:10.497 回答