1

我不确定如何更好地将我的程序设计为多线程。基本上我有一个队列服务器,我连接到它并发送它工作,但是当我启动更多线程以更快地发送更多工作时,我注意到我的线程正在阻塞。

这是我正在做的一个例子(在我的程序中,我正在发送一些数据和一个从共享连接派生的通道)。

class Send_to_Queue implements Runnable{

    protected String queue_name = null;
    protected Channel channel = null;
    protected byte[] message = 0

   public Send_to_Queue(String queue_name, byte[] message, Channel channel) {
        // TODO Auto-generated constructor stub
        this.queue_name = queue_name;
        this.message   = message;
        this.channel = channel;         
    } 

该通道并不是线程所独有的,它由所有正在启动的线程共享,我认为这是阻塞的来源。我对执行此操作的最佳方法有点困惑,因为我不确定如何在 ThreadPoolExecutor其生命周期内创建一个新频道,而不是为每个任务创建一个新频道(有点贵)。如果没有工作,我不介意关闭它,但如果我有 4 个线程和 100 个工作单元,我希望它只建立一个新通道 4 次而不是 100 个。

我知道为我的服务器创建新通道/连接的语法似乎不明白如何以一种并非在每个实例上都建立的方式来做到这一点。将连接传递给线程并让它启动一个新通道(this.channel每次都在创建一个新通道)

4

1 回答 1

1

我认为您想在构建Channel时创建一个新的Thread,以便它可以使用此通道并并行完成工作。有几种不同的方法可以做到这一点。通常在服务器场景中,服务器接受一个连接,该连接会Channel为该连接生成一个新连接。然后它把它Channel交给处理程序Thread。套接字也是如此,但你明白了:

ServerSocket socket = ...
...
while (true) {
    Socket clientSocket = socket.accept();
    new Thread(new MyRunnable(clientSocket)).start();
}
...
public class MyRunnable implements Runnable {
    private Socket clientSocket;
    public MyRunnable(Socket clientSocket) {
        this.clientSocket = clientSocket;
    }
    public void run() {
        while (!done) {
            // use the socket associated with this thread
        }
    }
}

如果您只是创建一堆线程来建立与远程服务器的连接并完成工作,您可以在循环中或使用其中一个ExecutorService池来执行此操作:

ExecutorService threadPool = Executors.newFixedThreadPool(100); 
for (int i = 0; i < 100; i++) {
    threadPool.submit(new Runnable() {
        public void run() {
            Channel threadChannel = // create channel here;
            while (!done) {
                // use the per-thread channel
            }
        }
    });
}

另一种流行的模式是让线程使用SocketChannel处理一系列单元。你可以使用 aBlockingQueue来达到这个目的:

BlockingQueue<WorkUnit> workQueue = new LinkedBlockingQueue<WorkUnit>();
...
// add work units to the work queue
for (int i = 0; i < 1000; i++) {
    // add work to the queue
    workQueue.put(new WorkUnit(i));
}

// the MyRunnable above can then be modified like this:
    ...
    public void run() {
        while (!done) {
            WorkUnit workUnit = workQueue.take();
            // use MyRunnable socket or channel and do the WorkUnit
        }
    }
于 2012-04-28T16:04:06.160 回答