我正在尝试编写一个多线程 Java 程序,并行地从 MongoDB 获取数据并存储。下面是 CallBack 的代码,它使用一个包含 70 个线程的线程池来创建工作线程;工作线程实现了 Callable,并在完成后回调 CallBack。
问题是:获取到的条目数多于最终返回到回调列表中的条目数,我不知道是怎么回事,有人可以帮忙吗?甚至 “FETCHED ....” 打印出的数字也比 “INDEXED ...” 打印出的数字要大。是不是线程之间互相干扰了?
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.solr.client.solrj.SolrServerException;
import org.xml.sax.SAXException;
import com.chegg.migrator.question.entity.TbsProblem;
public class CallBack {
List<TbsProblem> problemsToBeIndex = new ArrayList<TbsProblem>();
final int NO_OF_THREAD = 70;
public void returnResult(List<TbsProblem> result) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
problemsToBeIndex.addAll(result);
System.out.println(" Data Indexed "+problemsToBeIndex.size());
}
public List<TbsProblem> andAction() throws IOException, SAXException, ParserConfigurationException, SolrServerException {
ThreadPoolExecutor es = (ThreadPoolExecutor) Executors.newFixedThreadPool(NO_OF_THREAD);
int ctr=0;
while(ctr <= 100000) {
CallingBackWorker worker = new CallingBackWorker();
worker.setCallBack(this);
final Future future = es.submit( worker);
ctr +=100;
}
while(!es.isTerminated()) {}
es.shutdown();
System.out.println(" finished the retrival ");
System.out.println("try to do something while the work is being done....");
System.out.println(""End work" "+ new java.util.Date());
return problemsToBeIndex;
}
public static void main(String[] argv) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
new CallBack().andAction();
}
}
package com.chegg.migrator.question.parallel.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import com.chegg.migrator.question.entity.TbsProblem;
/**
 * A pool task that builds a batch of 50 {@code TbsProblem} objects, updates the
 * shared progress counters, and hands the batch to its {@link CallBack}.
 *
 * <p>The static counters are shared by every worker instance and are updated from
 * many pool threads at once. All reads and writes performed by this class go through
 * the static synchronized helpers below, because plain {@code ++} / {@code +=} on an
 * {@code int} is a non-atomic read-modify-write that loses updates under contention.
 */
public class CallingBackWorker implements Callable<Object>{
    CallBack callBack;
    static int calls = 0;
    static int fetched = 0;
    static int indexed = 0;
    /** Not populated by {@link #call()}, which works on its own local batch; retained as-is. */
    List<TbsProblem> problems = new ArrayList<TbsProblem>();

    public CallingBackWorker() {
        super();
    }

    /**
     * Builds the batch, bumps the shared counters, and reports the batch to the callback.
     *
     * @return always {@code null}
     * @throws Exception if the sleep is interrupted or the callback fails
     */
    @Override
    public Object call() throws Exception {
        System.out.println(" fetching the data ....." + nextCallNumber());
        // Local batch deliberately has a different name so it no longer shadows the field.
        List<TbsProblem> batch = new ArrayList<TbsProblem>();
        // One consistent snapshot of the counter for the whole batch.
        String batchId = "fetched" + currentFetched();
        for (int i = 0; i < 50; i++) {
            TbsProblem problem = new TbsProblem();
            problem.setId(batchId);
            batch.add(problem);
        }
        Thread.sleep(500);
        // Print the value produced by THIS update, not whatever another thread wrote since.
        System.out.println(" FETCHED ^^^^^^" + addFetched(batch.size()));
        List<String> lists = new ArrayList<String>();
        for (TbsProblem tbs : batch) {
            lists.add(tbs.getId());
        }
        Thread.sleep(500);
        int indexedTotal = addIndexed(lists.size());
        System.out.println(" committed, exiting.");
        System.out.println(" INDEXED $$$$" + indexedTotal);
        callBack.returnResult(batch);
        return null;
    }

    /** Returns the current call count and then increments it, atomically. */
    private static synchronized int nextCallNumber() {
        return calls++;
    }

    /** Reads the fetched total under the class lock. */
    private static synchronized int currentFetched() {
        return fetched;
    }

    /** Atomically adds {@code delta} to the fetched total and returns the new total. */
    private static synchronized int addFetched(int delta) {
        fetched += delta;
        return fetched;
    }

    /** Atomically adds {@code delta} to the indexed total and returns the new total. */
    private static synchronized int addIndexed(int delta) {
        indexed += delta;
        return indexed;
    }

    public CallBack getCallBack() {
        return callBack;
    }

    public void setCallBack(CallBack callBack) {
        this.callBack = callBack;
    }
}