我正在从事一个内存和计算密集型项目。执行的很大一部分使用了多线程FixedThreadPool
。简而言之; 我有1 个BlockingQueue
线程用于从多个远程位置(使用 URL 连接)获取数据并使用要分析的对象填充 a以及选择这些对象并运行分析的n 个线程。编辑:见下面的代码
现在这个设置在我运行 OpenSUSE 11.3 的 Linux 机器上运行起来就像一个魅力,但一位同事正在运行 Win7 的非常相似的机器上测试它,正在收到队列轮询超时的自定义通知(见下面的代码),实际上有很多。我一直在尝试监视她机器上的处理器使用情况,并且似乎软件没有获得超过 15% 的 CPU,而在我的机器上,处理器使用率达到了顶峰,正如我所期望的那样。
那么,我的问题是,这可能是队列“饥饿”的迹象吗?会不会是生产者线程没有获得足够的 cpu 时间?如果是这样,我该如何给池中的一个特定线程更高的优先级?
更新: 我一直在试图查明问题,但并不高兴……但我确实获得了一些新的见解。
使用 JVisualVM 分析代码的执行表现出一种非常特殊的行为。这些方法在 CPU 时间的短时间内被调用,其间有几秒钟没有进展。对我来说,这意味着操作系统以某种方式在进程中踩刹车。
禁用防病毒和备份守护程序对此事没有任何重大影响
通过任务管理器(此处建议)更改 java.exe(唯一实例)的优先级也不会改变任何内容。(话虽这么说,我不能给java“实时”优先级,而不得不满足于“高”优先级)
分析网络使用情况显示了良好的数据流入和流出,所以我猜这不是瓶颈(虽然它是流程执行时间的相当大一部分,但我已经知道并且几乎与我在我的 Linux 机器上得到了什么)。
关于 Win7 操作系统如何限制我的项目的 cpu 时间的任何想法?如果不是操作系统,那么限制因素可能是什么?我想再次强调,这台机器没有同时运行任何其他密集型计算,除了我的软件之外,cpus 上几乎没有负载。这真让我抓狂...
编辑:相关代码
public ConcurrencyService(Dataset d, QueryService qserv, Set<MyObject> s){
timeout = 3;
this.qs = qserv;
this.bq = qs.getQueue();
this.ds = d;
this.analyzedObjects = s;
this.drc = DebugRoutineContainer.getInstance();
this.started = false;
int nbrOfProcs = Runtime.getRuntime().availableProcessors();
poolSize = nbrOfProcs;
pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);
drc.setScoreLogStream(new PrintStream(qs.getScoreLogFile()));
}
public void serve() throws InterruptedException {
try {
this.ds.initDataset();
this.started = true;
pool.execute(new QueryingAction(qs));
for(;;){
MyObject p = bq.poll(timeout, TimeUnit.MINUTES);
if(p != null){
if (p.getId().equals("0"))
break;
pool.submit(new AnalysisAction(ds, p, analyzedObjects, qs.getKnownAssocs()));
}else
drc.log("Timed out while waiting for an object...");
}
} catch (Exception ex) {
ex.printStackTrace();
String exit_msg = "Unexpected error in core analysis, terminating execution!";
}finally{
drc.log("--DEBUG: Termination criteria found, shutdown initiated..");
drc.getMemoryInfo(true); // dump meminfo to log
pool.shutdown();
int mins = 2;
int nCores = poolSize;
long totalTasks = pool.getTaskCount(),
compTasks = pool.getCompletedTaskCount(),
tasksRemaining = totalTasks - compTasks,
timeout = mins * tasksRemaining / nCores;
drc.log("--DEBUG: Shutdown commenced, thread pool will terminate once all objects are processed, " +
"or will timeout in : " + timeout + " minutes... \n" + compTasks + " of " + (totalTasks -1) +
" objects have been analyzed so far, " + "mean process time is: " +
drc.getMeanProcTimeAsString() + " milliseconds.");
pool.awaitTermination(timeout, TimeUnit.MINUTES);
}
}
该类QueryingAction
是一个简单的类Runnable
,它调用指定QueryService
对象中的数据采集方法,然后填充一个BlockingQueue
. 该类AnalysisAction
对MyObject
.