我有一个Observable<String>
. 我想把它变成一个Map<String, Int>
告诉我每个不同字符串的出现次数。
observable 包含约 10 亿个元素,其中 1000 个是不同的(因此将整个数据集存储在 RAM 中不是一种选择)。目前我迭代Observable
并更新了一个HashMap
. 我还确保在同一个线程上进行观察以避免竞争条件。但是,获取元素频率本质上应该很容易并行化,因此利用它会很好。
有没有办法做到这一点?
我有一个Observable<String>
. 我想把它变成一个Map<String, Int>
告诉我每个不同字符串的出现次数。
observable 包含约 10 亿个元素,其中 1000 个是不同的(因此将整个数据集存储在 RAM 中不是一种选择)。目前我迭代Observable
并更新了一个HashMap
. 我还确保在同一个线程上进行观察以避免竞争条件。但是,获取元素频率本质上应该很容易并行化,因此利用它会很好。
有没有办法做到这一点?
您可以使用groupBy
而不是自己维护HashMap
。groupBy
将为每个密钥创建一个Observable
,您可以在不同的调度程序上订阅它。例如,
public class KeyCounter {
int key;
long count;
public KeyCounter(int key, long count) {
this.key = key;
this.count = count;
}
@Override
public String toString() {
return "key: " + key + " count: " + count;
}
}
@Test
public void foo() {
Observable<Integer> o = Observable.just(1, 2, 3, 2, 1);
o.groupBy(i -> i).flatMap(
group ->
group.subscribeOn(Schedulers.computation()).countLong().map(count -> new KeyCounter(group.getKey(), count))
).subscribe(System.out::println);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}