0

多年来,我正在开发我的第一个多线程应用程序。我遇到的问题是我需要同时执行两种方法。这是我的引擎类:

public class ThreadPoolEngine {

    // create BlockingQueue to put fund transfer objects
    private BlockingQueue<GlobalSearchRequest> searchQueue;

    public ThreadPoolExecutor executor;

    private HashMap<String, GlobalSearchProcessorCallable> callableMap;

    private ArrayList<Future<Integer>> futurList;

    Logger logger = Logger.getLogger(ThreadPoolEngine.class);

    private Integer gthreadCount;
    private Integer gjobPerThread;

    public ThreadPoolEngine(Integer threadCount, Integer jobPerThread) {
        gthreadCount = threadCount;
        gjobPerThread = jobPerThread;
        // create a thread pool with the entered no of threads
        executor = new HammerThreadPoolExecutor(threadCount, threadCount, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        searchQueue = new ArrayBlockingQueue<GlobalSearchRequest>(jobPerThread);

        callableMap = new HashMap<String, GlobalSearchProcessorCallable>();

        // create list to store reference to Future objects
        futurList = new ArrayList<Future<Integer>>();
    }

    public void createAndSubmitTasks() {
        // create Callables
        for (int i = 0; i < gthreadCount; i++) {

            GlobalSearchProcessorCallable callable1 = new GlobalSearchProcessorCallable(
                    "SearchProcessor_" + i, searchQueue);
            callableMap.put(callable1.getThreadName(), callable1);

            // submit callable tasks
            Future<Integer> future;
            future = executor.submit(callable1);
            futurList.add(future);
        }
    }

    public void populateSearchQueue() throws InterruptedException {
        // put orderVO objects in BlockingQueue
        KeywordFactory key = KeywordFactory.getInstance();

        for (int i = 0; i < gjobPerThread*gthreadCount; i++) {
            // this method will put SearchRequest object in the order queue
            try {
                searchQueue.put(new GlobalSearchRequest(key.getRandomPhrase(3)));
            } catch (KeywordNoDataFileException e) {
                e.printStackTrace();
            }
        }
    }

    public void printProcessorStatus() throws InterruptedException {
        // print processor status until all orders are processed
        while (!searchQueue.isEmpty()) {
            for (Map.Entry<String, GlobalSearchProcessorCallable> e : callableMap
                    .entrySet()) {
                logger.debug(e.getKey() + " processed order count: "
                        + e.getValue().getProcessedCount());
            }
            Thread.sleep(1000);
        }
    }

    public void shutDown(boolean forceShutdown) {
        if (!forceShutdown) {
            // shutdown() method will mark the thread pool shutdown to true
            executor.shutdown();
            logger.debug("Executor shutdown status " + executor.isShutdown());
            logger.debug("Executor terninated status "
                    + executor.isTerminated());

            // Mark threads to return threads gracefully.
            for (Map.Entry<String, GlobalSearchProcessorCallable> orderProcessor : callableMap
                    .entrySet()) {
                orderProcessor.getValue().setRunning(false);
            }
        } else {

            for (Future<Integer> f : futurList) {
                f.cancel(true);
            }

            // shutdown() method will mark the thread pool shutdown to true
            executor.shutdownNow();
        }
    }

    public void printWorkersResult() {
        for (Future<Integer> f : futurList) {
            try {
                Integer result = f.get(1000, TimeUnit.MILLISECONDS);
                logger.debug(f + " result. Processed orders " + result);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            } catch (ExecutionException e) {
                logger.error(e.getCause().getMessage(), e);
            } catch (TimeoutException e) {
                logger.error(e.getMessage(), e);
            } catch (CancellationException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}

好的,我有一个主类实例化这个类并调用这个类的两个方法,populateSearchQueue 和 createAndSubmitTasks 来运行我的工作类并处理搜索队列中的项目。

populateSearchQueue 方法的问题可能需要很长时间才能构建(我打算一次用十亿个查询来锤击系统)并且可能需要大量内存。java中有没有一种方法可以让我的主类同时调用populateSearchQueue和createAndSubmitTasks,这样工作线程就可以在队列仍然由populateSearchQueue方法构建的同时开始处理队列?

4

1 回答 1

1

我实际上解决了它。我再次阅读了我的代码,并意识到创建线程池需要很短的时间。因此,调用createAndSubmitTasks创建线程池并为每个线程池分配一个等待做某事的工作类。完成该方法后,我现在有 1000 个线程池坐在那里无所事事。然后在我调用的那一刻populateSearchQueue,那些在移动到下一个方法所需的几毫秒内处于空闲状态的工作线程现在开始从队列中抓取作业,我得到了我想要的结果。将东西放入队列的方法是在工作线程从该队列中抓取作业并运行它们的同时进行处理。

所以我颠倒了我调用方法的顺序。这是一件美丽的事情。

于 2012-12-14T21:28:00.277 回答