0

我正在编写一个 Runnable 类,它将消息打包在一起一段时间或直到达到给定的大小,然后再通过网络发送它们。它旨在允许其他线程在通过某些设置方法运行时更改某些内部参数(例如数据包大小)。使用内部对象锁使部分run()逻辑与设置器互斥是否正确?

像这样的东西:

public class Packer implements Runnable {
    private BlockingQueue<byte[]> msgQueue;
    private Object lock = new Object();
    private Packet packet;
    private boolean running = false;

    public synchronized void append(byte[] payload) throws InterruptedException {
        msgQueue.put(payload);
    }

    public synchronized void setPacketCapacity(int size) {
        synchronized (lock) {
            // check to see if we need to flush the current packet first, etc.
            packet.setCapacity(size);
        }
    }
    public void run() {
        running = true;
        while (running) {
            try {
                byte[] msg = msgQueue.take();
                synchronized (lock) {
                    packet.add(msg);
                    // check if we need to flush the packet, etc.
                }
            } catch (InterruptedException ex) {
                logger.warn("interrupted");
                running = false;
            } catch (Exception e) {
                logger.error(e);
            }
        }
        logger.warn("stop");
    }
}

相关地,另一个线程告诉这个runnable停止(和刷新)的正确方法是什么?

由于该run()方法可能正在等待内部队列msgQueue,因此简单地设置可能还不够running=false,我可能不得不中断线程。或者,我可以向内部队列发送一个特殊的“流结束”消息,但如果队列已满,我可能需要等待一段时间才能被接受。

4

1 回答 1

0
  1. 由于您使用一个锁定对象锁定您的setter和内部的逻辑run(),因此它是正确的。我建议您synchronized从 setter 方法签名中删除,因为您已经使用锁定对象锁定了其中的所有代码
  2. 您可以像这样删除boolean running并写下您的run()

    public void run() {
      while (true) {
        try {
            byte[] msg = msgQueue.take();
            synchronized (lock) {
                packet.add(msg);
                // check if we need to flush the packet, etc.
            }
        } catch (InterruptedException ex) {
            logger.warn("interrupted");
            Thread.currentThread.interrupt();
            return;
        } catch (Exception e) {
            logger.error(e);
        }
      }
      logger.warn("stop");
    }
    

调用thread.interrupt()将强制run()方法中的代码转到设置中断标志的 InterruptedException 捕获块并从中返回run()

于 2018-11-29T08:31:57.213 回答