3

我正在努力弄清楚如何汇集资源,我开始怀疑我的线程可能是问题(不是 100%,但一直在试验它)。我要做的事情的要点是创建一个到服务器的通道池,然后查看线程是否正在使用它们。我已经成功地获得了为我上传的尽可能多的项目创建的频道数量(即它没有汇集,只是在每个线程中创建新频道)并且成功地只创建了一个频道(即没有汇集或创建新的频道)频道根据需要)。

我在想也许线程与池交互的方式是问题,所以我尝试创建newCachedThreadPool这样线程就不会死,只要有工作,但是当我这样做时,我收到错误说正在使用的通道已关闭。我的池中有一个destroyObject方法,但我从不调用它,所以我不明白它为什么被触发(如果我将它注释掉,那么它可以工作,但只创建一个通道,上传速度非常慢,大约 300 次操作/秒,而没有线程池我得到 30k/秒)。我怀疑它的终止,有什么方法可以验证这一点,如果它终止,我可以使用替代方法吗?

这是代码(忽略所有rabbitmq的东西,它只是为了让我可以监控结果):

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class PoolExample {

    private static ExecutorService executor_worker;

    static {
        final int numberOfThreads_ThreadPoolExecutor = 20;
        executor_worker = Executors.newCachedThreadPool();
        executor_worker = new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS,
                                           new LinkedBlockingDeque<Runnable>());
    }

    private static ObjectPool<Channel> pool;

    public static void main(String[] args) throws Exception {
        System.out.println("starting..");           
        ObjectPool<Channel> pool =
                new GenericObjectPool<Channel>(
                new ConnectionPoolableObjectFactory(), 50);
        for (int x = 0; x<500000000; x++) {
            executor_worker.submit(new MyRunnable(x, pool));
        }
        //executor_worker.shutdown();
        //pool.close();
    }
}

 class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {
     Channel channel;
     Connection connection;

    public ConnectionPoolableObjectFactory() throws IOException {
        System.out.println("hello world");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        channel = connection.createChannel(); 
    }

    @Override
    public Channel makeObject() throws Exception {  
        //channel = connection.createChannel(); 
        return channel; 
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

class MyRunnable implements Runnable{  
    protected int x = 0;
    protected ObjectPool<Channel> pool;

    public MyRunnable(int x, ObjectPool<Channel> pool) {
        // TODO Auto-generated constructor stub
        this.x = x;
        this.pool = pool;
    }

    public void run(){
        try {
                Channel channel = pool.borrowObject();
                String message = Integer.toString(x);
                channel.basicPublish( "", "task_queue", 
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes());
                pool.returnObject(channel);
        } catch (NoSuchElementException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}

ps 我基本上问了几个问题并阅读了文档并试图弄清楚这一点,在此过程中我可能完全走错了方向,所以如果您看到任何问题或提示,请按我的方式发送。

情节变厚:

在 main 方法的 for 循环中(我将工作提交给线程),我添加了:

    Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
    System.out.println(threadSet.size()); //number of threads
    System.out.println(pool.getNumActive());

它向我显示了 25 个线程(尽管我说的是 20 个)和池中的 20 个项目。但是当我查看 rabbitmq UI 时,我看到一个连接只有一个通道。如果我创建频道并提交给可运行的,那么它会创建许多频道(但它永远不会关闭它们)。我不明白发生了什么以及为什么结果不符合预期。

4

1 回答 1

1

我认为问题在于您的 ConnectionPoolableObjectFactory 只创建了一个 Channel 对象。makeObject似乎每次调用它都应该创建一个新的 Channel 。

所以也许它应该像这样实现:

public class ConnectionPoolableObjectFactory
        extends BasePoolableObjectFactory<Channel> {

    private final Connection connection;

    private ConnectionPoolableObjectFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
    }

    @Override
    public Channel makeObject() throws Exception {
        return connection.createChannel();
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

这假设每个工厂从单个连接创建多个通道。

于 2012-05-02T18:09:54.230 回答