0

我在我的多线程程序中使用 ThreadPoolExecutor,我希望每个线程都应该有特定的 ID 范围,如果ThreadSize is set as 10然后Start = 1 and End = 1000每个线程都有 100 个 ID 的范围(基本上通过将结束范围除以线程大小),它可以在不踩到其他线程的情况下使用。

Thread1 will use 1 to 100 (id's)

Thread2 will use 101 to 200 (id's)

Thread3 will use 201 to 300 (id's)
-----
-----
Thread10 will use 901 to 1000

我基本知道逻辑,逻辑可以是这样的-

Each thread gets `N = (End - Start + 1) / ThreadSize` numbers.

Thread number `i` gets range `(Start + i*N) - (Start + i*N + N - 1)`.

因为我是第一次使用 ThreadPoolExecutor,所以我不确定我应该在代码中的什么地方使用这个逻辑,以便每个线程都使用预定义的 ID 而不会踩到其他线程。任何建议将不胜感激。

public class CommandExecutor {

    private List<Command> commands;
    ExecutorService executorService;
    private static int noOfThreads = 3;

    // Singleton
    private static CommandExecutor instance;
    public static synchronized CommandExecutor getInstance() {
        if (instance == null) {
            instance = new CommandExecutor();
        }
        return instance;
    }

    private CommandExecutor() {

        try {
            executorService = Executors.newFixedThreadPool(noOfThreads);
        } catch(Exception e) {
            System.out.println(e);
        }
    }

    // Get the next command to execute based on percentages
    private synchronized Command getNextCommandToExecute() {

    }

    // Runs the next command
    public synchronized void runNextCommand() {
        // If there are any free threads in the thread pool
        if (!(((ThreadPoolExecutor) executorService).getActiveCount() < noOfThreads))
            return;
        // Get command to execute
        Command nextCommand = getNextCommandToExecute();
        // Create a runnable wrapping that command
        Task nextCommandExecutorRunnable = new Task(nextCommand);
        executorService.submit(nextCommandExecutorRunnable); // Submit it for execution
    }

    // Implementation of runnable (the real unit level command executor)
    private static final class Task implements Runnable {
        private Command command;
        public Task(Command command) {
            this.command = command;
        }
        public void run() {
            // Run the command
            command.run();
        }
    }

    // A wrapper class that invoked at every certain frequency, asks CommandExecutor to execute next command (if any free threads are available)
    private static final class CoreTask implements Runnable {
        public void run() {
            CommandExecutor commandExecutor = CommandExecutor.getInstance();
            commandExecutor.runNextCommand();
        }
    }

    // Main Method
    public static void main(String args[]) {
        // Scheduling the execution of any command every 10 milli-seconds
        Runnable coreTask = new CoreTask();
        ScheduledFuture<?> scheduledFuture = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(coreTask, 0, 10, TimeUnit.MILLISECONDS);
    }
}
4

1 回答 1

0

这是否是一个好主意,我会让你决定。但是为了帮助你,我写了一个小程序来做你想做的事……在我的例子中,我只是总结了“ids”。

这是代码:

public class Driver {

private static final int N = 5;

public static void main(String args[]) throws InterruptedException, ExecutionException{
    int startId = 1;
    int endId = 1000;
    int range = (1 + endId - startId) / N;
    ExecutorService ex = Executors.newFixedThreadPool(N);
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>(N);

    // submit all the N threads
    for (int i = startId; i < endId; i += range) {
        futures.add(ex.submit(new SumCallable(i, range+i-1)));
    }

    // get all the results
    int result = 0;
    for (int i = 0; i < futures.size(); i++) {
        result += futures.get(i).get();

    }
    System.out.println("Result of summing over everything is : " + result);

}

private static class SumCallable implements Callable<Integer> {

    private int from, to, count;
    private static int countInstance = 1;

    public SumCallable(int from, int to) {
        this.from = from;
        this.to = to;
        this.count = countInstance;
        System.out.println("Thread " + countInstance++ + " will use " + from + " to " + to);
    }

    // example implementation: sums over all integers between from and to, inclusive.
    @Override
    public Integer call() throws Exception {
        int result = 0;
        for (int i = from; i <= to; i++) {
            result += i;
        }
        System.out.println("Thread " + count + " got result : " + result);
        return result;
    }

}

}

它产生以下输出(请注意,在真正的多线程方式中,您有随机顺序的打印语句,因为线程以系统决定的任何顺序执行):

线程 1 将使用 1 到 200

线程 2 将使用 201 到 400

线程 1 得到结果:20100

线程 3 将使用 401 到 600

线程 2 得到结果:60100

线程 4 将使用 601 到 800

线程 3 得到结果:100100

线程 5 将使用 801 到 1000

线程 4 得到结果:140100

线程 5 得到结果:180100

对所有内容求和的结果是:500500

于 2012-05-17T23:48:21.720 回答