0

抱歉,如果这是基本的,我只是看不到答案。基本上我正在使用 ThreadPoolExecutor 运行几个线程来启动 10 个线程。我希望每个线程在其生命周期内都与服务器建立自己的连接。有可能吗?如果可以,我应该把这段代码放在哪里?

示例代码:

class DoWork implements Runnable{

    protected String[] data = null; 

    public DoWork(String[] data) {
        // get the data each thread needs to work
        this.data = data;
    }

    public void run() {
        // Do the work here

    }

}

我的理解是,如果工作队列中有项目,那么ThreadPoolExecutor保持 10 个线程处于活动状态,当工作完成时,它们就会死亡。在这个结构中的某个地方我可以添加连接信息吗?我不想在该DoWork方法中执行此操作,因为这将为每个工作单元完成(因此为工作队列中的尽可能多的项目打开连接,可能是数千个,并且在我尝试时会导致超时)。介于类声明和方法之间似乎没有做任何事情(尽管我可能做错了)。

关于如何做到这一点的任何想法?

更新:不是 100% 需要,但我很好奇是否还有办法让它在终止时执行某些操作(也许这样我可以关闭与服务器的连接而不是等待它超时)。

4

4 回答 4

5

您可以使用 ThreadLocal 来存储每个线程的连接:

public class ConnectionProvider {
    private static final ThreadLocal<Connection> threadLocalConnection = 
         new ThreadLocal<Connection>() {
             @Override 
             protected Connection initialValue() {
                 // TODO create and return the connection
             }
         };

    public static Connection getConnection() {
        return threadLocalConnection.get();
    }
}

或者,如果多个线程(在不同时间)可以使用连接,则可以创建一个连接池。run 方法将从池中获取一个连接(它将动态创建它并将其添加到已创建的连接集合中,如果没有可用的连接),并在它完成使用它时将其释放到池中(在 finally 块中)。这将使连接可用于任何其他任务。这将具有使所需连接的数量可能低于线程池中的线程数量的优势。

于 2012-04-29T14:34:45.273 回答
1

创建自己的线程工厂,并将其作为参数传递给线程池构造函数。线程工厂创建的线程应覆盖方法 run(),如下所示:

public void run() {
  connectToServer();
  super.run();
}

问题是,谁以及如何使用这些连接?如果您的意思是提供给线程池的每个作业都应该使用由它运行的线程创建的连接,那么将该连接保存为 ThreadLocal。

于 2012-04-29T14:39:32.330 回答
1

改编自我在CLI Processes 的 ThreadPool 的其他回答

这将创建一个连接池,您可以从中提取。这将防止每个线程的连接数上升和下降。但是,这仅在哪个线程使用哪个连接无关紧要时才有效。如果它确实重要,您必须调整此代码或使用其他人建议的 ThreadLocal 并在 Thread 死亡时挂钩。

当一个新的工作项排队时,线程会向连接池请求一个连接。如果一个不可用,它将创建一个新的。如果有一个可用,它将验证连接是否仍然有效,然后返回该对象。当工作项完成后,它可以将其返回到连接池。

public class StackOverflow_10037379_jdk6 {

    private static Logger sLogger = Logger.getLogger(StackOverflow_10372827_jdk6.class.getName());           

    public static class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Connection> {

        public ConnectionPoolableObjectFactory() {

        }

        @Override
        public Connection makeObject() throws Exception {                
            Connection connection = // createConnection
            return connection;
        }

        @Override
        public boolean validateObject(Connection connection) {
            return connection.isValid();
        }

        @Override
        public void destroyObject(Connection connection) throws Exception {
            connection.close();
        }

        @Override
        public void passivateObject(Connection connection) throws Exception {

        }
    }

    public static class WorkItem implements Runnable {

        private ObjectPool<Connection> mPool;
        private String mWork;

        public CLIWorkItem(ObjectPool<Connection> pool, String work) {
            mPool = pool;
            mWork = work;
        }

        @Override
        public void run() {
            Connection connection = null;
            try {
                connection = mPool.borrowObject();
                // do stuff with connection
            } catch (Exception ex) {
                sLogger.log(Level.SEVERE, null, ex);
            } finally {
                if (connection != null) {
                    try {
                        // Seriously.. so many exceptions.
                        mPool.returnObject(connection );
                    } catch (Exception ex) {
                        sLogger.log(Level.SEVERE, null, ex);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {

        // Change the 5 to 20 in your case. 
        ObjectPool<Connection> pool =
                new GenericObjectPool<Connection>(
                new ConnectionPoolableObjectFactory(), 5);

        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);

        // print some stuff out.
        executor.execute(new WorkItem(pool, "Message 1\r\n"));
        executor.execute(new WorkItem(pool, "Message 2\r\n"));
        executor.execute(new WorkItem(pool, "Message 3\r\n"));
        executor.execute(new WorkItem(pool, "Message 4\r\n"));
        executor.execute(new WorkItem(pool, "Message 5\r\n"));
        executor.execute(new WorkItem(pool, "Message 6\r\n"));
        executor.execute(new WorkItem(pool, "Message 7\r\n"));
        executor.execute(new WorkItem(pool, "Message 8\r\n"));
        executor.execute(new WorkItem(pool, "Message 9\r\n"));
        executor.execute(new WorkItem(pool, "Message 10\r\n"));
        executor.execute(new WorkItem(pool, "Message 11\r\n"));

        executor.shutdown();
        executor.awaitTermination(4000, TimeUnit.HOURS);

        pool.close();
    }
}
于 2012-04-29T14:42:57.650 回答
1

使用连接池不是更容易吗?然后,每个任务将在启动时简单地从池中检索一个连接,并在完成时返回它。这样,您就不必创建比线程更多的连接,并且您可以在所有任务完成后轻松销毁连接池(及其所有连接)。

于 2012-04-29T14:43:43.803 回答