0

我正在做一个项目,我将在其中拥有不同的捆绑包。举个例子,假设我有 5 个包,每个包都有一个方法名称process

现在,我正在process method依次调用所有这 5 个捆绑包,一个接一个,然后写入数据库。但这就是我不想要的。

  1. 我需要process method使用多线程并行调用所有这 5 个捆绑包,然后写入数据库。
  2. 而且我还想为这些线程提供一些超时功能。我将为捆绑包的所有线程设置默认超时设置。如果任何捆绑包花费的时间比我所拥有的超时设置要长,那么我想使这些线程超时,然后返回说这个捆绑包超时,因为它花费了很多时间。

我希望问题足够清楚......

下面是我到目前为止的代码,它依次调用 process 方法。

public void processEvents(final Map<String, Object> eventData) {

    final Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);

    for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {

        final Map<String, String> response = entry.getPlugin().process(outputs);

        // write to the database.
        System.out.println(response);
    }
}

我不确定最好和最有效的方法是什么?因为,在未来,我可能会拥有超过 5 个捆绑包。

谁能给我一个如何实现这一目标的例子?任何帮助将不胜感激。谢谢。

4

2 回答 2

1

实现你想要的并不难,但你应该意识到,并发和超时都会增加复杂性,尤其是在错误处理方面。

例如,发生超时时正在运行的线程可能会在超时后继续运行。只有通过处理中断信号进行合作的表现良好的线程才能在处理过程中成功停止。

您还必须确保可以并行处理各个捆绑条目,即它们是线程安全的。如果他们在处理过程中修改了某些共享资源,那么您可能会因此得到奇怪的错误。

我还想知道您是否想将数据库写入每个线程。如果是这样,您将需要在写入数据库时​​处理中断;例如通过回滚事务。

无论如何,要获得线程池和所有线程的总超时,您可以使用ExecutorService(例如)固定池大小并使用该invokeAll方法调用所有线程。

下面的尝试很可能是有缺陷的,并且错误处理绝不是完整的,但它应该为您提供一个起点。

首先,为您的线程实现 Callable:

public class ProcessBundleHolderEntry implements Callable {
    private BundleRegistration.BundlesHolderEntry entry;
    private Map<String, String> outputs;

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
        this.entry = entry;
        this.outputs = outputs;
    }

    public Object call() throws Exception {
        final Map<String, String> response = entry.getPlugin().process(outputs);
        // write to the database.
        System.out.println(response);
        return response;
    }
}

现在,修改后的processEvents方法:

public void processEvents(final Map<String, Object> eventData) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();

    Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);
    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
        entries.add(processBundleHolderEntry);
    }

    try {
        List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
        for (int i = 0; i < futures.size(); i++) {
            // This works since the list of future objects are in the
            // same sequential order as the list of entries
            Future<Object> future = futures.get(i);
            ProcessBundleHolderEntry entry = entries.get(i);
            if (!future.isDone()) {
                // log error for this entry
            }
        }
    } catch (InterruptedException e) {
        // handle this exception!
    }
}
于 2013-08-31T04:45:41.107 回答
0

Steinar 给出的答复是正确的,但正如您所说,此解决方案不可扩展,“将来,我可能会拥有超过 5 个捆绑包。” 我敢肯定,如果某些任务正在完成,您可能会在运行时或之后添加捆绑包,并且可能还有一个限制,您最多可以并行执行“n”个捆绑包,在这种情况下,executorService.InvokeAll 将终止待处理如果达到指定的计时器,则尚未启动的任务。
我创建了一个可能对您有用的简单示例,此示例提供了您希望并行运行多少线程的灵活性,您可以在需要时添加任务或捆绑包。

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import testproject.Bundles;
import testproject.ExecuteTimedOperation;

public class ParallelExecutor
{
    public static int NUMBER_OF_PARALLEL_POLL = 4;

    public static void main(String[] args)
    {       
         ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PARALLEL_POLL );      
         // Create bundle of objects you want
         List<Bundles> lstBun = new ArrayList<Bundles>();
         for (Bundles bundles : lstBun) 
         {
            final ExecuteTimedOperation ope =new ExecuteTimedOperation(bundles, new HashMap<String, Object>());
            executorService.submit(new Runnable() 
            {
                public void run() 
                {
                    ope.ExecuteTask();
                }
            });
        }   
    }   
}
package testproject;
import java.util.Map;
import java.util.Random;

public class ExecuteTimedOperation 
{
    Bundles _bun;
    Map<String, Object> _eventData;
    public static long TimeInMilleToWait = 60 * 1000; //Time which each thread should wait to complete task     

    public ExecuteTimedOperation(Bundles bun, Map<String, Object> eventData)
    {
        _bun = bun;
        _eventData = eventData;  
    }


    public void ExecuteTask()
    {
        try 
        {       
            Thread t = new Thread(new Runnable() 
            {
                public void run() 
                {   
                    _bun.processEvents(_eventData);                 
                }
            });

            t.start();
            t.join(TimeInMilleToWait);
        } 
        catch (InterruptedException e) {
                //log back saying this bundle got timeout bcoz it was taking lot of time.
        }
        catch (Exception e) {
            //All other type of exception will be handled here    
        }
    }
}

package testproject;

import java.util.Map;

public class Bundles 
{

    public void processEvents(final Map<String, Object> eventData) 
    {
        //THE code you want to execute

    }
}
于 2013-08-31T10:01:13.990 回答