0

我有以下问题,我不确定如何设计部分解决方案:

我有一个大文本文件,我逐行阅读。我需要处理每一行并更新一个 HashMap。

AFAIK 我需要一个生产者线程从文件中读取行,并将这些行分派到消费者线程池。消费者线程应该更新 ConcurrentHashMap 然后获取新行。

我的问题是:消费者线程如何访问 ConcurrentHashMap?如果我使用固定的线程池,生产者是否需要先将行添加到队列中,还是可以简单地提交或执行新的消费者?

编辑:Zim-Zam 是正确的;我希望消费者在完成后将他们的结果转储到 ConcurrentHashMap 中。

我在主线程中创建 ConcurrentHashMap,并将对它的引用传递给构造函数中的消费者。消费者应该在他们的运行方法中添加或增加一个 AtomicInteger。当所有行都被读取并且消费者完成时,我如何在主线程中判断?

再次感谢。

4

4 回答 4

1

You can either have all of the consumers share the same queue that the producer adds to, or else you can give each consumer its own queue that the producer accesses via a circular linked list or a similar data structure so that each consumer's queue receives more or less the same amount of data (e.g. if you have 3 consumers, then the producer would add data to queue1, then queue2, then queue3, then queue1, etc).

You can give each consumer a reference to the same ConcurrentHashMap (e.g. in the consumer's constructor), or else you can make the ConcurrentHashMap accessible via a static getter method.

于 2013-05-14T03:22:25.950 回答
1

我认为您实际上并不需要按照您建议的方式使用生产者消费者队列。

只需让主队列读取文件,并为您读取的每一行创建一个对应的 Runnable 对象(将其视为命令)并将其放入线程池执行器。Runnable 对象的内容只是处理该行并将结果放入 concurrentHashMap 的逻辑

可以使用有界或无界阻塞队列创建 ThreadPoolExecutor,具体取决于您想要的行为。

在伪代码中是这样的:

class LineHandler implements Runnable {
    String line;
    ConcurrentHashMap resultMap;
    public LineHandler(String line, ConcurrentHashMap resultMap) {
        this.line = line;
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        // work on line
        // update resultMap
    }
}

// logic in your file reader thread, supposed to be in a loop:

while (moreLinesInFile()) {
    String line = readFromFile();
    threadPoolExecutor.submit(new LineHandler(line, concurrentHashMap));
}

threadPoolExecutor.shutdown();
于 2013-05-14T06:03:19.830 回答
0

我建议您使用 aBlockingQueue来存储要处理的行。

主线程完成文件解析后,主线程将 apoison object作为最后一个对象放入队列中,并等待awaitTermination(...)消费者完成。

毒对象在消费者线程中以特殊方式处理。处理位置对象的消费者线程尝试到shutdown()ExecutorService而主线程正在等待。

至于消费者的结果,只需将它们添加到某个线程安全容器中即可。生产者/消费者问题由Queue: poll(...), put(...).

希望我能帮上忙

于 2013-05-14T09:40:47.163 回答
0

使用CountDownLatch

// in main thread
// assume consumers are in some kind of container
List<MyConsumer> consumers...
CountDownLatch latch = new CountDownLatch( consumers.size() );

for( MyConsumer c : consumers ) {
    c.setLatch( latch );
    c.start(); // starts asychronous, or submit to executor, whatever you're doing
}

// block main thread, optionally timing out
latch.await();


// Then in consumer when it's done it's work:
latch.countDown();
于 2013-05-14T05:21:24.667 回答