0

我正在使用带有 Java 7 的 Spring 4.3.8.RELEASE。我想创建一个线程工厂来帮助管理我的应用程序中的某些工作人员。我这样声明我的线程工厂

<bean id="myprojectThreadFactory" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="prefix-"/>
</bean>
<bean id="myprojectTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="threadFactory" ref="myprojectThreadFactory"/>
    <property name="corePoolSize" value="${myproject.core.thread.pool.size}" />
    <property name="maxPoolSize" value="${myproject.max.thread.pool.size}" />
</bean>

但是,我在线程上“加入”时遇到了麻烦。也就是说,我想在继续执行某个任务之前等待所有工作完成,所以我有

    m_importEventsWorker.work();
    m_threadExecutor.shutdown();
    System.out.println("done.");

我的线程池是这样执行的

public void work(final MyWorkUnit pmyprojectOrg)
{
    final List<MyWorkUnit> allOrgs = new ArrayList<MyWorkUnit>();
    if (pmyprojectOrg != null)
    {
        processData(pmyprojectOrg.getmyprojectOrgId());
    } else { 
        allOrgs.addAll(m_myprojectSvc.findAllWithNonEmptyTokens());
        // Cue up threads to execute
        for (final MyWorkUnit myprojectOrg : allOrgs)
        {
            m_threadExecutor.execute(new Thread(new Runnable(){
                @Override
                public void run()
                {
                    System.out.println("started.");
                    processData(myprojectOrg.getmyprojectOrgId());
                }
            }));
        }   // for

然而打印出来的是

done.
started.
started.

很明显,我没有在等。等待我的线程完成工作的正确方法是什么?

4

3 回答 3

1

您可以使用 ExecutorService 创建一个固定的线程池,并检查池大小是否为空:

ExecutorService executor = Executors.newFixedThreadPool(50);

如果您使用此执行程序运行任务并使用@Scheduled fixedRate 或 fixedDelay 定期检查线程池大小,您可以查看它们是否已完成。

ThreadPoolExecutor poolInfo = (ThreadPoolExecutor) executor;
Integer activeTaskCount = poolInfo.getActiveCount();

if(activeTaskCount = 0) {
    //If it is 0, it means threads are waiting for tasks, they have no assigned tasks.
    //Do whatever you want here!
}
于 2017-06-15T09:15:38.813 回答
0

ACountDownLatch用给定的计数初始化。此计数因调用该countDown()方法而递减。等待此计数达到零的线程可以调用其中一种await()方法。调用await()会阻塞线程,直到计数达到零。

您可以使用CountDownLatch主线程等待完成所有任务。您可以在主线程调用方法中将CountDownLatch大小声明为任务数以等待和每个任务完成调用CountDownLatch latch = new CountDownLatch(3);await()countDown()

public void work(final MyWorkUnit pmyprojectOrg)
{
    final List<MyWorkUnit> allOrgs = new ArrayList<MyWorkUnit>();
    if (pmyprojectOrg != null)
    {
        processData(pmyprojectOrg.getmyprojectOrgId());
    } else { 
        allOrgs.addAll(m_myprojectSvc.findAllWithNonEmptyTokens());

        CountDownLatch latch = new CountDownLatch(allOrgs.size());

        // Cue up threads to execute
        for (final MyWorkUnit myprojectOrg : allOrgs)
        {
            m_threadExecutor.execute(new Thread(new Runnable(){
                @Override
                public void run()
                {
                    System.out.println("started.");
                    processData(myprojectOrg.getmyprojectOrgId());
                   latch.countDown();
                }
            }));
        }  
       //After for loop
       latch.await();      

例子:

CountDownLatch latch = new CountDownLatch(3);

Waiter      waiter      = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);

new Thread(waiter)     .start();
new Thread(decrementer).start();

public class Waiter implements Runnable{

    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}

公共类 Decrementer 实现 Runnable {

CountDownLatch latch = null;

public Decrementer(CountDownLatch latch) {
    this.latch = latch;
}



 public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
于 2017-06-12T16:23:22.813 回答
0

由于我使用的是 Spring 的 ThreadPoolTask​​Executor,因此我发现下面的内容适合我的需求......

protected void waitForThreadPool(final ThreadPoolTaskExecutor threadPoolExecutor)
{
    threadPoolExecutor.setWaitForTasksToCompleteOnShutdown(true);
    threadPoolExecutor.shutdown();    
    try {
        threadPoolExecutor.getThreadPoolExecutor().awaitTermination(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
}   // waitForThreadPool
于 2017-06-14T16:05:33.847 回答