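I am working on a project in which I will have different Bundles. As an example, suppose I have 5 Bundles and each of those bundles has a method named process.

Currently, I call the process method of all 5 bundles sequentially, one by one, and then write to the database. That is not what I want.

Below is what I am looking for:

  1. I need to call the process method of all 5 Bundles in parallel using multithreaded code and then write to the database. I am not sure what the right way to do that is. Should I have five threads, one thread per bundle? But what happens in that case if I have 50 bundles: will I then have 50 threads?
  2. I also want a timeout feature. If any bundle takes longer than a threshold we set, it should time out and an error should be logged saying that this bundle took too long.

I hope the question is clear enough.

Below is the code I have so far, which calls the process method of each bundle sequentially, one by one.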

public void callBundles(final Map<String, Object> eventData) {

    final Map<String, String> outputs = (Map<String, String>)eventData.get(Constants.HOLDER);

    for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {

        // calling the process method of a bundle
        final Map<String, String> response = entry.getPlugin().process(outputs);

        // then write to the database.
        System.out.println(response);
    }
}

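I am not sure what the best and most efficient way to do this is. I don't want to run the bundles sequentially, because in the future I might have more than 5 of them.

Can anyone give me an example of how I can do this? I have tried, but somehow it does not work the way I want.

Any help will be appreciated. Thanks.

Update:

This is what I came up with: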

public void callBundles(final Map<String, Object> eventData) {

    // Fixed pool of five threads, shared by the plugin processors and the database writer
    final ExecutorService executor = Executors.newFixedThreadPool(5);

    final BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<Map<String, String>>();

    @SuppressWarnings("unchecked")
    final Map<String, String> outputs = (Map<String, String>)eventData.get(Constants.EVENT_HOLDER);

    for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        executor.submit(new Runnable () {
            public void run() {
                final Map<String, String> response = entry.getPlugin().process(outputs);
                // put the response map in the queue for the database to read
                queue.offer(response);
            }
        });
    }

    Future<?> future = executor.submit(new Runnable () {
        public void run() {
            Map<String, String> map;
            try {
                while(true) {
                    // blocks until a map is available in the queue, or until interrupted
                    map = queue.take();
                    // write map to database
                    System.out.println(map);
                }
            } catch (InterruptedException ex) {
                // IF we're catching InterruptedException then this means that future.cancel(true)
                // was called, which means that the plugin processors are finished;
                // process the rest of the queue and then exit
                while((map = queue.poll()) != null) {
                    // write map to database
                    System.out.println(map);
                }
            }
        }
    });

    // this interrupts the database thread, which sends it into its catch block
    // where it processes the rest of the queue and exits
    future.cancel(true); // interrupt database thread

    // wait for the threads to finish
    try {
        executor.awaitTermination(5, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        //log error here
    }
}

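But I have not been able to add any timeout feature to it yet. Also, if I run the above code as it is, it does not work either. Am I missing anything?

Can anybody help me with this?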

1 Answer

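This is a BASIC example, partially based on the solution presented in "ExecutorService that interrupts tasks after a timeout".

You will have to figure out the best way to implement this in your own code. Use it only as a guide!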

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorExample {

    // This is used to "expire" long running tasks
    protected static final ScheduledExecutorService EXPIRE_SERVICE = Executors.newScheduledThreadPool(1);
    // This is used to manage the bundles and process them as required
    protected static final ExecutorService BUNDLES_SERVICE = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {

        // A list of the future tasks created by the BUNDLES_SERVICE.
        // We need this so we can monitor the progress of the output
        List<Future<String>> futureTasks = new ArrayList<>(100);
        // This is a list of all the tasks that have either completed
        // or been cancelled...we want these so we can determine
        // the results...
        List<Future<String>> completedTasks = new ArrayList<>(100);

        // Add all the Bundles to the BUNDLES_SERVICE
        for (int index = 0; index < 100; index++) {

            Bundle bundle = new Bundle();
            // We need a reference to the future so we can cancel it if we
            // need to
            Future<String> futureBundle = BUNDLES_SERVICE.submit(bundle);
            // Set this bundles future, see Bundle for details
            bundle.setFuture(futureBundle);
            // Add it to our monitor queue...
            futureTasks.add(futureBundle);

        }

        // Basically we are going to move all completed/canceled bundles
        // from the "active" to the completed list and wait until there
        // are no more "active" tasks
        while (futureTasks.size() > 0) {

            try {
                // Little bit of a pressure release...
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                // Ignored in this example; the loop simply polls again
            }

            // Check all the bundles...
            for (Future<String> future : futureTasks) {
                // If it has completed or was cancelled, move it to the completed
                // list.  AFAIK, isDone will return true if isCancelled is true as well,
                // but this illustrates the point
                if (future.isCancelled() || future.isDone()) {
                    completedTasks.add(future);
                }
            }

            // Remove all the completed tasks from the future tasks lists
            futureTasks.removeAll(completedTasks);
            // Some idea of progress...
            System.out.println("Still have " + futureTasks.size() + " outstanding tasks...");

        }

        // Dump the results...
        int index = 0;
        for (Future<String> future : completedTasks) {

            index++;
            System.out.print("Task " + index);
            if (future.isCancelled()) {

                System.out.println(" was canceled");

            } else if (future.isDone()) {

                try {
                    System.out.println(" completed with " + future.get());
                } catch (Exception ex) {
                    System.out.println(" failed because of " + ex.getMessage());
                }

            }

        }

        System.exit(0);

    }

    public static class ExpireBundle implements Runnable {

        private final Future<?> futureBundle;

        public ExpireBundle(Future<?> futureBundle) {
            this.futureBundle = futureBundle;
        }

        @Override
        public void run() {
            futureBundle.cancel(true);
        }

    }

    public static class Bundle implements Callable<String> {

        private volatile Future<String> future;

        @Override
        public String call() throws Exception {

            // This is the tricky bit.  In order to cancel a task, we
            // need to wait until it runs, but we also need its future...
            // We could use another, single threaded queue to do the job
            // but that's getting messy again and it won't provide the information
            // we need back to the original calling thread that we are using
            // to schedule and monitor the threads...

            // We need to have a valid future before we can continue...
            while (future == null) {
                Thread.sleep(250);
            }

            // Schedule an expiry call for 5 seconds from NOW...this is important.
            // I originally thought about doing this when I schedule the original
            // bundle, but that ignored the fact that some tasks would not
            // have started yet...
            EXPIRE_SERVICE.schedule(new ExpireBundle(future), 5, TimeUnit.SECONDS);

            // Sleep for a random amount of time from 1-10 seconds
            Thread.sleep((long) (Math.random() * 9000) + 1000);

            return "Happy";

        }

        protected void setFuture(Future<String> future) {
            this.future = future;
        }
    }
}
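
As a possible alternative that avoids handing the Future to the task itself, the calling thread can wait on each Future with a bounded get and cancel whatever is late. The sketch below is only an illustration: the class name TimedGetExample, the method runAll, and the status strings are made up for this example. Note that each wait is measured from the moment get is called, not from when the task actually started, so a task queued behind others in a small pool gets less real running time before it is cancelled.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the original answer.
public class TimedGetExample {

    // Submits every task to a bounded pool, then waits at most timeoutMillis
    // on each Future in submission order. Returns one status per task name.
    public static Map<String, String> runAll(Map<String, Callable<String>> tasks,
                                             int poolSize, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Map<String, Future<String>> futures = new LinkedHashMap<>();
        for (Map.Entry<String, Callable<String>> e : tasks.entrySet()) {
            futures.put(e.getKey(), pool.submit(e.getValue()));
        }

        Map<String, String> results = new LinkedHashMap<>();
        try {
            for (Map.Entry<String, Future<String>> e : futures.entrySet()) {
                try {
                    results.put(e.getKey(),
                            e.getValue().get(timeoutMillis, TimeUnit.MILLISECONDS));
                } catch (TimeoutException ex) {
                    // Too slow: interrupt the worker and record the failure
                    e.getValue().cancel(true);
                    results.put(e.getKey(), "TIMED OUT");
                } catch (ExecutionException ex) {
                    // The task itself threw an exception
                    results.put(e.getKey(), "FAILED: " + ex.getCause());
                }
            }
        } finally {
            // Release the worker threads whether or not everything finished
            pool.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Callable<String>> tasks = new LinkedHashMap<>();
        tasks.put("fast", () -> "ok");
        tasks.put("slow", () -> { Thread.sleep(5000); return "late"; });
        System.out.println(runAll(tasks, 2, 500));
    }
}
```

The pool size is independent of the number of bundles, so 50 bundles do not require 50 threads; extra tasks simply wait in the executor's queue.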

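Also, I had thought about using ExecutorService#invokeAll to wait for the tasks to complete, but this precluded the ability to time out individual tasks. I don't like having to feed the Future into the Callable.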
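
For reference, invokeAll also has an overload that takes a timeout. It applies a single deadline to the whole batch rather than a per-task limit counted from when each task starts, and any task that has not finished by then is returned as a cancelled Future. Whether that is good enough depends on the use case. A minimal sketch, with the class name InvokeAllTimeoutExample, the method runBatch, and the status strings invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original answer.
public class InvokeAllTimeoutExample {

    // Runs the whole batch with one shared deadline and reports, in task
    // order, either the result, a failure, or a timeout marker.
    public static List<String> runBatch(List<Callable<String>> tasks,
                                        int poolSize, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Blocks until all tasks finish or the timeout expires;
            // unfinished tasks are cancelled before this call returns.
            List<Future<String>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);

            List<String> outcomes = new ArrayList<>();
            for (Future<String> future : futures) {
                if (future.isCancelled()) {
                    outcomes.add("TIMED OUT");
                } else {
                    try {
                        outcomes.add(future.get());
                    } catch (ExecutionException ex) {
                        outcomes.add("FAILED: " + ex.getCause());
                    }
                }
            }
            return outcomes;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "ok");
        tasks.add(() -> { Thread.sleep(5000); return "late"; });
        System.out.println(runBatch(tasks, 2, 500));
    }
}
```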

answered 2013-09-05T01:37:27.377