7

我试图了解java.util.concurrent包中的实用程序,并了解到我们可以在方法内成功完成任务后将callable对象提交给ExecutorService返回Future,其中填充了由返回的值。callablecall()

我了解所有可调用对象都是使用多个线程同时执行的。

当我想看看批处理任务执行有多少改进ExecutorService时,我想到了捕捉时间。

以下是我尝试执行的代码 -

package concurrency;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class ExecutorExample {

    private static Callable<String> callable = new Callable<String>() {

        @Override
        public String call() throws Exception {
            StringBuilder builder = new StringBuilder();
            for(int i=0; i<5; i++) {
                builder.append(i);
            }
            return builder.toString();
        }
    };

    public static void main(String [] args) {
        long start = System.currentTimeMillis();
        ExecutorService service = Executors.newFixedThreadPool(5);
        List<Future<String>> futures = new ArrayList<Future<String>>();
        for(int i=0; i<5; i++) {
            Future<String> value = service.submit(callable);
            futures.add(value);
        }
        for(Future<String> f : futures) {
            try {
                System.out.println(f.isDone() + " " + f.get());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        long end  = System.currentTimeMillis();
        System.out.println("Executer callable time - " + (end - start));
        service.shutdown();

        start = System.currentTimeMillis();
        for(int i=0; i<5; i++) {
            StringBuilder builder = new StringBuilder();
            for(int j=0; j<5; j++) {
                builder.append(j);
            }
            System.out.println(builder.toString());
        }
        end = System.currentTimeMillis();
        System.out.println("Normal time - " + (end - start));
    }

}

这是这个的输出 -

true 01234
true 01234
true 01234
true 01234
true 01234
Executer callable time - 5
01234
01234
01234
01234
01234
Normal time - 0

如果我遗漏了什么或以错误的方式理解某些东西,请告诉我。

在此先感谢您抽出宝贵时间和对此线程的帮助。

4

3 回答 3

4

如果您在 Callable 中的任务很小,则由于任务切换和初始化开销,您将无法从并发中受益。尝试在可调用中添加更重的循环,比如 1000000 次迭代,你可以看到差异

于 2013-03-14T09:01:46.520 回答
2

当您第一次运行任何代码时,特别是需要时间。如果您将任务传递给另一个线程,则可能需要 1-10 微秒,如果您的任务花费的时间少于此时间,则开销可能大于收益。即,如果您的开销足够高,则使用多个线程可能比使用单个线程慢得多。

我建议你

  • 将任务的成本增加到 1000 次迭代。
  • 确保在单线程示例中不丢弃结果
  • 运行这两个测试至少几秒钟以确保代码已经预热。
于 2013-03-14T09:12:21.593 回答
1

不是答案(但我不确定代码是否适合评论)。为了扩展 Peter 所说的内容,通常有一个最佳工作点(以执行时间衡量),以平衡池/队列开销与工作人员之间的公平工作分配。该代码示例有助于找到该最佳位置的估计值。在您的目标硬件上运行。

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class FibonacciFork extends RecursiveTask<Long> {

private static final long serialVersionUID = 1L;

public FibonacciFork( long n) {
    super();
    this.n = n;
}

static ForkJoinPool fjp = new ForkJoinPool( Runtime.getRuntime().availableProcessors());

static long fibonacci0( long n) {
    if ( n < 2) {
        return n;
    }
    return fibonacci0( n - 1) + fibonacci0( n - 2);
}

static int  rekLimit = 8;

private static long stealCount;

long    n;

private long forkCount;

private static AtomicLong forks = new AtomicLong( 0);

public static void main( String[] args) {

    int n = 45;
    long    times[] = getSingleThreadNanos( n);
    System.out.println( "Single Thread Times complete");
    for ( int r = 2;  r <= n;  r++) {
        runWithRecursionLimit( r, n, times[ r]);
    }
}

private static long[] getSingleThreadNanos( int n) {
    final long times[] = new long[ n + 1];
    ExecutorService es = Executors.newFixedThreadPool( Math.max( 1, Runtime.getRuntime().availableProcessors() / 2));
    for ( int i = 2;  i <= n;  i++) {
        final int arg = i;
        Runnable runner = new Runnable() {
            @Override
            public void run() {
                long    start = System.nanoTime();
                final int minRuntime = 1000000000;
                long    runUntil = start + minRuntime;
                long    result = fibonacci0( arg);
                long    end = System.nanoTime();
                int         ntimes = Math.max( 1, ( int) ( minRuntime / ( end - start)));
                if ( ntimes > 1) {
                    start = System.nanoTime();
                    for ( int i = 0;  i < ntimes;  i++) {
                        result = fibonacci0( arg);
                    }
                    end = System.nanoTime();
                }
                times[ arg] = ( end - start) / ntimes;
            }
        };
        es.execute( runner);
    }
    es.shutdown();
    try {
        es.awaitTermination( 1, TimeUnit.HOURS);
    } catch ( InterruptedException e) {
        System.out.println( "Single Timeout");
    }
    return times;
}

private static void runWithRecursionLimit( int r, int arg, long singleThreadNanos) {
    rekLimit = r;
    long    start = System.currentTimeMillis();
    long    result = fibonacci( arg);
    long    end = System.currentTimeMillis();
    // Steals zählen
    long    currentSteals = fjp.getStealCount();
    long    newSteals = currentSteals - stealCount;
    stealCount = currentSteals;
    long    forksCount = forks.getAndSet( 0);
    System.out.println( "Fib(" + arg + ")=" + result + " in " + ( end-start) + "ms, recursion limit: " + r +
            " at " + ( singleThreadNanos / 1e6) + "ms, steals: " + newSteals + " forks " + forksCount);
}

static long fibonacci( final long arg) {
    FibonacciFork   task = new FibonacciFork( arg);
    long result = fjp.invoke( task);
    forks.set( task.forkCount);
    return result;
}

@Override
protected Long compute() {
    if ( n <= rekLimit) {
        return fibonacci0( n);
    }
    FibonacciFork   ff1 = new FibonacciFork( n-1);
    FibonacciFork   ff2 = new FibonacciFork( n-2);
    ff1.fork();
    long    r2 = ff2.compute();
    long    r1 = ff1.join();
    forkCount = ff2.forkCount + ff1.forkCount + 1;
    return r1 + r2;
}
}
于 2013-03-14T11:21:22.190 回答