5

我有一个List<String>被调用lines的和一个巨大的(~3G)Set<String>被调用voc的。我需要从中找到所有linesvoc。我可以用这种多线程方式吗?

目前我有这个简单的代码:

for(String line: lines) {
  if (voc.contains(line)) {
    // Great!!
  }
}

有没有办法同时搜索几行?可能有现有的解决方案吗?

PS:我正在使用javolution.util.FastMap,因为它在填充过程中表现更好。

4

4 回答 4

2

这是一个可能的实现。请注意,错误/中断处理已被省略,但这可能会给您一个起点。我包含了一个 main 方法,因此您可以将其复制并粘贴到您的 IDE 中以进行快速演示。

编辑:清理了一些东西以提高可读性和列表分区

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelizeListSearch {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<String> searchList = new ArrayList<String>(7);
        searchList.add("hello");
        searchList.add("world");
        searchList.add("java");
        searchList.add("debian");
        searchList.add("linux");
        searchList.add("jsr-166");
        searchList.add("stack");

        Set<String> targetSet = new HashSet<String>(searchList);

        Set<String> matchSet = findMatches(searchList, targetSet);
        System.out.println("Found " + matchSet.size() + " matches");
        for(String match : matchSet){
            System.out.println("match:  " + match);
        }
    }

    public static Set<String> findMatches(List<String> searchList, Set<String> targetSet) throws InterruptedException, ExecutionException {
        Set<String> locatedMatchSet = new HashSet<String>();

        int threadCount = Runtime.getRuntime().availableProcessors();   

        List<List<String>> partitionList = getChunkList(searchList, threadCount);

        if(partitionList.size() == 1){
            //if we only have one "chunk" then don't bother with a thread-pool
            locatedMatchSet = new ListSearcher(searchList, targetSet).call();
        }else{  
            ExecutorService executor = Executors.newFixedThreadPool(threadCount);
            CompletionService<Set<String>> completionService = new ExecutorCompletionService<Set<String>>(executor);

            for(List<String> chunkList : partitionList)
                completionService.submit(new ListSearcher(chunkList, targetSet));

            for(int x = 0; x < partitionList.size(); x++){
                Set<String> threadMatchSet = completionService.take().get();
                locatedMatchSet.addAll(threadMatchSet);
            }

            executor.shutdown();
        }


        return locatedMatchSet;
    }

    private static class ListSearcher implements Callable<Set<String>> {

        private final List<String> searchList;
        private final Set<String> targetSet;
        private final Set<String> matchSet = new HashSet<String>();

        public ListSearcher(List<String> searchList, Set<String> targetSet) {
            this.searchList = searchList;
            this.targetSet = targetSet;
        }

        @Override
        public Set<String> call() {
            for(String searchValue : searchList){
                if(targetSet.contains(searchValue))
                    matchSet.add(searchValue);
            }

            return matchSet;
        }

    }

    private static <T> List<List<T>> getChunkList(List<T> unpartitionedList, int splitCount) {
        int totalProblemSize = unpartitionedList.size();
        int chunkSize = (int) Math.ceil((double) totalProblemSize / splitCount);

        List<List<T>> chunkList = new ArrayList<List<T>>(splitCount);

        int offset = 0;
        int limit = 0;
        for(int x = 0; x < splitCount; x++){
            limit = offset + chunkSize;
            if(limit > totalProblemSize)
                limit = totalProblemSize;
            List<T> subList = unpartitionedList.subList(offset, limit);
            chunkList.add(subList);
            offset = limit;
        }

        return chunkList;
    }

}
于 2013-01-27T23:20:07.443 回答
1

绝对有可能使用多个线程并行化它。您可以执行以下操作:

  1. 将列表分解为不同的“块”,每个执行搜索的线程一个。
  2. 让每个线程查看它的块,检查每个字符串是否在集合中,如果是,则将字符串添加到结果集中。

例如,您可能有以下线程例程:

public void scanAndAdd(List<String> allStrings, Set<String> toCheck,
                       ConcurrentSet<String> matches, int start, int end) {
    for (int i = start; i < end; i++) {
        if (toCheck.contains(allStrings.get(i))) {
            matches.add(allStrings.get(i));
        }
    }
}

然后,您可以根据需要生成尽可能多的线程来运行上述方法并等待所有线程完成。然后将生成的匹配存储在matches.

为简单起见,我将输出设置为 a ConcurrentSet,它会自动消除由于写入导致的竞争条件。allStrings由于您只对要检查的字符串列表和字符串集进行读取,因此在读取或执行查找时不需要同步toCheck

希望这可以帮助!

于 2013-01-27T21:10:52.137 回答
1

如果您正在寻找这个,只需在不同线程之间拆分(至少在 Oracle JVM 中)会将工作分散到所有 CPU 中。我喜欢使用 CyclicBarrier,可以更轻松地控制这些线程。

http://javarevisited.blogspot.cz/2012/07/cyclicbarrier-example-java-5-concurrency-tutorial.html

于 2013-01-27T21:11:10.807 回答
0

另一种选择是使用Akka,它非常简单地完成了这些事情。

实际上,在使用 Akka 进行了一些搜索工作之后,我也可以告诉您的其中一件事是它支持两种并行化此类事物的方式:通过可组合期货或代理。对于您想要的,可组合期货就足够了。然后,Akka 实际上并没有增加那么多:Netty 提供了大规模并行的 io 基础设施,Futures 是 jdk 的一部分,但是 Akka 确实使将这两者放在一起并在需要时/如果需要时扩展它们变得非常简单。

于 2013-01-27T23:24:23.227 回答