3

我正在研究一个与 Java 相关的实际场景;一个套接字程序。现有系统和预期系统如下。

现有系统- 系统检查是否满足特定条件。如果是这样,它将创建一些要发送的消息并将其放入队列中。

队列处理器是一个单独的线程。它定期检查队列中是否存在项目。如果找到任何项目(消息),它只会将消息发送到远程主机(硬编码)并从队列中删除该项目。

预期系统- 这是类似的东西。当满足某个条件但在每种情况下收件人都不相同时,就会创建消息。所以有很多方法。

  1. 将消息放入同一个队列,但带有其接收者 ID。在这种情况下,第二个线程可以识别接收者,因此可以将消息发送到该接收者。

  2. 有多个线程。在这种情况下,当条件满足并且接收者处于“新”状态时,它会创建一个新队列并将消息放入该队列。并且一个新线程初始化以处理该队列。如果下一条消息被定向到同一个收件人,它应该放入同一个队列,如果不是一个新队列,则应该创建线程。

现在我想实现第二个,有点卡住了。我该怎么做?骨架就足够了,您无需担心如何创建队列等... :)

更新:我也认为方法 1 是最好的方法。我阅读了一些关于线程的文章并做出了这个决定。但也值得学习如何实现方法 2。

4

5 回答 5

4

考虑使用Java 消息服务 (JMS)而不是重新发明轮子?

于 2009-04-26T12:02:38.670 回答
2

我可以建议你看看BlockingQueue吗?您的调度进程可以写入此队列(放置),客户端可以以线程安全的方式获取或查看。所以你根本不需要编写队列实现。

如果您有一个包含不同消息类型的队列,那么您将需要为每个客户端实现一些 peek 类型的机制(即他们必须检查队列的头部并且只取他们的)。为了有效地工作,消费者必须及时、可靠地提取他们所需的数据。

如果每个消息/消费者类型有一个队列/线程,那么这将更容易/更可靠。

您的客户端实现只需循环:

while (!done) {
   Object item = queue.take();
   // process item
}

请注意,队列可以使用泛型,并且take()是阻塞的。

当然,由于多个消费者接收不同类型的消息,您可能需要考虑基于空间的架构。这不会有队列(FIFO)特性,但会以一种非常简单的方式允许您使用多个消费者。

于 2009-04-26T12:02:46.220 回答
2

您必须稍微权衡一下是否有很多终端机器和偶尔向每个机器发送消息,或者有一些终端机器和频繁消息发送给每个机器。

如果您有很多终端机器,那么实际上每台终端机器有一个线程听起来有点过头了,除非您真的要不断地将消息流式传输到所有这些机器。我建议拥有一个只会在特定范围内增长的线程池。为此,您可以使用 ThreadPoolExecutor。当您需要发布消息时,您实际上是向将发送消息的执行程序提交了一个可运行文件:

Executor msgExec = new ThreadPoolExecutor(...);

public void sendMessage(final String machineId, byte[] message) {
  msgExec.execute(new Runnable() {
    public void run() {
      sendMessageNow(machineId, message);
    }
  });
}

private void sendMessageNow(String machineId, byte[] message) {
  // open connection to machine and send message, thinking
  // about the case of two simultaneous messages to a machine,
  // and whether you want to cache connections.
}

如果您只有几台终端机器,那么每台机器可以有一个 BlockingQueue,每个阻塞队列都有一个线程等待下一条消息。在这种情况下,模式更像是这样(注意未经测试的周日早上的头顶代码):

ConcurrentHashMap<String,BockingQueue> queuePerMachine;

public void sendMessage(String machineId, byte[] message) {
  BockingQueue<Message> q = queuePerMachine.get(machineId);
  if (q == null) {
    q = new BockingQueue<Message>();
    BockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q);
    if (prev != null) {
      q = prev;
    } else {
      (new QueueProessor(q)).start();
    }
  }
  q.put(new Message(message));
}

private class QueueProessor extends Thread {
  private final BockingQueue<Message> q;
  QueueProessor(BockingQueue<Message> q) {
    this.q = q;
  }
  public void run() {
    Socket s = null;
    for (;;) {
      boolean needTimeOut = (s != null);
      Message m = needTimeOut ?
         q.poll(60000, TimeUnit.MILLISECOND) :
         q.take();
      if (m == null) {
        if (s != null)
          // close s and null
      } else {
        if (s == null) {
          // open s
        }
        // send message down s
      }
    }
    // add appropriate error handling and finally
  }
}

在这种情况下,如果该机器的消息在 60 秒内没有到达,我们将关闭连接。

您应该改用 JMS 吗?好吧,你必须权衡这听起来对你来说是否复杂。我个人的感觉是,这不是一项足够复杂的任务,不足以保证一个特殊的框架。但我敢肯定意见不同。

PS 实际上,现在我看看这个,你可能会把队列放在线程对象中,然后映射机器 ID -> 线程对象。无论如何,你明白了。

于 2009-04-26T14:13:03.753 回答
2

您可以尝试使用SomnifugiJMS,这是一个使用 java.util.concurrent 作为实际“引擎”的虚拟机 JMS 实现。

就您的目的而言,这可能有点矫枉过正,但很可能使您的应用程序无需额外编程(如果适用)即可分发,您只需插入一个不同的 JMS 实现,如ActiveMQ,就完成了。

于 2009-04-26T14:27:39.960 回答
1

首先,如果您计划有很多接收器,我不会使用 ONE-THREAD-AND-QUEUE-PER-RECEIVER 方法。你最终可能会导致很多线程大部分时间都没有做任何事情,我可能会损害你的性能。另一种方法是使用工作线程的线程池,只需从共享队列中挑选任务,每个任务都有自己的接收者 ID,也许还有一个共享字典,其中包含到每个接收者的套接字连接以供工作线程使用。

话虽如此,如果你仍然想继续你的方法,你可以做的是:

1)创建一个新类来处理你的新线程执行:

public class Worker implements Runnable {
   private Queue<String> myQueue = new Queue<String>();
   public void run()
   {
       while (true) {
          string messageToProcess = null;
          synchronized (myQueue) {
             if (!myQueue.empty()) {
                 // get your data from queue
                 messageToProcess = myQueue.pop();
             }
          }
          if (messageToProcess != null) {
             // do your stuff
          }
          Thread.sleep(500); // to avoid spinning
       }
   }
   public void queueMessage(String message)
   {
      synchronized(myQueue) {
         myQueue.add(message);
      }
   }
}

2) 在您的主线程上,创建消息并使用字典(哈希表)查看接收者的线程是否已经创建。如果是,则将新消息排队。如果没有,则创建一个新线程,将其放入哈希表并将新消息排队:

while (true) {
   String msg = getNewCreatedMessage(); // you get your messages from here
   int id = getNewCreatedMessageId();   // you get your rec's id from here
   Worker w = myHash(id);
   if (w == null) {   // create new Worker thread
      w = new Worker();
      new Thread(w).start();
   }
   w.queueMessage(msg);
}

祝你好运。

编辑:您可以使用此方法中提到的BlockingQueue Brian 来改进此解决方案。

于 2009-04-26T12:14:54.430 回答