0

我对如何将对象(成功)返回到池中有些困惑。我正在测试为rabbitmq 创建一个频道池(尽管这与rabbitmq 没有任何关系)。rabbitmq 中的过程是创建与服务器的连接,然后创建一个通道,我试图让它创建通道并使用池的一部分,但它只是不断创建新通道,并且似乎没有重用旧通道。我相信是这样,因为当我检查兔子的 Web UI 时,它说我的频道与队列中的项目一样多,但我的上传速度约为每秒 10k 条消息,所以我希望只有该范围内的频道。

我很确定这正在发生,因为我不知道如何成功返回池。我正在使用 returnObject 但我是否需要做任何事情以使其准备好供另一个进程使用?

这是代码(它有很多代码,但我认为问题出在MyPooledObject classpool.returnObject(obj)部分:

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class PoolExample {
    public static class MyPooledObject {
        Connection connection;
        public MyPooledObject() throws IOException {
            System.out.println("hello world");
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();

        }

        public Channel sing() throws IOException {
            //System.out.println("mary had a little lamb");
            return connection.createChannel();
        }

        public void destroy() {
            System.out.println("goodbye cruel world");

        }
    }

    public static class MyPoolableObjectFactory extends BasePoolableObjectFactory<MyPooledObject> {
        @Override
        public MyPooledObject makeObject() throws Exception {
            return new MyPooledObject();
        }

        @Override
        public void destroyObject(MyPooledObject obj) throws Exception {
            obj.destroy();
        }
        // PoolableObjectFactory has other methods you can override
        // to valdiate, activate, and passivate objects.
    }

    public static void main(String[] args) throws Exception {
        PoolableObjectFactory<MyPooledObject> factory = new MyPoolableObjectFactory();
        ObjectPool<MyPooledObject> pool = new GenericObjectPool<MyPooledObject>(factory);

        // Other ObjectPool implementations with special behaviors are available;
        // see the JavaDoc for details

        try {
            for (int i = 0; i < 500000000; i++) {
                MyPooledObject obj;

                try {
                    obj = pool.borrowObject();
                } catch (Exception e) {
                    // failed to borrow object; you get to decide how to handle this
                    throw e;
                }

                try {
                    // use the pooled object
                    Channel channel = obj.sing();
                    String message = "Mary Had a little lamb";
                    channel.basicPublish( "", "task_queue", 
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            message.getBytes());

                } catch (Exception e) {
                    // this object has failed us -- never use it again!
                    pool.invalidateObject(obj);
                    obj = null; // don't return it to the pool

                    // now handle the exception however you want

                } finally {
                    if (obj != null) {
                        pool.returnObject(obj);
                    }
                }
            }
        } finally {
            pool.close();
        }
    }
}
4

1 回答 1

1

行为符合预期。在 makeObject 上设置一个断点,你会发现你只使用过一次,因为你一次只使用池中的一个对象。

如果您要同时使用池中的多个对象,则池将填满并使用不同的对象。

于 2012-05-01T16:25:08.623 回答