我有一个分布式系统,其节点通过套接字接收消息对象。当在另一个线程中接收和处理消息时,这些消息将写入 BlockingQueue。我确保一台机器中只有一个 BlockingQueue 实例。的传入速率非常高,大约每秒数千。消费者一开始运作良好,但在一段时间后阻塞(根本没有响应) - 我检查了 BlockingQueue 不为空,因此不应被 BlockingQueue.take() 阻塞。当我手动降低传入消息对象的速率时,消费者工作得非常好。这很令人困惑……
你能帮我找出问题吗?提前非常感谢。
消费者代码:
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(id+"-machine-worker")
.setDaemon(false)
.setPriority(Thread.MAX_PRIORITY)
.build();
ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
executor.submit(new Worker(machine));
public static class Worker implements Runnable {
Machine machine;
public Worker(Machine machine) {
this.machine = machine;
}
@Override
public void run() {
while (true) {
try {
Message message = machine.queue.take();
// Do my staff here...
} catch (Exception e) {
logger.error(e);
}
}
}
}
生产者代码:
// Below code submits the SocketListener runnable described below
ExecutorService worker;
Runnable runnable = socketHandlerFactory.getSocketHandlingRunnable(socket, queue);
worker.submit(runnable);
public SocketListener(Socket mySocket, Machine machine, LinkedBlockingQueue<Message> queue) {
this.id = machine.id;
this.socket = mySocket;
this.machine = machine;
this.queue = queue;
try {
BufferedInputStream bis = new BufferedInputStream(socket.getInputStream(), 8192*64);
ois = new ObjectInputStream(bis);
} catch (Exception e) {
logger.error("Error in create SocketListener", e);
}
}
@Override
public void run() {
Message message;
try {
boolean socketConnectionIsAlive = true;
while (socketConnectionIsAlive) {
if (ois != null) {
message = (Message) ois.readObject();
queue.put(message);
}
}
} catch (Exception e) {
logger.warn(e);
}
}