0

我正在尝试编写多线程java程序来并行获取mongo数据并存储它。下面是 CallBack 的代码,它使用 70 个线程线程池创建工作线程。我正在使用 Callable 来回调 CallBack。

问题是获取的项目多于返回到回调列表。不知道怎么回事。任何人都可以帮忙吗?即使是“FETCHED....”打印的数字也比“INDEXED ...”要大。线程是否相互交叉?

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

import javax.xml.parsers.ParserConfigurationException;

import org.apache.solr.client.solrj.SolrServerException;
import org.xml.sax.SAXException;

import com.chegg.migrator.question.entity.TbsProblem;

public class CallBack {
    List<TbsProblem> problemsToBeIndex = new ArrayList<TbsProblem>();
    final int NO_OF_THREAD = 70;

    public void returnResult(List<TbsProblem> result) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        problemsToBeIndex.addAll(result);
        System.out.println(" Data Indexed "+problemsToBeIndex.size());
    }
    public  List<TbsProblem> andAction() throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        ThreadPoolExecutor es = (ThreadPoolExecutor) Executors.newFixedThreadPool(NO_OF_THREAD);
            int ctr=0;
            while(ctr <= 100000) {
                CallingBackWorker worker = new CallingBackWorker();
                worker.setCallBack(this);
                final Future future = es.submit( worker);
                ctr +=100;
            }

            while(!es.isTerminated()) {}
            es.shutdown();
            System.out.println(" finished the retrival ");
        System.out.println("try to do something while the work is being done....");
        System.out.println("&quot;End work&quot; "+ new java.util.Date());
        return problemsToBeIndex;
    }

    public static void main(String[] argv) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        new CallBack().andAction();
    }
}

package com.chegg.migrator.question.parallel.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import com.chegg.migrator.question.entity.TbsProblem;

public class CallingBackWorker implements Callable<Object>{
    CallBack callBack;

    static int calls = 0;
    static int fetched =0;
    static int indexed =0;
    List<TbsProblem> problems = new ArrayList<TbsProblem>();

    public CallingBackWorker() {
        super();
    }

    @Override
    public Object call() throws Exception {
        System.out.println("  fetching the data ....."+calls++);
        List<TbsProblem> problems = new ArrayList<TbsProblem>();
        for(int i=0;i<50;i++) {
            TbsProblem problem = new TbsProblem();
            problem.setId("fetched"+fetched);
            problems.add(problem);
        }
        Thread.sleep(500);
        fetched +=problems.size();
        System.out.println(" FETCHED ^^^^^^"+fetched);

        List<String> lists = new ArrayList<String>();
        for(TbsProblem tbs : problems) {
            lists.add(tbs.getId());
        }
        Thread.sleep(500);
        indexed += lists.size();
        System.out.println("   committed, exiting.");
        System.out.println(" INDEXED $$$$"+indexed);
        callBack.returnResult(problems);
        return null;
    }

      public CallBack getCallBack() {
        return callBack;
    }

    public void setCallBack(CallBack callBack) {
        this.callBack = callBack;
    }
}
4

1 回答 1

1

fetched是在每个可调用对象之外声明的吗?你在几个线程中增加它?如果是这样,那就有问题了。递增整数不是线程安全的。如果是这种情况,请用 AtomicInteger 替换fetched或在同步块内增加它。

为什么在多个线程中增加整数是个问题?每个线程都会这样做:

STEP 1: read current value of fetched
STEP 2: calculate current value + problems.size()
STEP 3: assign new value to fetched

图像线程 (1) 完成步骤 1 和 2,计算fetched的新值为10。然后线程 (2) 到 (50) 完成步骤 1,2 和 3。fetched现在的值为 1000。最后线程 (1 ) 完成第 3 步,再次分配fetched值 10。

于 2013-07-04T18:22:12.700 回答