20

我想通过 10 个线程访问一个大文件(文件大小可能从 30 MB 到 1 GB 不等),然后处理文件中的每一行并通过 10 个线程将它们写入另一个文件。如果我只使用一个线程来访问 IO,其他线程就会被阻塞。该处理需要一些时间,几乎相当于从文件系统中读取一行代码。还有一个约束,输出文件中的数据应该与输入文件中的数据顺序相同。

我想知道你对这个系统的设计的想法。是否有任何现有的 API 来支持文件的并发访问?

写入同一个文件也可能导致死锁。

如果我担心时间限制,请建议如何实现这一点。

4

10 回答 10

15

我将从三个线程开始。

  1. 读取数据的读取器线程,将其分成“行”并将它们放入有界阻塞队列(Q1),
  2. 从 Q1 读取的处理线程,执行处理并将它们放入第二个有界阻塞队列 (Q2),以及
  3. 从 Q2 读取并写入磁盘的写入器线程。

当然,我也会确保输出文件与输入文件在物理上不同的磁盘上。

如果处理速度往往比 I/O(监控队列大小),那么您可以开始试验两个或多个并行“处理器”,它们在读取和写入数据的方式上是同步的。

于 2013-07-14T21:35:54.060 回答
13
  • 您应该从文件读取中抽象出来。创建一个读取文件并将内容分派到不同数量的线程的类。

该类不应该分派字符串,它应该将它们包装在一个Line包含元信息的类中,例如行号,因为您想保留原始序列。

  • 您需要一个处理类,它对收集的数据进行实际工作。在你的情况下,没有工作要做。该类仅存储信息,您可以在某天对其进行扩展以执行其他操作(例如反转字符串。附加一些其他字符串,...)

  • 然后你需要一个合并类,它在处理线程上执行某种多路合并排序,并按顺序收集对实例的所有引用Line

合并类也可以将数据写回文件,但要保持代码干净......

  • 我建议创建一个输出类,它再次从所有文件处理和东西中抽象出来。

当然,如果您的主内存不足,则此方法需要大量内存。您需要一种基于流的方法,该方法可以就地工作以保持较小的内存开销。


UPDATE基于流的方法

一切都保持不变,除了:

Reader线程将读取的数据泵入一个Balloon. 这个气球可以容纳一定数量的Line实例(数量越大,您消耗的主内存越多)。

处理线程Line从气球中获取 s,当气球变空时,阅读器将更多行泵入气球中。

合并类从上面的处理线程中获取行,然后编写器将数据写回文件。

也许你应该FileChannel在 I/O 线程中使用,因为它更适合读取大文件并且在处理文件时可能消耗更少的内存(但这只是一个估计的猜测)。

于 2013-07-19T13:41:02.890 回答
8

任何类型的 IO,无论是磁盘、网络等,通常都是瓶颈。

使用多个线程会加剧问题,因为很可能一次只有一个线程可以访问 IO 资源。

最好使用一个线程进行读取,将信息传递给工作线程池,然后直接从那里写入。但是,如果工人写到同一个地方,就会出现瓶颈,因为只有一个人可以拥有锁。通过将数据传递给单个编写器线程可以轻松修复。

简而言之”:

单个阅读器线程写入 BlockingQueue 等,这给了它一个自然的有序序列。

然后工作池线程在队列中等待数据,记录其序列号。

然后,工作线程将处理后的数据写入另一个 BlockingQueue,这次附加其原始序列号,以便

写入器线程可以获取数据并按顺序写入。

这可能会产生最快的实现。

于 2013-07-21T01:37:15.727 回答
3

一种可能的方法是创建一个线程来读取输入文件并将读取的行放入阻塞队列中。几个线程将等待来自这个队列的数据,处理数据。

另一种可能的解决方案可能是将文件分成块并将每个块分配给单独的线程。

为避免阻塞,您可以使用异步 IO。您还可以从Pattern-Oriented Software Architecture Volume 2中查看 Proactor 模式

于 2013-07-14T06:52:40.840 回答
3

您可以使用 java 中的 FileChannel 来执行此操作,它允许多个线程访问同一个文件。FileChannel 允许你从一个位置开始读写。请参阅下面的示例代码:

import java.io.*;
import java.nio.*;
import java.nio.channels.*;

public class OpenFile implements Runnable
{
    private FileChannel _channel;
    private FileChannel _writeChannel;
    private int _startLocation;
    private int _size;

    public OpenFile(int loc, int sz, FileChannel chnl, FileChannel write)
    {
        _startLocation = loc;
        _size = sz;
        _channel = chnl;
        _writeChannel = write;
    }

    public void run()
    {
        try
        {
            System.out.println("Reading the channel: " + _startLocation + ":" + _size);
            ByteBuffer buff = ByteBuffer.allocate(_size);
            if (_startLocation == 0)
                Thread.sleep(100);
            _channel.read(buff, _startLocation);
            ByteBuffer wbuff = ByteBuffer.wrap(buff.array());
            int written = _writeChannel.write(wbuff, _startLocation);
            System.out.println("Read the channel: " + buff + ":" + new String(buff.array()) + ":Written:" + written);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
        throws Exception
    {
        FileOutputStream ostr = new FileOutputStream("OutBigFile.dat");
        FileInputStream str = new FileInputStream("BigFile.dat");
        String b = "Is this written";
        //ostr.write(b.getBytes());
        FileChannel chnl = str.getChannel();
        FileChannel write = ostr.getChannel();
        ByteBuffer buff = ByteBuffer.wrap(b.getBytes());
        write.write(buff);
        Thread t1 = new Thread(new OpenFile(0, 10000, chnl, write));
        Thread t2 = new Thread(new OpenFile(10000, 10000, chnl, write));
        Thread t3 = new Thread(new OpenFile(20000, 10000, chnl, write));
        t1.start();
        t2.start();
        t3.start();
        t1.join();
        t2.join();
        t3.join();
        write.force(false);
        str.close();
        ostr.close();
    }
}

在这个示例中,有三个线程读取同一个文件并写入同一个文件并且不冲突。此示例中的此逻辑没有考虑到分配的大小不必以行结尾等结尾。您将根据您的数据找到正确的逻辑。

于 2013-07-23T08:55:22.413 回答
2

我之前也遇到过类似的情况,我的处理方式是这样的:

在主线程中逐行读取文件,并将该行的处理提交给一个executor。ExecutorService 的一个合理起点在这里。如果您计划使用固定数量的线程,您可能会对类中的Executors.newFixedThreadPool(10)工厂方法感兴趣Executors关于这个主题的 javadocs也不错。

Future基本上,我会提交所有作业,调用shutdown,然后在主线程中继续按照所有返回的顺序写入输出文件。您可以利用Futureget()方法的阻塞特性来确保顺序,但您真的不应该使用多线程来编写,就像您不会使用它来读取一样。说得通?

但是,1 GB数据文件呢?如果我是你,我首先会对有意义地分解这些文件感兴趣。

PS:我故意避免在答案中使用代码,因为我希望 OP 自己尝试一下。已经提供了足够多的指向特定类、API 方法和示例的指针。

于 2013-07-14T19:48:10.350 回答
2

请注意,理想的线程数受到硬件架构和其他因素的限制(您可以考虑咨询线程池以计算最佳线程数)。假设“10”是一个好数字,我们继续。=)

如果您正在寻找性能,您可以执行以下操作:

  • 使用您拥有的线程读取文件并根据您的业务规则处理每个线程。保留一个控制变量,指示要在输出文件中插入的下一个预期行。

  • 如果下一个预期的行已完成处理,则将其附加到缓冲区(队列)(如果您能找到一种直接插入输出文件的方法,那将是理想的,但您会遇到锁定问题)。否则,将此“未来”行存储在二叉搜索树中,按行位置对树进行排序。Binary-search-tree 为您提供了“O(log n)”的时间复杂度来进行搜索和插入,这对于您的上下文来说非常快。继续填充树,直到下一个“预期”行完成处理。

激活将负责打开输出文件的线程,定期使用缓冲区并将行写入文件。

此外,跟踪要插入文件的 BST 的“次要”预期节点。在开始搜索之前,您可以使用它来检查未来行是否在 BST 内。

  • 当下一个预期的行完成处理时,插入队列并验证下一个元素是否在二叉搜索树内。如果下一行在树中,则从树中删除节点并将节点的内容附加到队列中,如果下一行已经在树中,则重复搜索。
  • 重复此过程,直到所有文件处理完毕,树为空且队列为空。

这种方法使用 - O(n) 读取文件(但被并行化) - O(1) 将有序行插入队列 - O(Logn)*2 读取和写入二叉搜索树 - O( n) 写入新文件

加上您的业务规则和 I/O 操作的成本。

希望能帮助到你。

于 2013-07-23T03:27:27.077 回答
1

想到了Spring Batch 。

维护顺序需要一个后期处理步骤,即将读取的索引/键排序在处理上下文中。处理逻辑也应该将处理后的信息存储在上下文中。处理完成后,您可以对列表进行后期处理并写入文件.

不过要注意OOM问题。

于 2013-07-22T11:40:48.797 回答
0

由于需要维护顺序,所以问题本身就是说读写不能并行完成,因为它是顺序过程,你唯一可以并行做的就是处理记录,但这也不能只用一个作家来解决.

这是一个设计方案:

  1. 使用 One Thread t1 读取文件并将数据存储到 LinkedBlockingQueue Q1
  2. 使用另一个线程 t2 从 Q1 读取数据并放入另一个 LinkedBlockingQueue Q2
  3. 线程 t3 从 Q2 读取数据并写入文件。
  4. 为确保您不会遇到 OutofMemoryError,您应该使用适当的大小初始化队列
  5. 您可以使用 CyclicBarrier 来确保所有线程完成它们的操作
  6. 此外,您可以在 CyclicBarrier 中设置一个操作,您可以在其中执行后期处理任务。

祝你好运,希望你得到最好的设计。

干杯!!

于 2013-07-19T21:03:31.720 回答
0

我过去也遇到过类似的问题。我必须从单个文件中读取数据,对其进行处理并将结果写入其他文件。由于加工部分很重。所以我尝试使用多个线程。这是我为解决我的问题而遵循的设计:

  • 使用主程序作为主程序,一次读取整个文件(但不要开始处理)。为每一行创建一个数据对象及其顺序。
  • 在 main 中使用一个 priorityblockingqueue 说队列,将这些数据对象添加到其中。在每个线程的构造函数中共享此队列的引用。
  • 创建不同的处理单元,即将侦听此队列的线程。当我们将数据对象添加到这个队列中时,我们会调用 notifyall 方法。所有线程将单独处理。
  • 处理后,将所有结果放在单个映射中,并将结果与​​ key 作为其序号。
  • 当队列为空且所有线程都空闲时,表示处理完成。停止线程。遍历地图并将结果写入文件
于 2013-07-22T11:08:30.983 回答