0

所以我的问题可能涉及基于应用程序性质的一些头脑风暴。

我有一个向 Kafka 发送消息的 Node JS 应用程序。例如,每次用户单击页面时,Kafka 应用程序都会根据访问情况运行计算。然后,我想在通过我的 Kafka 消息触发它之后检索这个计算。到目前为止,此计算存储在 Cassandra 数据库中。问题是,如果我们在计算完成之前尝试从 Cassandra 读取数据,那么我们将不会从数据库中查询任何内容(尚未插入密钥)并且不会返回任何内容(错误),或者计算可能是陈旧的。到目前为止,这是我的代码。

router.get('/:slug', async (req, res) =>{

Producer = kafka.Producer


KeyedMessage = kafka.KeyedMessage
  client = new kafka.KafkaClient()



producer = new Producer(client)



km = new KeyedMessage('key', 'message')
  kafka_message = JSON.stringify({ id: req.session.session_id.toString(), url: arbitrary_url })
  payloads = [
    { topic: 'MakeComputationTopic', messages: kafka_message}
  ]; 
const clientCass = new cassandra.Client({
contactPoints: ['127.0.0.1:9042'],
localDataCenter: 'datacenter1', // here is the change required
keyspace: 'computation_space',
authProvider: new auth.PlainTextAuthProvider('cassandra', 'cassandra')
});



const query = 'SELECT * FROM computation  WHERE id = ?';




clientCass.execute(query, [req.session.session_id],{ hints : ['int'] })
  .then(result => console.log('User with email %s', result.rows[0].computations))
  .catch((message) => {
    console.log('Could not find key')
  });


}

首先,我想到了 async 和 await ,但这被排除在外,因为这不会停止过时的计算。

其次,我考虑让我的应用程序休眠,但似乎这种方式会减慢我的应用程序速度。

我可能决定使用 Kafka Consumer(在我的 node-js 中)来使用一条消息,表明现在查看 Cassandra 表是安全的。

例如(使用 kafka-node)

consumer.on('message', function (message) {
    clientCass.execute(query, [req.session.session_id],{ hints : ['int'] })
  .then(result => console.log('User with computation%s', result.rows[0].computations))
  .catch((message) => {
    console.log('Could not find key')
  });
}); 

这种方法虽然更好,但似乎有点不对劲,因为每次用户点击页面时我都必须让消费者成为消费者,而且我只关心它被发送 1 条消息。

我想知道我应该如何应对这个挑战?我是否可能错过了一个场景,或者有没有办法使用 kafka-node 来解决这个问题?我也在考虑做一个等待承诺成功并且计算不会过时的while循环(比较缓存中的值)

4

1 回答 1

1

这种方法虽然更好,但似乎有点不对劲,因为每次用户点击页面时我都必须让消费者成为消费者,而且我只关心它被发送 1 条消息。

我会得出同样的结论。Cassandra 不是为这类用例而设计的。数据库最终是一致的。如果您一起破解某些东西,您当前的方法目前可能有效,但是一旦您拥有 Cassandra 集群,肯定会导致未定义的行为。特别是当您更新条目时。

计算表中的 id 是您的分区键。这意味着一旦你有了一个集群,Cassandra 就会通过 id 分发数据。看起来它只包含一行。这是对 Cassandra 表进行建模的一种非常低效的方法。

您的用例看起来像是用于会话存储或缓存的用例。RedisLevelDB非常适合这类用例。任何其他键值存储也可以完成这项工作。

为什么不将结果写入另一个主题,并让另一个应用程序读取该主题并将结果写入数据库。这样您就不需要保留任何状态。完成后,结果将在主题中。它看起来像这样:

传入数据->第一个kafka主题->计算应用程序->第二个kafka主题->另一个将其写入数据库的应用程序<-另一个定期读取数据的应用程序。

如果它在那里,它就在那里,因此还没有完成。

于 2020-08-28T21:57:01.340 回答