这是一种可能的解决方案。我假设您有要访问的键值存储(在您的情况下为 RocksDB)的客户端库。
KeyValuePair
表示一个 bean 类,表示您的键值存储中的一个键值对。
课程
/*Lazy iterator to read from KeyValue store*/
class KeyValueIterator implements Iterator<KeyValuePair> {
public KeyValueIterator() {
//TODO initialize your custom reader using java client library
}
@Override
public boolean hasNext() {
//TODO
}
@Override
public KeyValuePair next() {
//TODO
}
}
class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair>() {
@Override
public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception {
//ignore empty 'keyValuePair' object
return new KeyValueIterator();
}
}
创建 KeyValue RDD
/*list with a dummy KeyValuePair instance*/
ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>();
keyValuePairs.add(new KeyValuePair());
JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs);
/*Read one key-value pair at a time lazily*/
keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader());
笔记:
上述解决方案默认创建一个具有两个分区的 RDD(其中一个为空)。在应用任何转换之前增加分区keyValuePairRDD
以在执行程序之间分配处理。增加分区的不同方法:
keyValuePairRDD.repartition(partitionCounts)
//OR
keyValuePairRDD.partitionBy(...)