3

我正在实现一个应该接收大文本文件的类。我想将它分成块,每个块由一个不同的线程保存,该线程将计算这个块中每个字符的频率。我希望启动更多线程以获得更好的性能,但事实证明性能越来越差。这是我的代码:

public class Main {

    public static void main(String[] args) 
    throws IOException, InterruptedException, ExecutionException, ParseException  
    {

        // save the current run's start time
        long startTime = System.currentTimeMillis();

        // create options 
        Options options = new Options();
        options.addOption("t", true, "number of threads to be start");

        // variables to hold options 
        int numberOfThreads = 1;

        // parse options
        CommandLineParser parser = new DefaultParser();
        CommandLine cmd;
        cmd = parser.parse(options, args);
        String threadsNumber = cmd.getOptionValue("t");
        numberOfThreads = Integer.parseInt(threadsNumber);

        // read file
        RandomAccessFile raf = new RandomAccessFile(args[0], "r");
        MappedByteBuffer mbb 
            = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, raf.length());

        ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
        Set<Future<int[]>> set = new HashSet<Future<int[]>>();

        long chunkSize = raf.length() / numberOfThreads;
        byte[] buffer = new byte[(int) chunkSize];

        while(mbb.hasRemaining())
        {
            int remaining = buffer.length;
            if(mbb.remaining() < remaining)
            {
                remaining = mbb.remaining();
            }
            mbb.get(buffer, 0, remaining);
            String content = new String(buffer, "ISO-8859-1");
            @SuppressWarnings("unchecked")
            Callable<int[]> callable = new FrequenciesCounter(content);
            Future<int[]> future = pool.submit(callable);
            set.add(future);

        }

        raf.close();

        // let`s assume we will use extended ASCII characters only
        int alphabet = 256;

        // hold how many times each character is contained in the input file
        int[] frequencies = new int[alphabet];

        // sum the frequencies from each thread
        for(Future<int[]> future: set)
        {
            for(int i = 0; i < alphabet; i++)
            {
                frequencies[i] += future.get()[i];
            }
        }
    }

}

//help class for multithreaded frequencies` counting
class FrequenciesCounter implements Callable
{
    private int[] frequencies = new int[256];
    private char[] content;

    public FrequenciesCounter(String input)
    {
        content = input.toCharArray();
    }

    public int[] call()
    {
        System.out.println("Thread " + Thread.currentThread().getName() + "start");

        for(int i = 0; i < content.length; i++)
        {
            frequencies[(int)content[i]]++;
        }

        System.out.println("Thread " + Thread.currentThread().getName() + "finished");

        return frequencies;
    }
}
4

2 回答 2

3

正如评论中所建议的,从多个线程读取时,您(通常)不会获得更好的性能。相反,您应该处理在多个线程上读取的块。通常处理会进行一些阻塞、I/O 操作(保存到另一个文件?保存到数据库?HTTP 调用?),如果您在多个线程上处理,您的性能会变得更好。

对于处理,您可能有 ExecutorService(具有合理数量的线程)。用于java.util.concurrent.Executors获取实例 java.util.concurrent.ExecutorService

有了ExecutorService实例,您可以提交您的块进行处理。提交块不会阻塞。ExecutorService将开始在单独的线程处理每个块(细节取决于配置ExecutorService)。您可以提交Runnable或的实例Callable

最后,在您提交所有项目后,您应该在 ExecutorService 中调用awaitTermination。它将等到所有提交项目的处理完成。在 awaitTermination 返回后,您应该调用 shutdownNow() 来中止处理(否则它可能会无限期挂起,处理一些流氓任务)。

于 2017-06-24T20:39:05.657 回答
1

您的程序几乎肯定会受到从磁盘读取速度的限制。使用多个线程对此没有帮助,因为限制是对从磁盘传输信息的速度的硬件限制。

此外,同时使用 RandomAccessFile 和后续缓冲区可能会导致小幅减速,因为您是在读取数据之后但在处理之前移动内存中的数据,而不是仅仅在原地处理它。最好不要使用中间缓冲区。

通过从文件直接读取到最终缓冲区并在这些缓冲区被填充时分派这些缓冲区以由线程处理,而不是在处理之前等待整个文件被读取,您可能会稍微加快速度。但是,大部分时间仍将用于磁盘读取,因此任何加速都可能很小。

于 2017-06-24T20:47:19.817 回答