您必须稍微权衡一下是否有很多终端机器和偶尔向每个机器发送消息,或者有一些终端机器和频繁消息发送给每个机器。
如果您有很多终端机器,那么实际上每台终端机器有一个线程听起来有点过头了,除非您真的要不断地将消息流式传输到所有这些机器。我建议拥有一个只会在特定范围内增长的线程池。为此,您可以使用 ThreadPoolExecutor。当您需要发布消息时,您实际上是向将发送消息的执行程序提交了一个可运行文件:
Executor msgExec = new ThreadPoolExecutor(...);
public void sendMessage(final String machineId, byte[] message) {
msgExec.execute(new Runnable() {
public void run() {
sendMessageNow(machineId, message);
}
});
}
private void sendMessageNow(String machineId, byte[] message) {
// open connection to machine and send message, thinking
// about the case of two simultaneous messages to a machine,
// and whether you want to cache connections.
}
如果您只有几台终端机器,那么每台机器可以有一个 BlockingQueue,每个阻塞队列都有一个线程等待下一条消息。在这种情况下,模式更像是这样(注意未经测试的周日早上的头顶代码):
ConcurrentHashMap<String,BockingQueue> queuePerMachine;
public void sendMessage(String machineId, byte[] message) {
BockingQueue<Message> q = queuePerMachine.get(machineId);
if (q == null) {
q = new BockingQueue<Message>();
BockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q);
if (prev != null) {
q = prev;
} else {
(new QueueProessor(q)).start();
}
}
q.put(new Message(message));
}
private class QueueProessor extends Thread {
private final BockingQueue<Message> q;
QueueProessor(BockingQueue<Message> q) {
this.q = q;
}
public void run() {
Socket s = null;
for (;;) {
boolean needTimeOut = (s != null);
Message m = needTimeOut ?
q.poll(60000, TimeUnit.MILLISECOND) :
q.take();
if (m == null) {
if (s != null)
// close s and null
} else {
if (s == null) {
// open s
}
// send message down s
}
}
// add appropriate error handling and finally
}
}
在这种情况下,如果该机器的消息在 60 秒内没有到达,我们将关闭连接。
您应该改用 JMS 吗?好吧,你必须权衡这听起来对你来说是否复杂。我个人的感觉是,这不是一项足够复杂的任务,不足以保证一个特殊的框架。但我敢肯定意见不同。
PS 实际上,现在我看看这个,你可能会把队列放在线程对象中,然后映射机器 ID -> 线程对象。无论如何,你明白了。