2

我是Java并发编程的新手。

我需要阅读、分析和处理一个增长极快的日志文件,所以我必须要快。我的想法是读取文件(逐行),并在匹配相关行后,我想将这些行传递给可以对该行进行进一步处理的单独线程。我在以下示例代码中将这些线程称为“IOThread”。

我的问题是 IOthread.run() 中的 BufferedReader readline 显然永远不会返回。在线程内读取 Stream 的工作方式是什么?有没有比下面的方法更好的方法?

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

class IOThread extends Thread {
    private InputStream is;
    private int t;

    public IOThread(InputStream is, int t)  {
        this.is = is;
        this.t = t;
        System.out.println("iothread<" + t + ">.init");
    }

    public void run() {
        try {
            System.out.println("iothread<" + t + ">.run");
            String line;

            BufferedReader streamReader = new BufferedReader(new InputStreamReader(is));
            while ((line = streamReader.readLine()) != null) {
                System.out.println("iothread<" + t + "> got line " + line);
            }
            System.out.println("iothread " + t + " end run");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class Stm {
    public Stm(String filePath) {
        System.out.println("start");

        try {
            BufferedReader reader = new BufferedReader(new FileReader(filePath));

            PipedOutputStream po1 = new PipedOutputStream();
            PipedOutputStream po2 = new PipedOutputStream();
            PipedInputStream pi1 = new PipedInputStream(po1);
            PipedInputStream pi2 = new PipedInputStream(po2);
            IOThread it1 = new IOThread(pi1,1);
            IOThread it2 = new IOThread(pi2,2);

            it1.start();
            it2.start();
//          it1.join();
//          it2.join();

            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println("got line " + line);

                if (line.contains("aaa")) {
                    System.out.println("passing to thread 1: " + line);  
                    po1.write(line.getBytes());
                } else if (line.contains("bbb")) {
                    System.out.println("passing to thread 2: " + line);  
                    po2.write(line.getBytes());
                }
            }
            reader.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Stm(args[0]);
    }

}

一个示例输入文件将是:

line 1
line 2
line 3 aaa ...
line 4
line 5 bbb ...
line 6 aaa ...
line 7
line 8 bbb ...
line 9 bbb ...
line 10

使用输入文件的文件名作为参数调用上述代码。

4

2 回答 2

4

由于以下原因,您的 iothread 中的阅读器一直停留在 while 循环的第一次迭代的头部:您从 STM 线程传递读取行的内容,但您没有附加换行符 (\n) . 由于您的缓冲阅读器等待换行符(如在 .readLine() 中),它会永远等待。您可以像这样修改您的代码:

   if (line.contains("aaa")) {
                System.out.println("passing to thread 1: " + line);  
                byte[] payload = (line+"\n").getBytes();
                po1.write(payload);
            } else if (line.contains("bbb")) {
                System.out.println("passing to thread 2: " + line);  
                byte[] payload = (line+"\n").getBytes();
                po2.write(payload);
            }

但是我不得不说这根本不是一个优雅的解决方案,您可以使用阻塞队列或类似的东西来为您的 IOThreads 提供内容。这样,您可以避免将输入转换为字符串到字节再转换回字符串(不是说摆脱所有的流)。

于 2012-10-10T11:48:10.273 回答
2

恕我直言,你把它弄反了。创建多个线程来“处理”东西,而不是从文件中读取数据。从文件中读取数据时,无论如何您都会遇到瓶颈,因此拥有多个线程不会有任何区别。最简单的解决方案是在给定线程中尽可能快地读取行并将这些行存储在共享队列中。然后可以由任意数量的线程访问此队列以进行相关处理。

这样,您实际上可以在 I/O 或读取器线程忙于读取/等待数据时进行并发处理。如果可能,请在阅读器线程中将“逻辑”保持在最低限度。只需阅读这些行,让工作线程完成真正繁重的工作(匹配模式、进一步处理等)。只需使用线程安全队列,您就应该是洁净的。

编辑:BlockingQueue使用基于数组或基于链表的一些变体。

于 2012-10-10T11:37:34.687 回答