8

我想知道这是否是最好的方法。我有大约 500 个无限期运行的线程,但是 Thread.sleep 在完成一个处理周期后会持续一分钟。

   ExecutorService es = Executors.newFixedThreadPool(list.size()+1);
   for (int i = 0; i < list.size(); i++) {
      es.execute(coreAppVector.elementAt(i)); //coreAppVector is a vector of extends thread objects
   }

正在执行的代码非常简单,基本上就是这样

class aThread extends Thread {
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         //Lots of computation every minute
      }
   }
}

对于每个正在运行的任务,我确实需要一个单独的线程,因此更改架构不是一种选择。我尝试使我的 threadPool 大小等于 Runtime.getRuntime().availableProcessors() ,它试图运行所有 500 个线程,但只让它们中的 8 个(4xhyperthreading)执行。其他线程不会投降,让其他线程轮到他们。我尝试输入一个wait()和notify(),但仍然没有运气。如果有人有一个简单的例子或一些提示,我将不胜感激!

好吧,这个设计可以说是有缺陷的。线程实现遗传编程或 GP,一种学习算法。每个线程分析高级趋势做出预测。如果线程完成,则学习将丢失。也就是说,我希望 sleep() 可以让我在一个线程不“学习”的情况下共享一些资源

所以实际的要求是

如何安排保持状态并每 2 分钟运行一次的任务,但控制一次执行的数量。

4

11 回答 11

13

如果您的线程没有终止,这是线程内代码的错误,而不是线程池的错误。如需更详细的帮助,您需要发布正在执行的代码。

另外,为什么在完成后让每个线程进入睡眠状态;让它完成不是更好吗?

此外,我认为您通过使线程数等于您希望执行的任务数来滥用线程池。线程池的重点是对使用的资源数量进行限制;这种方法并不比完全不使用线程池好。

最后,您不需要将 的实例传递Thread给您的ExecutorService,只需将Runnable. ExecutorService维护自己的无限循环线程池,将工作从内部队列中拉出(工作是Runnable您提交的 s)。

于 2010-05-19T18:28:35.393 回答
10

为什么不使用 aScheduledExecutorService安排每个任务每分钟运行一次,而不是让所有这些线程空闲整整一分钟?

ScheduledExecutorService workers = 
  Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
for (Runnable task : list) { 
  workers.scheduleWithFixedDelay(task, 0, 1, TimeUnit.MINUTES);
}

你是什​​么意思,“改变架构不是一种选择”?如果你的意思是你根本不能修改你的任务(具体来说,任务必须循环,而不是运行一次,并且调用Thread.sleep()不能被删除),那么“良好的性能不是一个选项”,要么.

于 2010-05-19T18:59:46.610 回答
3

我不确定您的代码在如何使用线程池方面在语义上是正确的。ExecutionService 在内部创建和管理线程,客户端应该只提供一个 Runnable 实例,其 run() 方法将在池线程之一的上下文中执行。您可以查看我的示例。另请注意,每个正在运行的线程为堆栈占用大约 10Mb 的系统内存,并且在 linux 上,java 到本机线程的映射是 1 对 1。

于 2010-05-19T18:37:53.277 回答
2

您应该让它返回并使用ThreadPoolexecutor执行每分钟发布到您的工作队列的工作,而不是让它进入睡眠状态。

于 2010-05-19T18:39:30.120 回答
2

回答你的问题,什么类型的线程池?

我发表了我的评论,但这确实应该解决您的问题。您有一个可能需要 2 秒才能完成的计算。您有许多任务 (500) 想要尽快完成。假设没有 IO 和/或网络流量,您可以实现的最快吞吐量是Runtime.getRuntime().availableProcessors()线程数。

如果您将线程数增加到 500 个,那么每个任务都将在自己的线程上执行,但操作系统会每隔一段时间安排一个线程以提供给另一个线程。那是在任何给定点的 125 个上下文切换。每个上下文切换都会增加每个任务运行的时间。

这里的大局是,当您超过处理器数量时,添加更多线程并不等于更大的吞吐量。

编辑:快速更新。你不需要在这里睡觉。当您使用 8 个处理器执行 500 个任务时,每个任务将在 2 秒内完成,然后它正在运行的线程将执行下一个任务并完成该任务。

于 2010-05-19T18:52:30.200 回答
1

8 线程是您的系统可以处理的最大值,而且您正在通过上下文切换减慢自己的速度。

看看这篇文章http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4它会给你一个如何做的概述。

于 2010-05-19T18:50:37.543 回答
1

这应该做你想要的,但不是你要求的:-)你必须拿出Thread.sleep()

ScheduledRunnable.java

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledRunnable
{
    public static void main(final String[] args)
    {
        final int numTasks = 10;
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < numTasks; i++)
        {
            ses.scheduleAtFixedRate(new MyRunnable(i), 0, 10, TimeUnit.SECONDS);
        }
    }

    private static class MyRunnable implements Runnable
    {
        private int id;
        private int numRuns;

        private MyRunnable(final int id)
        {
            this.id = id;
            this.numRuns = 0;
        }

        @Override
        public void run()
        {
            this.numRuns += 1;
            System.out.format("%d - %d\n", this.id, this.numRuns);
        }
    }
}

这会安排Runnables每 10 秒显示该行为。如果您确实需要在处理完成等待一段固定的时间,您可能需要使用.scheduleXXX您需要的方法。我认为无论执行时间是多少,fixedWait 都会每隔 N 次运行一次。

于 2010-05-19T20:34:27.703 回答
0

对于每个正在运行的任务,我确实需要一个单独的线程,因此更改架构不是一种选择。

如果是这样(例如,调用外部阻塞函数),则为它们创建单独的线程并启动它们。您不能创建具有有限数量线程的线程池,因为其中一个线程中的阻塞函数将阻止将任何其他可运行对象放入其中,并且创建每个任务一个线程的线程池并不会获得太多收益。

我尝试使我的 threadPool 大小等于 Runtime.getRuntime().availableProcessors() ,它试图运行所有 500 个线程,但只让它们中的 8 个(4xhyperthreading)执行。

当您将创建的 Thread 对象传递给线程池时,它只会看到它们实现了Runnable. 因此,它将运行每个Runnable完成。任何停止方法返回的循环run()都不允许下一个入队任务运行;例如:

public static void main (String...args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);

    for (int i = 0; i < 10; ++i) {
        final int task = i;

        executor.execute(new Runnable () {
        private long lastRunTime = 0;
            @Override
            public void run () {

                for (int iteration = 0; iteration < 4; )
                {
                    if (System.currentTimeMillis() - this.lastRunTime > TIME_OUT)
                    {
                        // do your work here
                        ++iteration;
                        System.out.printf("Task {%d} iteration {%d} thread {%s}.\n", task, iteration, Thread.currentThread());

                        this.lastRunTime = System.currentTimeMillis();
                    }
                    else
                    {
                        Thread.yield(); // otherwise, let other threads run
                    }
                }
            }
        });
    }

    executor.shutdown();
}

打印出来:

Task {0} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
Task {2} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {4} thread {Thread[pool-1-thread-2,5,main]}.
Task {3} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
...

显示第一个(线程池大小)任务在安排下一个任务之前运行完成。

您需要做的是创建运行一段时间的任务,然后让其他任务运行。您如何构建这些取决于您想要实现的目标

  • 是否希望所有任务同时运行,都等待一分钟,然后再次同时运行,或者任务是否彼此不同步
  • 您是否真的希望每个任务以一分钟的间隔运行
  • 您的任务是否可能阻塞,因此确实需要单独的线程
  • 如果任务阻塞的时间长于预期的运行窗口,预期会有什么行为
  • 如果任务阻塞时间超过重复率(阻塞超过一分钟),预期会有什么行为

根据对这些问题的回答,可以使用 ScheduledExecutorService、信号量或互斥锁的某种组合来协调任务。最简单的情况是非阻塞、非同步任务,在这种情况下,直接使用 ScheduledExecutorService 每分钟运行一次您的可运行文件。

于 2010-05-19T19:41:42.127 回答
0

你可以重写你的项目以使用一些基于代理的并发框架,比如Akka吗?

于 2010-05-19T20:02:13.183 回答
-1

你需要一个信号量。

class AThread extends Thread {
   Semaphore sem;
   AThread(Semaphore sem) {
     this.sem = sem;
   }
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         sem.acquire();
         try {
           //Lots of computation every minute
         } finally {
           sem.release();
         }
      }
   }
}

实例化 AThreads 时,您需要传递相同的信号量实例:

Semaphore sem = new Semaphore(MAX_AVAILABLE, true);

编辑:谁投反对票可以请解释为什么?我的解决方案有问题吗?

于 2010-05-19T18:45:19.397 回答
-1

您当然可以通过将线程数减少到系统可以实际处理的数量来提高吞吐量。你愿意改变线程的设计吗?它会减轻调度程序的负担,将睡眠中的线程放入队列中,而不是实际拥有数百个睡眠线程。

class RepeatingWorker implements Runnable {

private ExecutorService executor;
private Date lastRan;

//constructor takes your executor

@Override
public void run() {

  try {
    if (now > lastRan + ONE_MINUTE) {
      //do job
      lastRan = now;
    } else {
      return;
  } finally {
    executor.submit(this);
  }
}
}

这保留了“作业无限期重复,但在执行之间至少等待一分钟”的核心语义,但现在您可以将线程池​​调整为机器可以处理的内容,而那些不工作的则在队列中,而不是闲逛在调度程序中作为睡眠线程。如果没有人真正做任何事情,则会出现一些等待忙碌的行为,但我从您的帖子中假设应用程序的全部目的是运行这些线程,并且它目前正在运行您的处理器。如果必须为其他事情腾出空间,您可能需要调整它:)

于 2010-05-19T19:16:32.133 回答