0

目前,elasticsearch hadoop 正在将数据集/rdd 转换为具有 1 对 1 映射的文档,即将数据集中的 1 行转换为一个文档。在我们的场景中,我们正在做这样的事情

对于'uni

PUT spark/docs/1
{
"_k":"one",
"_k":"two",
"_k":"three" // large sets , we dont need to store much, we just want to map multiple keys to single value.
"_v" :"key:
}

GET spark/docs/_search
{
"query" : {
  "constant_score" : {
    "filter" : {
      "terms" : {
        "_k" : ["one"] // all values work.
        }
      }
    }
  }
}

任何建议我们如何在上面实施,如果有更好的策略,请提出建议。

下面的代码不起作用,但我试图在理论上实现如下所示

  final Dataset<String> df = spark.read().csv("src/main/resources/star2000.csv").select("_c1").dropDuplicates().as(Encoders.STRING());
  final Dataset<ArrayList> arrayListDataset = df.mapPartitions(new MapPartitionsFunction<String, ArrayList>() {
        @Override
        public Iterator<ArrayList> call(Iterator<String> iterator) throws Exception {
            ArrayList<String> s = new ArrayList<>();
            iterator.forEachRemaining(it -> s.add(it));
            return Iterators.singletonIterator(s);
        }
    }, Encoders.javaSerialization(ArrayList.class));
  JavaEsSparkSQL.saveToEs(arrayListDataset,"spark/docs");

我不想在一个列表中收集完整的数据集,因为它可能导致 OOM,因此计划是获取每个分区的列表并将其索引到分区键。

4

2 回答 2

0

使用 pojo 作为

Document{
   String[] vals,
   String key
} 

并使用以下代码片段

Dataset<String> df = spark.sqlContext().read().parquet(params.getPath())
                        .select(params.getColumnName())
                        .as(Encoders.STRING());

final Dataset<Document> documents = df.coalesce(numPartitions).mapPartitions(iterator -> {
       final Set<String> set = Sets.newHashSet(iterator);
       Document d = new Document(set.toArray(new String[set.size()]),"key1");
       return Iterators.singletonIterator(d);}, Encoders.bean(Document.class));
JavaEsSparkSQL.saveToEs(documents, params.getTableIndexName() + "/"+params.getTableIndexType());

这会在数组索引之上创建。

于 2017-01-06T12:22:24.273 回答
0

发布您正在使用的一些源代码会有所帮助,这个问题也不清楚您想要实现的目标。

我假设您想向键字段 (_k) 发布一个数组,向值字段 (_v) 发布一个不同的值?

因此,您可以创建一个 JavaPairRDD 并将其保存到 Elasticsearch,如下所示:

String[] keys = {"one", "two", "three"};
String value = "key";

List<Tuple2<String[],String>> l = new ArrayList<Tuple2<String[],String>>();
l.add(new Tuple2<String[],String>(keys, value));

JavaPairRDD<String[],String> R = ctx.parallelizePairs(l);

JavaEsSpark.saveToEs(R,"index/type");
于 2016-12-28T11:04:20.810 回答