2

我目前正在开发一个使用分配异步处理的系统。信息的传输是使用队列完成的。因此,一个进程会将信息放入队列(并终止),另一个进程将获取并处理它。我的实现让我面临许多挑战,我对每个人解决这些问题的方法很感兴趣(在架构和库方面)。

让我画一幅画。假设您有三个过程:

Process A -----> Process B
                      |
Process C <-----------|

因此,进程 A将消息放入队列并结束,进程 B拾取消息,对其进行处理并将其放入“返回”队列。进程 C拾取消息并处理它。

  1. 如何处理进程 B不侦听或处理队列外的消息?是否有一些 JMS 类型的方法可以防止生产者在消费者不活动时提交消息?所以进程 A将提交但抛出异常。
  2. 假设进程 C必须在 X 分钟内得到答复,但进程 B已停止(出于任何原因),是否有某种机制强制队列超时?所以保证在 X 分钟内回复,这将启动Process C

可以使用某种死信队列来处理所有这些问题吗?我是否应该使用计时器手动完成所有操作并进行检查。我已经提到了 JMS,但我对任何事情都持开放态度,事实上我正在使用 Hazelcast 作为队列。

请注意,就可用的 Java 技术和方法而言,这更像是一个架构问题,我认为这是一个正确的问题。

任何建议将不胜感激。

谢谢

4

6 回答 6

2

在我看来,您提出的问题类型是“闻起来”队列和异步处理可能不是适合您情况的最佳工具。

1)这违背了队列的目的。听起来您需要一个同步的请求-响应过程。

2)进程C一般来说没有得到答复。它正在从队列中获取消息。如果队列中有消息并且进程 C 已准备好,那么它将得到它。例如,进程 C 可以在收到消息后确定该消息是陈旧的。

于 2012-02-03T06:53:56.960 回答
2

我认为您的第一个问题已经被其他海报充分回答了。

关于您的第二个问题,您尝试执行的操作可能取决于您的应用程序使用的消息传递引擎。我知道这适用于 IBM MQ。我已经看到这是使用WebSphere MQ Classes for Java而不是 JMS 来完成的。它的工作方式是当进程 A将消息放入队列时,它指定它将等待响应消息的时间。如果进程 A在指定时间内未能收到响应消息,系统将抛出相应的异常。

我认为 JMS 中没有一种标准方法可以按照您想要的方式处理请求/响应超时,因此您可能必须使用特定于平台的类,例如 WebSphere MQ Classes for Java。

于 2012-02-03T07:47:17.423 回答
2

恕我直言,最简单的解决方案是使用 ExecutorService,或基于 executor 服务的解决方案。这支持工作队列、计划任务(用于超时)。

它也可以在单个进程中工作。(我相信 Hazelcast 支持分布式 ExecutorService)

于 2012-02-03T11:10:15.643 回答
1

好吧,排队的目的是让事情保持隔离。

如果您不拘泥于任何特定的技术,您可以使用数据库作为您的队列。

但首先,确保两个进程协调的一种简单机制是使用套接字。如果可行,只需让进程 B 在某个众所周知的端口上创建一个打开的套接字侦听器,进程 A 将连接到该套接字并监视它。如果进程 B 消失了,进程 A 可以知道是因为他们的套接字关闭了,并且它可以将其用作进程 B 出现问题的警报。

对于 B -> C 问题,有一个 db 表:

create table queue (
    id integer,
    payload varchar(100), // or whatever you can use to indicate a payload
    status varchar(1),
    updated timestamp
)

然后,进程 A 将其条目放入队列,当前时间和状态为“B”。B、监听队列:

select * from queue where status = 'B' order by updated

当 B 完成时,它会更新队列以将状态设置为“C”。

同时,“C”正在轮询数据库:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 

(阈值有多长,您希望事情在队列中腐烂)。

最后,C 将队列行更新为“D”以表示完成,或者删除它,或者任何你喜欢的。

不利的一面是这里有一点竞争条件,C 可能会在 B 刚刚启动时尝试获取一个条目。您可能可以通过严格的隔离级别和一些锁定来解决这个问题。就像:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 
FOR UPDATE

也使用 FOR UPDATE 进行 B 的选择。这样,无论谁赢得选择比赛,都将获得该行的排他锁。

这将使您在实际功能方面走得很远。

于 2012-02-03T07:01:12.237 回答
1

您期望使用异步(消息)设置进行同步处理的语义,这是不可能的。我曾在 WebSphere MQ 上工作过,通常当消费者死亡时,消息会永远保留在队列中(除非您设置了过期时间)。一旦队列达到其深度,后续消息将被移动到死信队列。

于 2012-02-03T07:02:22.390 回答
1

我使用类似的方法为视频转码作业创建排队和处理系统。基本上它的工作方式是:

  1. Process A向 发布“计划”消息Arbiter Q,将作业添加到其“等待”队列中。
  2. Process B从 请求下一个作业Arbiter Q,这会删除其“等待”队列中的下一个项目(受一些自定义调度逻辑的影响,以确保单个用户无法泛滥转码请求并阻止其他用户能够转码视频)并插入它在将作业返回到Process B. 当作业进入“处理”集时,它会被加上时间戳。
  3. Process B完成作业并向 发布“完成”消息Arbiter Q,从“处理”集中删除作业,然后修改某些状态,以便Process C知道作业已完成。
  4. Arbiter Q定期检查其“处理”集中的作业,并将任何运行时间异常长的作业超时。 Process A然后可以自由地尝试再次排队相同的作业,如果它想要的话。

这是使用 JMX 实现的(JMS 会更合适,但我离题了)。 Process A只是响应用户发起的转码请求的 servlet 线程。 Arbiter Q是接收“计划”和“完成”消息的 MBean 单例(在服务器集群中的所有节点上持久/复制)。它内部管理的“队列”只是简单的List实例,当作业完成时,它会修改应用程序数据库中的一个值,以引用转码视频文件的 URL。 Process B是转码线程。它的工作只是请求一份工作,对其进行转码,然后在完成后报告。一遍又一遍,直到时间的尽头。 Process C是另一个用户/servlet 线程。它将看到 URL 可用,并向用户显示下载链接。

在这种情况下,如果Process B要死亡,那么这些作业将永远处于“等待”队列中。然而,在实践中,这从未发生过。如果你Process B没有运行/做它应该做的事情,那么我认为这表明你的部署/配置/实施中Process B存在问题,而不是整体方法中的问题。

于 2012-02-03T07:11:11.570 回答