所以我的问题可能涉及基于应用程序性质的一些头脑风暴。
我有一个向 Kafka 发送消息的 Node JS 应用程序。例如,每次用户单击页面时,Kafka 应用程序都会根据访问情况运行计算。然后,我想在通过我的 Kafka 消息触发它之后检索这个计算。到目前为止,此计算存储在 Cassandra 数据库中。问题是,如果我们在计算完成之前尝试从 Cassandra 读取数据,那么我们将不会从数据库中查询任何内容(尚未插入密钥)并且不会返回任何内容(错误),或者计算可能是陈旧的。到目前为止,这是我的代码。
router.get('/:slug', async (req, res) =>{
Producer = kafka.Producer
KeyedMessage = kafka.KeyedMessage
client = new kafka.KafkaClient()
producer = new Producer(client)
km = new KeyedMessage('key', 'message')
kafka_message = JSON.stringify({ id: req.session.session_id.toString(), url: arbitrary_url })
payloads = [
{ topic: 'MakeComputationTopic', messages: kafka_message}
];
const clientCass = new cassandra.Client({
contactPoints: ['127.0.0.1:9042'],
localDataCenter: 'datacenter1', // here is the change required
keyspace: 'computation_space',
authProvider: new auth.PlainTextAuthProvider('cassandra', 'cassandra')
});
const query = 'SELECT * FROM computation WHERE id = ?';
clientCass.execute(query, [req.session.session_id],{ hints : ['int'] })
.then(result => console.log('User with email %s', result.rows[0].computations))
.catch((message) => {
console.log('Could not find key')
});
}
首先,我想到了 async 和 await ,但这被排除在外,因为这不会停止过时的计算。
其次,我考虑让我的应用程序休眠,但似乎这种方式会减慢我的应用程序速度。
我可能决定使用 Kafka Consumer(在我的 node-js 中)来使用一条消息,表明现在查看 Cassandra 表是安全的。
例如(使用 kafka-node)
consumer.on('message', function (message) {
clientCass.execute(query, [req.session.session_id],{ hints : ['int'] })
.then(result => console.log('User with computation%s', result.rows[0].computations))
.catch((message) => {
console.log('Could not find key')
});
});
这种方法虽然更好,但似乎有点不对劲,因为每次用户点击页面时我都必须让消费者成为消费者,而且我只关心它被发送 1 条消息。
我想知道我应该如何应对这个挑战?我是否可能错过了一个场景,或者有没有办法使用 kafka-node 来解决这个问题?我也在考虑做一个等待承诺成功并且计算不会过时的while循环(比较缓存中的值)