2

我试图强制 Apache Ignite 1.5.0.final 使用单个节点上可用的所有 CPU 能力来并行处理本地缓存数据,但是我可以清楚地看到它没有使用所有可用的内核。

缓存创建如下:

    CacheConfiguration<Integer, MyObject> cfg = new CacheConfiguration<Integer, MyObject>();
    cfg.setSwapEnabled(false);
    cfg.setName(CACHE_NAME);
    cfg.setCacheMode(CacheMode.PARTITIONED);
    cfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
    cfg.setBackups(0);
    cfg.setCopyOnRead(false);
    this.cache = ignite.getOrCreateCache(CACHE_NAME);

CPU 使用率看起来只有一个线程在做这项工作。当我将实现切换到 ArrayList - 不使用 Ignite 时,CPU 使用率达到 400%。

这段代码用于过滤缓存:

            IgniteCache<Integer, MyObject> cache = ignite.getOrCreateCache(CACHE_NAME);
            Spliterator<Entry<Integer, MyObject>> split = cache.localEntries().spliterator();
            Stream<MyObject> stream = StreamSupport.stream(split, true).map( entry -> entry.getValue());    
            aggregate.setCount(stream.filter(new SomePredicate()).count());

使用 Ignite 运行时进行了一些分析,发现一次只有一个 Runnable 线程,而使用 ArrayList 则有 3 或 4 个线程在工作。

非常感谢帮助,

巴特

4

1 回答 1

0

localEntries返回内部 Ignite 数据结构的迭代器,因此我认为在没有额外支持的情况下使用 Java 流 API 将其拆分是不可能的。ScanQuery但是您可以通过使用which 允许迭代特定分区来并行化任务。获取本地分区号列表,为每个分区创建单独的查询并在单独的线程中执行查询。代码将如下所示:

// Get list of local partitions.
int[] partitions = ignite.affinity(CACHE_NAME).allPartitions(ignite.cluster().localNode());

// For each partition submit a task to a thread pool.
for (final int partition : partitions) {
    executor.submit(new Runnable() {
        @Override public void run() {
            ScanQuery<Integer, MyObject> query = new ScanQuery<>();

            query.setPartition(partition);
            query.setLocal(true);

            QueryCursor<Cache.Entry<Integer, MyObject>> cursor = cache.query(query);

            for (Cache.Entry<Integer, MyObject> entry : cursor) {
                // ...
            }
        }
    });
}
于 2016-04-30T00:08:01.397 回答