0

简而言之:我想以并行方式处理带有循环引用的大图。而且我无法访问完整的图表,我必须爬过它。我想组织有效的队列来做到这一点。我很感兴趣是否有任何最佳做法可以做到这一点?

我正在尝试为这种策略组织无限的数据处理流程:每个线程从队列中获取节点,处理它,在处理之后 - 可能会出现一些新的处理节点 - 所以线程必须将它们放入队列。但我不必多次处理每个节点。节点是不可变的实体。

据我了解 - 我必须使用队列和集合的一些线程安全实现(对于已经访问过的实例)。

我试图避免使用同步方法。所以,我对这个流程的实现:

  1. 当线程将节点添加到队列中时,它会检查每个节点:如果visited-nodes-set 包含该节点,则线程不将其添加到队列中。但这还不是全部

  2. 当线程从队列中获取节点时 - 它检查visited-nodes-set 是否包含该节点。如果包含,则线程从队列中获取另一个节点,直到获取尚未处理的节点。找到未处理的节点后,线程还将其添加到访问节点集中。

我尝试使用 LinkedBlockingQueue 和 ConcurrentHashMap (作为一组)。我使用了 ConcurrentHashMap,因为它包含方法 putIfAbsent(key, value) - 据我了解,这有助于原子:检查 map 是否包含键,如果不包含 - 添加它。

这是所描述算法的实现:

public class ParallelDataQueue {

   private LinkedBlockingQueue<String> dataToProcess = new LinkedBlockingQueue<String>();
   // using map as a set
   private ConcurrentHashMap<String, Object> processedData = new ConcurrentHashMap<String, Object>( 1000000 );
   private final Object value = new Object();

   public String getNextDataInstance() {
    while ( true ) {
        try {
            String data = this.dataToProcess.take();
            Boolean dataIsAlreadyProcessed = ( this.processedData.putIfAbsent( data, this.value ) != null );
            if ( dataIsAlreadyProcessed ) {
                continue;
            } else {
                return data;
            }
        } catch ( InterruptedException e ) {
            e.printStackTrace();
        }
      }
    }

    public void addData( Collection<String> data ) {
    for ( String d : data ) {
        if ( !this.processedData.containsKey( d ) ) {
            try {
                this.dataToProcess.put( d );
            } catch ( InterruptedException e ) {
                e.printStackTrace();
            }
        }
       }
     }

}

所以我的问题 - 当前的实现是否避免处理可重复的节点。而且,也许有更优雅的解决方案?

谢谢

附言

我理解,这样的实现并不能避免队列中节点的出现重复。但对我来说这并不重要——我需要的只是避免多次处理每个节点。

4

3 回答 3

0

如果您需要以多线程方式处理数据,您可能根本不需要集合。你没有考虑过使用 Executors 框架吗?:

public static void main(String[] args) throws InterruptedException {
    ExecutorService exec = Executors.newFixedThreadPool(100);
    while (true) { // provide data ininitely
        for (int i = 0; i < 1000; i++)
            exec.execute(new DataProcessor(UUID.randomUUID(), exec));
        Thread.sleep(10000); // wait a bit, then continue;
    }
}

static class DataProcessor implements Runnable {
    Object data;
    ExecutorService exec;
    public DataProcessor(Object data, ExecutorService exec) {
        this.data = data;
        this.exec = exec;
    }
    @Override
    public void run() {
        System.out.println(data); // process data
        if (new Random().nextInt(100) < 50) // add new data piece for execution if needed
            exec.execute(new DataProcessor(UUID.randomUUID(), exec));
    }

}
于 2012-06-30T10:46:04.543 回答
0

是的。使用ConcurrentLinkedQueuehttp://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

当线程将数据添加到队列中时,它会检查每个数据实例:如果 set 包含该数据的实例,则线程不将其添加到队列中。但这还不是全部

不是线程安全的方法,除非底层 Collection 是线程安全的。(这意味着它在内部同步)但是进行检查是没有意义的,因为它已经是线程安全的......

于 2012-06-30T10:05:06.740 回答
0

您当前的实现不会避免重复的数据实例。假设“线程A”检查并发映射中是否存在数据,发现不存在,则报告数据不存在。但就在 putIfAbsent 行之后执行 if 之前,“线程 A”被挂起。此时另一个威胁“线程 B”,计划由 cpu 执行并检查是否存在相同的数据元素,发现它不存在并报告为不存在,并将其添加到队列中。当线程 A 被重新调度时,它将从 if 行继续并将其再次添加到队列中。

于 2012-06-30T10:02:14.987 回答