0

我有处理引擎,它只能通过队列接口对我可用(公开)。

我想将基于 Que 的接口抽象为一个简单的阻塞接口,可以由多个线程同时调用。

单个 JVM 进程每秒将调用此接口大约 500 次。

高层次的问题是:如何将基于 Que 的接口转换为 Java 中的线程安全阻塞函数调用。

笔记:

  1. 我将请求放在 Request_Q
  2. 请求是我生成唯一 id 的表单的 JSON 对象,有效负载是我要处理的消息:

    {id:1234567890,有效载荷:“foo”}

  3. 引擎完成处理此请求后,它将响应放入队列 Response_Q

  4. 响应消息与请求对象的形式相同,只是payload对应于处理后的消息,id与请求对象相同。例如,对所述请求对象的响应可能如下所示:

    {id:1234567890,有效载荷:“bar”}

  5. 我弹出 Response_Q 的消息

我希望将这个基于队列的处理系统封装在以下形式的传统阻塞函数调用中:

public String process (String payload) {

   JSONObject request = new JSONObject();
   request.set("id", /* ... some way to generate a random key */ );
   request.set("payload", payload);

   // push the request onto Request_Q

   // Question
   // What is an elegant way to organize the code from here down?
}
4

3 回答 3

1

您可以为此使用 Apache Camel,一些常见的企业集成适用于这种类型的用例。

一般来说,你需要做的任何事情总是适合一个或多个 EIP,而 Camel 就像脚手架一样,可以帮助你对那些定义明确且解决良好的问题进行编码。

最适合这里的可能是请求/回复

于 2013-01-05T01:16:55.217 回答
1

这是一种方法:

  • 创建一个从 response_q 读取的后台线程
  • 创建一个Map<String, Queue>响应线程可以访问的全局变量

在阻塞函数中

  1. 向映射添加新条目,请求 id 作为键,新队列作为值,它是这个请求的私有队列
  2. 写入 request_q
  3. 在 private_q 中等待响应
  4. 可用时,删除映射条目并返回从队列中读取的响应

在后台线程中,在无限循环中

  1. 从全局 response_q 读取响应
  2. 从响应中提取 id
  3. 从全局映射中识别私有队列
  4. 写入私有队列

需要一些同步。探索更好的私有队列数据结构。

更新

msandiford评论所述,使用无锁ConcurrentHashMapBlockingQueue.

于 2013-01-04T23:56:59.233 回答
1

虽然我认为您最好弄清楚如何使您的代码成为非阻塞而不是阻塞队列,但这是一种方法:

在您的 process() 方法中,设置一个查看队列的循环,并且仅在找到对该方法调用中发送的消息的响应时才退出。这将非常低效,但我认为没有一种有效的方法可以做到这一点。

于 2013-01-04T23:39:22.717 回答