1

首先,我想说的是,我正在从 python 升级到更复杂的代码。我现在正在使用 Java,而且我非常陌生。我知道 Java 非常擅长多线程,这很好,因为我正在使用它来处理 TB 级的数据。

数据输入只是输入到迭代器中,我有一个类,它封装了一个运行函数,该函数从迭代器中获取一行,进行一些分析,然后将分析写入文件。线程必须相互共享的唯一信息是它们正在写入的对象的名称。简单吧?我只希望每个线程同时执行 run 函数,这样我们就可以快速遍历输入数据。在python中这很简单。

from multiprocessing import Pool
f = open('someoutput.csv','w');
def run(x):
f.write(analyze(x))

p = Pool(8);
p.map(run,iterator_of_input_data);

因此,在 Java 中,我有 10K 行分析代码,并且可以非常轻松地遍历我的输入,将它传递给我的 run 函数,该函数反过来调用我的所有分析代码,将其发送到输出对象。

public class cool {
    ...
    public static void run(Input input,output) {
        Analysis an = new Analysis(input,output);    
    }
    public static void main(String args[]) throws Exception {
        Iterator iterator = new Parser(File(input_file)).iterator();
        File output = File(output_object);
        while(iterator.hasNext(){
            cool.run(iterator.next(),output);
        }
    }
}

我要做的就是让多个线程获取迭代器对象并执行 run 语句。一切都是独立的。我一直在研究 java 多线程的东西,但它用于通过网络交谈、共享数据等。这是否像我想的那样简单?如果有人能指出我正确的方向,我会很乐意做腿部工作。

谢谢

4

1 回答 1

2

ExecutorService (ThreadPoolExecutor) 将是 Java 等价物。

ExecutorService executorService =
    new ThreadPoolExecutor(
        maxThreads, // core thread pool size
        maxThreads, // maximum thread pool size
        1, // time to wait before resizing pool
        TimeUnit.MINUTES, 
        new ArrayBlockingQueue<Runnable>(maxThreads, true),
        new ThreadPoolExecutor.CallerRunsPolicy());

ConcurrentLinkedQueue<ResultObject> resultQueue;

while (iterator.hasNext()) {
    executorService.execute(new MyJob(iterator.next(), resultQueue))
}

将您的工作实现为 Runnable。

class MyJob implements Runnable {
    /* collect useful parameters in the constructor */
    public MyJob(...) {
        /* omitted */
    }

    public void run() {
        /* job here, submit result to resultQueue */
    }
}

resultQueue用于收集作业的结果。

有关详细信息,请参阅java api 文档

于 2012-12-07T08:07:28.957 回答