3

我在 Meteor 中使用 Cassandra(通过 npm 包 cassandra-driver)。

// Cassandra driver client used by every wrapper method below.
// `cassandra` and `cassandraHost` are defined outside this snippet.
let client = new cassandra.Client({contactPoints: [cassandraHost]});

// Fiber-friendly version of client.execute: Meteor.wrapAsync adapts a function
// whose last argument is a node-style callback so that it can be called
// synchronously (returning the result or throwing the error). `client` is
// passed as the context so that `this` inside execute is still the client.
var cassandraExecSync = Meteor.wrapAsync(client.execute, client);

MyProject.Feed.CassandraMeteorWrap = {
    /**
     * Builds an insert command for a post and executes it against Cassandra.
     *
     * The command is produced by `insert(...)`, which is defined outside this
     * snippet (presumably it returns a CQL statement string; verify there).
     *
     * @param userId          Passed through to `insert`.
     * @param postContentJson Passed through to `insert`.
     * @param relevance       Passed through to `insert`.
     * @returns The value returned by the wrapped `client.execute` on success;
     *          `undefined` when execution throws (the error is logged, not
     *          rethrown, so callers cannot distinguish failure by exception).
     */
    insertNewPost: function (userId, postContentJson, relevance) {
        const insertCommand = insert(userId, postContentJson, relevance);
        try {
            return cassandraExecSync(insertCommand);
        } catch (err) {
            // Best-effort insert: log the failing command and the error, then
            // fall through and implicitly return undefined.
            console.log("error inserting: " + insertCommand);
            console.log(err);
        }
    }
};

也就是说,我用 Meteor.wrapAsync 包装了 Cassandra 客户端的 client.execute(它的最后一个参数是回调函数)。

前几次插入可以正常工作,但在若干次插入之后(插入操作是被定期调用的),我得到了如下错误:

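[Error: Meteor code must always run within a Fiber. Try wrapping callbacks that you pass to non-Meteor libraries with Meteor.bindEnvironment.]
(即:Meteor 代码必须始终在 Fiber 中运行。请尝试用 Meteor.bindEnvironment 包装传递给非 Meteor 库的回调。)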

更新:调试 Meteor 得到的堆栈跟踪显示,异常起始于我所使用的 npm 包 “cassandra-driver” 触发的 _onTimeout() 调用处(见下面代码中标注的那一行):

/**
 * Timeout handler for one timer list (invoked with the list as `this`).
 *
 * Walks the list from the front. Every entry whose elapsed time
 * (now - _monotonicStartTime) has reached the list's `msecs` is removed and
 * its `_onTimeout` callback is invoked. When the first entry that is not yet
 * due is found, the list is restarted for the remaining time and the function
 * returns. When the list has been fully drained it is closed and removed from
 * `lists`.
 *
 * `Timer`, `L`, `debug`, `assert` and `lists` are defined outside this snippet.
 */
function listOnTimeout() {
  var msecs = this.msecs;
  var list = this;

  debug('timeout callback ' + msecs);

  var now = Timer.now();
  debug('now: %d', now);

  var first;
  while (first = L.peek(list)) {
    // If the previous iteration caused a timer to be added,
    // update the value of "now" so that timing computations are
    // done correctly. See test/simple/test-timers-blocking-callback.js
    // for more information.
    if (now < first._monotonicStartTime) {
      now = Timer.now();
      debug('now: %d', now);
    }

    var diff = now - first._monotonicStartTime;
    if (diff < msecs) {
      // The front entry is not due yet: restart the list for the time that
      // is still remaining and stop processing.
      list.start(msecs - diff, 0);
      debug(msecs + ' list wait because diff is ' + diff);
      return;
    } else {
      L.remove(first);
      assert(first !== L.peek(list));

      // Entries without a callback are simply dropped.
      if (!first._onTimeout) continue;

      // v0.4 compatibility: if the timer callback throws and the
      // domain or uncaughtException handler ignore the exception,
      // other timers that expire on this tick should still run.
      //
      // https://github.com/joyent/node/issues/2631
      var domain = first.domain;
      if (domain && domain._disposed) continue;
      try {
        if (domain)
          domain.enter();
        // `threw` stays true unless the callback returns normally, so the
        // finally block can tell a throw apart from a clean exit.
        var threw = true;
        // NOTE: this is the call highlighted in the question -- the stack
        // trace of the reported exception starts here.
        first._onTimeout();
        if (domain)
          domain.exit();
        threw = false;
      } finally {
        if (threw) {
          // We need to continue processing after domain error handling
          // is complete, but not by using whatever domain was left over
          // when the timeout threw its exception.
          var oldDomain = process.domain;
          process.domain = null;
          process.nextTick(function() {
            list.ontimeout();
          });
          process.domain = oldDomain;
        }
      }
    }
  }

  // Every entry has been processed: the list must be empty, so release it
  // and forget it in the per-duration lookup table.
  debug(msecs + ' list empty');
  assert(L.isEmpty(list));
  list.close();
  delete lists[msecs];
}
4

0 回答 0