1

我有每 1 分钟运行一次的任务。这个任务包含几个处理一些数据的方法。Process1 读取状态 1 中的数据(一个或多个),并在此进程结束时将状态更新为 2。 Process2 读取状态 2 中的数据,依此类推。所以现在我想改进这个处理使用taskExecutor( org.springframework.core.task.SimpleAsyncTaskExecutor) 并行运行它:

public void process1() {

    List<Object> objects = someDao.readDataWithStatus("1");
    if (objects == null || objects.isEmpty()) {
        return;
    }

    for (final Object object : objects) {
            if (BooleanUtils.isTrue(isParalelProcess())) {
                taskExecutor.execute(new Runnable() {

                    @Override
                    public void run() {
                        process(object);
                    }
                });
            } else {
                process(object);
            }           
    }
}

假设我们有一个状态为 1 的对象,我们想要并行处理。每一分钟运行一次的任务开始处理。Process1 读取状态为 1 的数据放入 taskExecutor 并进入下一个方法。此方法没有找到状态为 2 的对象,因此这里无事可做。一分钟后 process2 读取状态为 2 的对象,依此类推。处理对象需要 1 秒到几分钟。如您所见,过程显着放缓,仅需几秒钟。有没有选择如何改进这个处理?

4

1 回答 1

0

我花了很少的时间。它可能对你有帮助。

public class SampleTest
{

    public static void main(String[] args)
    {

        int noOfThreads = 100;
        int maxThreadinQue = 100;

        LinkedBlockingQueue<Runnable> processOneworkQueue = new LinkedBlockingQueue<Runnable>();

        ThreadPoolExecutor processOneThreadPoolExecutor = new ThreadPoolExecutor(noOfThreads, noOfThreads, 0L,
                TimeUnit.SECONDS,

                processOneworkQueue, new ThreadFactory()
                {
                    private AtomicInteger itsCounter = new AtomicInteger();

                    public Thread newThread(Runnable theRunnable)
                    {
                        Thread aThread = new Thread(theRunnable, "theThreadName" + "#" + itsCounter.getAndIncrement());
                        aThread.setDaemon(true);
                        return aThread;
                    }
                });

        LinkedBlockingQueue<Runnable> processTwoworkQueue = new LinkedBlockingQueue<Runnable>();

        ThreadPoolExecutor processTwoThreadPoolExecutor = new ThreadPoolExecutor(noOfThreads, noOfThreads, 0L,
                TimeUnit.SECONDS,

                processTwoworkQueue, new ThreadFactory()
                {
                    private AtomicInteger itsCounter = new AtomicInteger();

                    public Thread newThread(Runnable theRunnable)
                    {
                        Thread aThread = new Thread(theRunnable, "theThreadName" + "#" + itsCounter.getAndIncrement());
                        aThread.setDaemon(true);
                        return aThread;
                    }
                });

        ArrayList<SampleObject> sampleObjects = new ArrayList<SampleObject>();
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());
        sampleObjects.add(new SampleObject());

        while (true)
        {
            Iterator<SampleObject> it = sampleObjects.iterator();
            while (it.hasNext())
            {

                final SampleObject sampleObject = it.next();

                if (sampleObject.getStatus() == 0)
                {
                    if (processOneworkQueue.size() < maxThreadinQue)
                    {
                        processOneThreadPoolExecutor.submit(new Runnable()
                        {
                            @Override
                            public void run()
                            {
                                process1(sampleObject);

                            }
                        });
                    }
                }
                else if (sampleObject.getStatus() == 1)
                {
                    if (processTwoworkQueue.size() < maxThreadinQue)
                    {
                        processTwoThreadPoolExecutor.submit(new Runnable()
                        {
                            @Override
                            public void run()
                            {
                                process2(sampleObject);

                            }
                        });
                    }
                }
                else if (sampleObject.getStatus() == 2)
                {

                    it.remove();

                }
            }
        }
    }

    static void process1(SampleObject sampleObject)
    {
        System.out.println("process1");
        sampleObject.setStatus(1);

    }

    static void process2(SampleObject sampleObject)
    {
        System.out.println("process2");
        sampleObject.setStatus(2);
    }

}

class SampleObject
{
    int status=0;

    public int getStatus()
    {
        return status;
    }

    public void setStatus(int status)
    {
        this.status = status;
    }
}
于 2013-10-03T14:32:40.840 回答