简而言之:我想以并行方式处理带有循环引用的大图。而且我无法访问完整的图表,我必须爬过它。我想组织有效的队列来做到这一点。我很感兴趣是否有任何最佳做法可以做到这一点?
我正在尝试为这种策略组织无限的数据处理流程:每个线程从队列中获取节点,处理它,在处理之后 - 可能会出现一些新的处理节点 - 所以线程必须将它们放入队列。但我不必多次处理每个节点。节点是不可变的实体。
据我了解 - 我必须使用队列和集合的一些线程安全实现(对于已经访问过的实例)。
我试图避免使用同步方法。所以,我对这个流程的实现:
当线程将节点添加到队列中时,它会检查每个节点:如果visited-nodes-set 包含该节点,则线程不将其添加到队列中。但这还不是全部
当线程从队列中获取节点时 - 它检查visited-nodes-set 是否包含该节点。如果包含,则线程从队列中获取另一个节点,直到获取尚未处理的节点。找到未处理的节点后,线程还将其添加到访问节点集中。
我尝试使用 LinkedBlockingQueue 和 ConcurrentHashMap (作为一组)。我使用了 ConcurrentHashMap,因为它包含方法 putIfAbsent(key, value) - 据我了解,这有助于原子:检查 map 是否包含键,如果不包含 - 添加它。
这是所描述算法的实现:
public class ParallelDataQueue {
private LinkedBlockingQueue<String> dataToProcess = new LinkedBlockingQueue<String>();
// using map as a set
private ConcurrentHashMap<String, Object> processedData = new ConcurrentHashMap<String, Object>( 1000000 );
private final Object value = new Object();
public String getNextDataInstance() {
while ( true ) {
try {
String data = this.dataToProcess.take();
Boolean dataIsAlreadyProcessed = ( this.processedData.putIfAbsent( data, this.value ) != null );
if ( dataIsAlreadyProcessed ) {
continue;
} else {
return data;
}
} catch ( InterruptedException e ) {
e.printStackTrace();
}
}
}
public void addData( Collection<String> data ) {
for ( String d : data ) {
if ( !this.processedData.containsKey( d ) ) {
try {
this.dataToProcess.put( d );
} catch ( InterruptedException e ) {
e.printStackTrace();
}
}
}
}
}
所以我的问题 - 当前的实现是否避免处理可重复的节点。而且,也许有更优雅的解决方案?
谢谢
附言
我理解,这样的实现并不能避免队列中节点的出现重复。但对我来说这并不重要——我需要的只是避免多次处理每个节点。