This program (OptorSim) is throwing a `java.util.ConcurrentModificationException` at the line

```java
stats.put("jobTimesWithQueue", new LinkedHashMap(_jobTimesWithQueue));
```

It is an open-source program and I have not changed anything in it. Can someone explain to me what this exception means, what is causing it, and how to avoid it?
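To make sure I understand the failure mode, here is a minimal standalone sketch (my own code, not OptorSim's) of what I think is happening: one thread keeps putting entries into a `LinkedHashMap` while another thread copies it with the copy constructor, and the copy usually dies with this same exception.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal repro sketch (not OptorSim code): one thread mutates a LinkedHashMap
// while another copies it. The copy constructor iterates the source map, so a
// concurrent put usually triggers java.util.ConcurrentModificationException.
public class CmeRepro {
    public static void main(String[] args) throws InterruptedException {
        final Map<String, Long> jobTimes = new LinkedHashMap<String, Long>();

        Thread writer = new Thread(new Runnable() {
            public void run() {
                for (long i = 0; ; i++) {
                    jobTimes.put("job" + i, Long.valueOf(i)); // structural modifications
                }
            }
        });
        writer.setDaemon(true);
        writer.start();

        Thread.sleep(10); // give the writer a head start
        // Internally iterates jobTimes via putMapEntries; typically fails fast here.
        Map<String, Long> copy = new LinkedHashMap<String, Long>(jobTimes);
        System.out.println("copied " + copy.size() + " entries");
    }
}
```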
The stack trace is:

```
Exception in thread "..." java.util.ConcurrentModificationException
	at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
	at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:744)
	at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:742)
	at java.util.HashMap.putMapEntries(HashMap.java:511)
	at java.util.LinkedHashMap.<init>(LinkedHashMap.java:384)
	at org.edg.data.replication.optorsim.SimpleComputingElement.getStatistics(SimpleComputingElement.java:164)
	at org.edg.data.replication.optorsim.GridDataThread.run(GridDataThread.java:95)
```
The method where the exception is thrown looks like this:

```java
public Statistics getStatistics() {
Map stats = new HashMap();
// After remove see the result here.
OptorSimParameters params = OptorSimParameters.getInstance();
float _usage = _time.getTimeMillis() - _startRunning == 0 ? 0 : 100 *_workingTime/(_time.getTimeMillis() - _startRunning);
stats.put("usage", new Float(_usage));
stats.put("remoteReads", new Long(_remoteReads));
stats.put("localReads", new Long(_localReads));
if( params.outputStatistics() ==3) {
stats.put("jobTimes", new LinkedHashMap( _jobTimes));
stats.put("jobTimesWithQueue", new LinkedHashMap(_jobTimesWithQueue));
stats.put("jobFiles", new LinkedHashMap(_jobFiles));
stats.put("numberOfJobs", new Integer(_jobsCompleted));
stats.put("workerNodes", new Integer(_workerNodes));
stats.put("status", new Boolean(_active));
stats.put("queueLength", new Integer(_inputJobHandler.getQueueSize()));
stats.put("runnableStatus", new Boolean(_runnable));
}
stats.put("totalJobTime", new Float(_totalJobTime/(float)1000));
long meanJobTime = 0;
if (_jobsCompleted!=0)
meanJobTime = _workingTime/_jobsCompleted;
/////////////////////////////////////////
stats.put("meanJobTime", new Long(meanJobTime));
return new Statistics(this, stats);
}
```
**Edit:**
Here are all the details of the error:
```
Exception in thread "Thread-72" java.util.ConcurrentModificationException
	at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
	at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:744)
	at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:742)
	at java.util.HashMap.putMapEntries(HashMap.java:511)
	at java.util.LinkedHashMap.<init>(LinkedHashMap.java:384)
	at org.edg.data.replication.optorsim.SimpleComputingElement.getStatistics(SimpleComputingElement.java:174)
	at org.edg.data.replication.optorsim.SiteDataThread.run(SiteDataThread.java:112)
```
When I click through to the first LinkedHashMap frame (LinkedHashMap.java:711), it lands here:
```java
final LinkedHashMap.Entry<K,V> nextNode() {
    LinkedHashMap.Entry<K,V> e = next;
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
    if (e == null)
        throw new NoSuchElementException();
    current = e;
    next = e.after;
    return e;
}
```
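As I understand it, `modCount` counts structural modifications to the map and `expectedModCount` is the value captured when the iterator was created, so the iterator fails fast as soon as the map has been modified behind its back. A deterministic single-threaded sketch of that check (again my own code, not OptorSim's):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Single-threaded illustration of the fail-fast check: modifying the map
// while iterating it makes modCount != expectedModCount, so the iterator
// throws ConcurrentModificationException on its next call to next().
public class FailFastDemo {
    public static void main(String[] args) {
        Map<String, Integer> m = new LinkedHashMap<String, Integer>();
        m.put("a", 1);
        m.put("b", 2);
        for (String key : m.keySet()) { // iterator captures expectedModCount here
            m.put("c", 3);              // structural modification bumps modCount
        }                               // throws on the second call to next()
    }
}
```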
The second and third frames (LinkedHashMap.java:744 and 742) both land in the entry iterator:

```java
final class LinkedEntryIterator extends LinkedHashIterator
    implements Iterator<Map.Entry<K,V>> {
    public final Map.Entry<K,V> next() { return nextNode(); }
}
```
In HashMap, the error is at the first line of this loop in putMapEntries:
```java
for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
    K key = e.getKey();
    V value = e.getValue();
    putVal(hash(key), key, value, false, evict);
}
```
and in the LinkedHashMap copy constructor:
```java
public LinkedHashMap(Map<? extends K, ? extends V> m) {
    super();
    accessOrder = false;
    putMapEntries(m, false);
}
```
The error is at the line

```java
putMapEntries(m, false);
```

reached from

```
at org.edg.data.replication.optorsim.SimpleComputingElement.getStatistics(SimpleComputingElement.java:174)
```

So, as far as I can tell, the copy constructor walks the source map's entries with a fail-fast iterator, and something modifies `_jobTimesWithQueue` while that walk is in progress.
Here is the full code of SimpleComputingElement:
```java
package org.edg.data.replication.optorsim;

import java.util.*;

/**
 * The ComputingElement runs a thread which executes the GridJobs given to it
 * through its {@link JobHandler}. For each file required, the ComputingElement
 * calls getBestFile(), which returns the location of the best replica of the
 * file according to the chosen optimisation algorithm, which may or may not
 * have performed replication. The ComputingElement reads the file from this
 * location and processes it. The time to process a file is calculated as the
 * time specified in the parameters file divided by the number of worker nodes
 * in the ComputingElement.
 *
 * Each ComputingElement can currently run only one job at a time. If
 * statistics level 3 is selected in the parameters file, information about the
 * time taken by each job can be found in the statistics output at the end of
 * the simulation, or from the job time histograms if the GUI is used.
 *
 * Copyright (c) 2002 CERN, ITC-irst, PPARC, on behalf of the EU DataGrid.
 * For licence conditions see the licence file or http://www.edg.org/license.html
 *
 * @since JDK1.4
 */
public class SimpleComputingElement implements ComputingElement {
private static int _LastCEId = 0;
private GridSite _site;
private String _ceName;
private boolean _imAlive;
private boolean _paused = false;
private int _CEId;
private long _workingTime = 0;
private long _startRunning;
private long _totalJobTime = 0;
private Map _jobTimes = new LinkedHashMap();
private Map _jobTimesWithQueue = new LinkedHashMap();
private Map _jobFiles = new LinkedHashMap();
private int _jobsCompleted = 0;
protected JobHandler _inputJobHandler;
protected boolean _runnable = false;
protected boolean _active=false;
protected long _remoteReads = 0;
protected long _localReads = 0;
protected int _workerNodes = 0;
protected float _workerCapacity = 0;
protected GridTime _time;
public SimpleComputingElement( GridSite site, int workerNodes, float capacity) {
OptorSimParameters params = OptorSimParameters.getInstance();
_time = GridTimeFactory.getGridTime();
_site = site;
_workerNodes = workerNodes;
_workerCapacity = capacity;
_CEId = ++_LastCEId;
_ceName = "CE"+_CEId+"@"+_site;
_inputJobHandler = new JobHandler( params.getMaxQueueSize());
_imAlive = true;
_site.registerCE( this);
_startRunning = _time.getTimeMillis();
}
/**
* Return a more meaningful name.
* @return the CE's name
*/
public String toString() {
return _ceName;
}
/**
* Check whether this CE is active (processing jobs) or idle.
*/
public boolean active() {
return _active;
}
/**
* Check whether this CE is still running or has been shut down.
*/
public boolean imAlive() {
return _imAlive;
}
/**
* A method to return the input sandbox for this computing element.
*/
public JobHandler getJobHandler() {
return _inputJobHandler;
}
/**
* Method to get the site that this CE is on.
* @return The site this CE is on.
*/
public GridSite getSite() {
return _site;
}
/**
* Method to give the name of this CE.
* @return The name of this CE.
*/
public String getCeName() {
return _ceName;
}
public int getWorkerNodes() {
return _workerNodes;
}
/**
* Method to check against our ID
*/
public boolean iAm( int id) {
return _CEId == id;
}
/**
* Method to collate and return information relevant
* to this CE as a {@link Statistics} object.
* @return The statistics of this CE
*/
public Statistics getStatistics() {
Map stats = new HashMap();
OptorSimParameters params = OptorSimParameters.getInstance();
float _usage = _time.getTimeMillis() - _startRunning == 0 ? 0 : 100 *_workingTime/(_time.getTimeMillis() - _startRunning);
stats.put("usage", new Float(_usage));
stats.put("remoteReads", new Long(_remoteReads));
stats.put("localReads", new Long(_localReads));
if( params.outputStatistics() ==3) {
// LinkedHashSet<String> lhs = new LinkedHashSet<String>();
stats.put("jobTimes", new LinkedHashMap( _jobTimes));
stats.put("jobTimesWithQueue", new LinkedHashMap(_jobTimesWithQueue));
stats.put("jobFiles", new LinkedHashMap(_jobFiles));
stats.put("numberOfJobs", new Integer(_jobsCompleted));
stats.put("workerNodes", new Integer(_workerNodes));
stats.put("status", new Boolean(_active));
stats.put("queueLength", new Integer(_inputJobHandler.getQueueSize()));
stats.put("runnableStatus", new Boolean(_runnable));
}
stats.put("totalJobTime", new Float(_totalJobTime/(float)1000));
long meanJobTime = 0;
if (_jobsCompleted!=0)
meanJobTime = _workingTime/_jobsCompleted;
/////////////////////////////////////////
stats.put("meanJobTime", new Long(meanJobTime));
return new Statistics(this, stats);
}
/**
* When running, the ComputingElement processes all the jobs
* submitted to it through the JobHandler, sleeping while the
* JobHandler is empty. It is notified to shut down by the
* ResourceBroker.
*/
public void run() {
// Boost our priority
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
Double execTime;
OptorSimParameters params = OptorSimParameters.getInstance();
_runnable = true;
// to keep thread running
for( GridJob job=null; job != null || _imAlive; ) {
_active=false;
job=_inputJobHandler.get(); // This potentially blocks
// We might get a null job from JobHandler, if so, skip any further activity
if( job == null)
continue;
job.started();
OptorSimOut.println(_ceName+"> starting to process "+job+" (queue length now "+
_inputJobHandler.getQueueSize()+")");
_active=true;
// Install our optimiser
Optimisable replicaOptimiser = OptimiserFactory.getOptimisable( _site);
AccessPatternGenerator accessPatternGenerator
= AccessPatternGeneratorFactory.getAPGenerator(job);
String[] logicalfilenames = new String[1];
List filesAccessed = new LinkedList();
for( String lfn = accessPatternGenerator.getNextFile();
lfn != null;
lfn = accessPatternGenerator.getNextFile()) {
filesAccessed.add(lfn);
// Pack the logical file name into the expected structure:
logicalfilenames[0] = lfn;
float[] fileFractions = new float[1];
fileFractions[0] = (float)1.0;
// Use optimiser to locate best replica of this file
DataFile[] files = replicaOptimiser.getBestFile(logicalfilenames,
fileFractions);
if( files.length != 1) {
System.out.println( "ASSERT FAILED: CE, getBestFile return array with wrong number of entries: "+ files.length +" != 1");
continue; // skip to next file
}
if(files[0] == null) {
System.out.println( _ceName + "> ERROR getBestFile returned"+
" null for "+logicalfilenames[0]);
continue; // skip to next file
}
StorageElement fileSE = files[0].se();
GridSite fileSite = fileSE.getGridSite();
// Special case. If file is remote, then simulate the remoteIO, unPin and move on to next file.
if( _site != fileSite) {
simulateRemoteIO( files[0], fileFractions[0]);
// log this as an access on the close SE (if it exists!)
if(_site.hasSEs())
_site.getCloseSE().accessFile(files[0]);
if(_workerNodes != 0) {
execTime = new Double((job.getLatency() + job.getLinearFactor()*files[0].size())/(_workerNodes*_workerCapacity));
_time.gtSleep(execTime.longValue());
}
files[0].releasePin();
_remoteReads++;
continue;
}
else {
fileSE.accessFile(files[0]);
_localReads++;
}
// process the file
if(_workerNodes != 0) {
execTime = new Double((job.getLatency() + job.getLinearFactor()*files[0].size())/(_workerNodes*_workerCapacity));
// System.out.println(this.toString()+"> processing file...");
_time.gtSleep(execTime.longValue());
}
files[0].releasePin();
//A while loop the ce enters when paused by gui
while(_paused){
_time.gtWait(this);
}
} // for each datafile in job
// statistics logging
long duration = _time.getTimeMillis() - job.timeStarted();
long durationWithQueue = _time.getTimeMillis() - job.timeScheduled();
if( duration < 0) {
OptorSimOut.println("BUG> Duration < 0!!");
}
_totalJobTime += durationWithQueue;
_workingTime += duration;
_jobsCompleted++;
if( params.outputStatistics() == 3 || params.useGui()) {
_jobTimes.put(job.toString(), new Long(duration));
_jobTimesWithQueue.put(job.toString(), new Long(durationWithQueue));
_jobFiles.put( job.toString(), filesAccessed);
}
} // while there are jobs left to run
_runnable = false;
} // run
/**
* A routine used by the CE to simulate remote IO. The GridContainer's copy() method is
* used to block the equivalent amount of time.
*/
protected void simulateRemoteIO( DataFile remoteFile, float fraction)
{
GridContainer gc = GridContainer.getInstance();
gc.copy( remoteFile, _site, fraction);
}
/**
* GUI calls this method to pause the ComputingElement
* threads when pause button is pressed.
*/
public void pauseCE() {
_paused = true;
}
/**
* GUI calls this method to unpause the ComputingElement
* threads when continue button is pressed.
*/
public void unpauseCE() {
_paused = false;
_time.gtNotify(this);
}
/**
* The ResourceBroker calls this method when it has
* distributed all the jobs to shut down the ComputingElement
* threads.
*/
public void shutDownCE(){
_imAlive = false;
}
}
```
The other trace ends at `org.edg.data.replication.optorsim.SiteDataThread.run(SiteDataThread.java:112)`, where the error is at the line

```java
st = ce.getStatistics();
```

inside this block of SiteDataThread:

```java
{
//get the statistics object for this comp. element
ce = site.getCE();
st = ce.getStatistics();
//sample mean job time
Object r1 = st.getStatistic("meanJobTime");
String stat1 = r1.toString();
int stat1Int = Integer.parseInt(stat1);
seriesSMJTVTime.add(timeSecs, stat1Int);
//sample job times
Object r2 = st.getStatistic("jobTimes");
Map m = (Map)r2;
int pairs = m.size();
//if (number of previous key-value pairs != pairs)
// instantiate new histarray and fill with job time values
if (prevNoOfPairs!=pairs)
{
histarray = new double[pairs];
int i=0;
prevNoOfPairs++;
Set keySet = m.keySet();
Iterator iter = keySet.iterator();
while (iter.hasNext())
{
Object key = iter.next();
Object value = m.get(key);
String duration = value.toString();
float jobTime = Float.parseFloat(duration);
histarray[i] = jobTime;
i++;
}
}
//sample usage
Object r3 = st.getStatistic("usage");
String stat3 = r3.toString();
float coUsage = Float.parseFloat(stat3);
/* if (range values identical for last three readings)
* remove intermediate statistic
*/
if (coUsage==prevCoUsage&&coUsage==prevPrevCoUsage)
{
int itemCount = seriesSSEUVTime.getItemCount();
if (itemCount>2)
seriesSSEUVTime.remove(itemCount-1);
}
prevPrevCoUsage = prevCoUsage;
prevCoUsage = coUsage;
seriesSCEUVTime.add(timeSecs, coUsage);
}
}
```
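Putting it all together, my reading is that `getStatistics()` runs on the data threads (GridDataThread and SiteDataThread in the two traces) while the ComputingElement's own `run()` thread keeps doing `_jobTimes.put(...)`, `_jobTimesWithQueue.put(...)` and `_jobFiles.put(...)` after each completed job, with no synchronization between the two. If that is right, would wrapping the shared maps with `Collections.synchronizedMap` and holding the lock during the copy be a correct way to avoid the exception? A minimal sketch of that idea (my assumption, not a fix the OptorSim authors ship):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of one possible fix: wrap the shared map in Collections.synchronizedMap
// and hold the wrapper's lock while copying, so no put can land mid-iteration.
// In OptorSim this would mean wrapping _jobTimes, _jobTimesWithQueue and
// _jobFiles the same way and copying them inside synchronized blocks.
public class CmeFixSketch {
    public static void main(String[] args) throws InterruptedException {
        final Map<String, Long> jobTimes =
                Collections.synchronizedMap(new LinkedHashMap<String, Long>());

        Thread writer = new Thread(new Runnable() {
            public void run() {
                for (long i = 0; ; i++) {
                    jobTimes.put("job" + i, Long.valueOf(i)); // put takes the map's lock
                }
            }
        });
        writer.setDaemon(true);
        writer.start();

        Thread.sleep(10);
        Map<String, Long> copy;
        synchronized (jobTimes) { // blocks the writer for the duration of the copy
            copy = new LinkedHashMap<String, Long>(jobTimes);
        }
        System.out.println("safely copied " + copy.size() + " entries");
    }
}
```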