我一直在研究 kafkajs 和 socket.io 我对它很陌生,我似乎无法理解一些事情。我创建了一个聊天应用程序,基本上通过打开浏览器(客户端),您可以键入消息并将它们显示在聊天窗口中。我找到了一个让 kafka 打印“this message + i”的教程。我不想发送到主题并打印 message+i 来打印人们在聊天中输入的内容,但我不确定我应该如何做到这一点。
这是我的 consumer.js:
const { Kafka } = require("kafkajs")
const clientId = "my-app"
const brokers = ["localhost:9092"]
const topic = "message-log"
const kafka = new Kafka({ clientId, brokers })
// create a new consumer from the kafka client, and set its group ID
// the group ID helps Kafka keep track of the messages that this client
// is yet to receive
const consumer = kafka.consumer({ groupId: clientId })
const consume = async () => {
// first, we wait for the client to connect and subscribe to the given topic
await consumer.connect()
await consumer.subscribe({ topic })
await consumer.run({
// this function is called every time the consumer gets a new message
eachMessage: ({ message }) => {
// here, we just log the message to the standard output
console.log(`received message: ${message.value}`)
},
})
}
module.exports = consume
这是我的 producer.js:
// import the `Kafka` instance from the kafkajs library
const { Kafka } = require("kafkajs")
// the client ID lets kafka know who's producing the messages
const clientId = "my-app"
// we can define the list of brokers in the cluster
const brokers = ["localhost:9092"]
// this is the topic to which we want to write messages
const topic = "message-log"
// initialize a new kafka client and initialize a producer from it
const kafka = new Kafka({ clientId, brokers })
const producer = kafka.producer()
// we define an async function that writes a new message each second
const produce = async () => {
await producer.connect()
let i = 0
// after the produce has connected, we start an interval timer
setInterval(async () => {
try {
// send a message to the configured topic with
// the key and value formed from the current value of `i`
await producer.send({
topic,
messages: [
{
key: String(i),
value: "this is message " + i,
},
],
})
// if the message is written successfully, log it and increment `i`
console.log("writes: ", i)
i++
} catch (err) {
console.error("could not write message " + err)
}
}, 1000)
}
module.exports = produce
我知道我应该以某种方式将主题代理和客户端与 socket.io 连接,但我不确定如何。
这是我的chat.js:
/* Kane connection sto server opos prin
exw tin ikanotita na xrhsimopoihsw to io logo tou library pou phra apo to documentation*/
var socket = io.connect('http://localhost:8000');
// linking Variables toy indexhtml
var message = document.getElementById('message');
var username = document.getElementById('username');
var btn = document.getElementById('send');
var output = document.getElementById('output');
var feedback = document.getElementById('feedback');
// Stelnw events pou ginonte apo ton xristi kai stelnonte ston server
btn.addEventListener('click', function(){
socket.emit('chat', {
message: message.value,
username: username.value
});
message.value = "";
});
message.addEventListener('keypress', function(){
socket.emit('typing', username.value);
})
// Events wste na perimenw to data apo ton server
socket.on('chat', function(data){
feedback.innerHTML = '';
output.innerHTML += '<p><strong>' + data.username + ': </strong>' + data.message + '</p>';
});
socket.on('typing', function(data){
feedback.innerHTML = '<p><em>' + data + ' is typing a message...</em></p>';
});