0

我正在处理一个文本数据流,我事先不知道它的值的分布是什么,但我知道每个数据看起来像这样:

{
  "datetime": "1986-11-03T08:30:00-07:00",
  "word": "wordA",
  "value": "someValue"
}

我正在尝试根据它的值将它存储到 RethinkDB 对象中,其中对象如下所示:

{
  "bucketId": "1",
  "bucketValues": {
    "wordA": [
      {"datetime": "1986-11-03T08:30:00-07:00"},
      {"datetime": "1986-11-03T08:30:00-07:00"}
    ],
    "wordB": [
      {"datetime": "1986-11-03T08:30:00-07:00"},
      {"datetime": "1986-11-03T08:30:00-07:00"}
    ]
  }
}

目的是最终计算每个桶中每个单词的出现次数。

由于我正在处理大约一百万个桶,并且提前不知道单词,因此计划是动态创建这些对象。但是,我是 RethinkDB 的新手,我已尽我所能做到这一点,即我不会尝试向word尚不存在的存储桶添加密钥,但我不完全确定我是否' m 遵循此处的最佳实践,将命令链接如下(请注意,我在 Node.js 服务器上使用以下命令运行它:

var bucketId = "someId";
var word = "someWordValue"
r.do(r.table("buckets").get(bucketId), function(result) {
  return r.branch(
    // If the bucket doesn't exist
    result.eq(null), 
    // Create it
    r.table("buckets").insert({
      "id": bucketId,
      "bucketValues" : {}
    }),
    // Else do nothing
    "Bucket already exists"
  );
})
.run()
.then(function(result) {
  console.log(result);

  r.table("buckets").get(bucketId)
  .do(function(bucket) {
    return r.branch(
      // if the word already exists
      bucket("bucketValues").keys().contains(word),
      // Just append to it (code not implemented yet)
      "Word already exists",
      // Else create the word and append it
      r.table("buckets").get(bucketId).update(
        {"bucketValues": r.object(word, [/*Put the timestamp here*/])}
      )
    );
  })
  .run()
  .then(function(result) {
    console.log(result);
  });

});

我是否需要在此处执行两次运行,或者我是否基于您应该如何正确地将事物与 RethinkDB 链接在一起?我只是想确保在深入研究之前我没有以错误/艰难的方式做这件事。

4

1 回答 1

3

你不必执行run多次,取决于你想要什么。基本上,run()结束链并将查询发送到服务器。所以我们做了所有的事情来构建查询,并run()以执行它结束它。如果你使用run()两次,这意味着它被发送到服务器 2 次。

因此,如果我们可以只使用 RethinkDB 函数完成所有处理,我们只需要调用一次 run。但是,如果我们想对某种数据进行后处理,使用客户端,那么我们别无选择。通常我尝试使用 RethinkDB 进行所有处理:使用控制结构、循环和匿名函数,我们可以走得很远,而无需让客户端执行一些逻辑。

在您的情况下,可以使用官方驱动程序使用 NodeJS 重写查询:

var r = require('rethinkdb')

var bucketId = "someId2";
var word = "someWordValue2";

r.connect()
.then((conn) => {
  r.table("buckets").insert({
        "id": bucketId,
        "bucketValues" : {}
  })
  .do((result) => {
    // We don't care about result at all
    // We just want to ensure it's there
    return r.table('buckets').get(bucketId)
      .update(function(bucket) {
        return {
          'bucketValues': r.object(
                          word,
                          bucket('bucketValues')(word).default([])
                          .append(r.now()))
        }
      })
  })
  .run(conn)
  .then((result) => { conn.close() })

})
于 2016-01-22T18:39:08.780 回答