3

我有一个分布式系统,其节点通过套接字接收消息对象。当在另一个线程中接收和处理消息时,这些消息将写入 BlockingQueue。我确保一台机器中只有一个 BlockingQueue 实例。的传入速率非常高,大约每秒数千。消费者一开始运作良好,但在一段时间后阻塞(根本没有响应) - 我检查了 BlockingQueue 不为空,因此不应被 BlockingQueue.take() 阻塞。当我手动降低传入消息对象的速率时,消费者工作得非常好。这很令人困惑……

你能帮我找出问题吗?提前非常感谢。

消费者代码:

ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat(id+"-machine-worker")
            .setDaemon(false)
            .setPriority(Thread.MAX_PRIORITY)
            .build();
ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
executor.submit(new Worker(machine));

public static class Worker implements Runnable {
    Machine machine;
    public Worker(Machine machine) {
        this.machine = machine;
    }
    @Override
    public void run() {
        while (true) {
            try {
                Message message = machine.queue.take();
                // Do my staff here...
            } catch (Exception e) {
                logger.error(e);
            }
        }
    }
}

生产者代码:

// Below code submits the SocketListener runnable described below
ExecutorService worker;
Runnable runnable = socketHandlerFactory.getSocketHandlingRunnable(socket, queue);
worker.submit(runnable);

public SocketListener(Socket mySocket, Machine machine, LinkedBlockingQueue<Message> queue) {
    this.id = machine.id;
    this.socket = mySocket;
    this.machine = machine;
    this.queue = queue;

    try {
        BufferedInputStream bis = new BufferedInputStream(socket.getInputStream(), 8192*64);
        ois = new ObjectInputStream(bis);
    } catch (Exception e) {
        logger.error("Error in create SocketListener", e);
    }
}

@Override
public void run() {
    Message message;
    try {
        boolean socketConnectionIsAlive = true;
        while (socketConnectionIsAlive) {
            if (ois != null) {
                message = (Message) ois.readObject();
                queue.put(message);
            }
        }
    } catch (Exception e) {
        logger.warn(e);
    }
}
4

2 回答 2

4

如果您使用的是无界队列,则可能会发生整个系统由于内存压力而陷入困境的情况。此外,这意味着生产强度不受消费强度的限制。所以,使用有界队列。

另一个建议:当您的阻塞条件发生时,获取一个完整的线程堆栈跟踪转储,以确定消费者阻塞的位置。你可能会在那里得到惊喜。

于 2013-04-30T11:38:57.133 回答
0

您有几个候选问题领域:

  1. 您使用的是什么实际的 BlockingQueue?您是否达到了 ArrayBlockingQueue 的上限?

  2. 你为你的进程分配了多少内存?即,这个进程的最大堆是多少?如果由于传入消息的过载而达到堆空间的上限,则完全有可能出现 OutOfMemoryError。

  3. 在您的消息处理过程中实际发生了什么(“我的员工在这里......” [原文如此])?您是否可能在该代码中存在死锁,只有在您每秒发送许多消息时才会公开该死锁。您是否在调用堆栈的某个地方有一个异常吞噬者,它隐藏了您遇到的真正问题?

  4. 您的记录器在哪里记录?您是否因为没有登录到您期望的位置而丢弃指示性消息?

于 2013-04-30T11:53:43.450 回答