我创建了一个 HighLevelProducer,用于把消息发布到某个主题;该主题由基于 kafka-node 的 ConsumerGroupStream 消费。当我在同一个 ConsumerGroup 中创建多个消费者来消费同一个主题时,该主题只创建了一个分区,因此只有一个消费者在实际消费。我也尝试过指定该主题的分区数,但不确定是否必须在创建主题时就定义分区数;如果必须,我需要预先设置多少个分区?此外,是否可以直接把对象(而不是字符串)推入 Transform 流?(我目前使用 JSON.stringify,否则消费者收到的是 [object Object]。)
/**
 * Creates a ProducerStream and requests a metadata refresh for `topic`.
 *
 * @param {object} params
 * @param {string} params.kafkaHost - Broker address handed to KafkaClient.
 * @param {number} params.highWaterMark - Forwarded to the ProducerStream options.
 * @param {string} params.topic - Topic whose metadata is refreshed.
 * @returns {ProducerStream} The newly created stream. A metadata refresh
 *   failure is reported as an 'error' event on this stream.
 */
const myProducerStream = ({ kafkaHost, highWaterMark, topic }) => {
  const kafkaClient = new KafkaClient({ kafkaHost });
  const producer = new HighLevelProducer(kafkaClient);

  // NOTE(review): kafka-node's ProducerStream may expect plain option objects
  // under `kafkaClient` / `producer` rather than instances — confirm against
  // the library version in use.
  const stream = new ProducerStream({
    highWaterMark,
    kafkaClient,
    producer,
  });

  // The stream is created before the refresh so the callback always has a
  // target: throwing inside the callback could not be caught by any caller,
  // whereas an 'error' event can be handled by whoever owns the stream.
  kafkaClient.refreshMetadata([topic], (err) => {
    if (err) {
      stream.emit('error', err);
    }
  });

  return stream;
};
/**
 * Builds an object-mode Transform that wraps every incoming value in a
 * `{ topic, messages }` payload, with the value serialized as JSON.
 *
 * @param {string} topic - Topic name placed on every outgoing payload.
 * @returns {Transform} Object-mode transform stream.
 */
const transfrom = (topic) => {
  // Logs the serialized value, then passes the wrapped payload downstream.
  const wrapForTopic = (payload, _encoding, done) => {
    const serialized = JSON.stringify(payload);
    console.log(`pushing message ${serialized} to topic "${topic}"`);
    done(null, { topic, messages: serialized });
  };

  return new Transform({
    objectMode: true,
    decodeStrings: true,
    transform: wrapForTopic,
  });
};
/**
 * Wires a topic transform into a producer stream and returns the transform
 * so callers can write objects into it.
 *
 * @param {string} topic - Destination topic.
 * @param {string} kafkaHost - Broker address forwarded to myProducerStream.
 * @param {number} highWaterMark - Forwarded to myProducerStream.
 * @returns {object} The transform stream created by `transfrom(topic)`.
 */
const publisher = (topic, kafkaHost, highWaterMark) => {
  const myTransfrom = transfrom(topic);
  const producer = myProducerStream({ kafkaHost, highWaterMark, topic });
  myTransfrom.pipe(producer);
  // Return the local that was actually declared; the previous
  // `return myTransform` named an undeclared identifier and threw
  // ReferenceError on every call.
  return myTransfrom;
};
消费者:
/**
 * Creates a ConsumerGroupStream for `sourceTopic` with a freshly generated
 * consumer id, and attaches logging handlers for 'connect' and 'error'.
 *
 * @param {string} sourceTopic - Passed as the second ConsumerGroupStream argument.
 * @param {string} kafkaHost - Broker address placed in the consumer options.
 * @param {string} groupId - Consumer group id placed in the consumer options.
 * @returns {ConsumerGroupStream} The configured stream.
 */
const createConsumerStream = (sourceTopic, kafkaHost, groupId) => {
  const settings = {
    id: uuidv4(),
    groupId,
    kafkaHost,
    encoding: 'utf8',
    protocol: ['roundrobin'],
    fromOffset: 'latest',
    outOfRangeOffset: 'earliest',
  };

  // Shared prefix for both log lines; the id is read at logging time.
  const label = () => `Consumer id: "${settings.id}"`;

  const onConnect = () => {
    console.log(`${label()} is connected!`);
  };
  const onError = (err) => {
    console.error(`${label()} encountered an error: ${err}`);
  };

  const stream = new ConsumerGroupStream(settings, sourceTopic);
  stream.on('connect', onConnect);
  stream.on('error', onError);
  return stream;
};
/**
 * Connects a consumer stream to a destination topic: consumed messages pass
 * through an AsyncMessageTransform built from `func` and are then written to
 * a producer stream for `destTopic`.
 *
 * @param {Function} func - Handed to AsyncMessageTransform as its first argument.
 * @param {string} destTopic - Topic for the transform and the producer stream.
 * @param {object} consumerGroupStream - Source stream to pipe from.
 * @param {string} kafkaHost - Broker address forwarded to myProducerStream.
 * @param {number} highWaterMark - Forwarded to myProducerStream.
 * @returns {undefined}
 */
const publisher = (func, destTopic, consumerGroupStream, kafkaHost, highWaterMark) => {
  const transformStage = new AsyncMessageTransform(func, destTopic);
  const sinkStage = myProducerStream({ topic: destTopic, highWaterMark, kafkaHost });

  // source -> transform -> producer
  const transformed = consumerGroupStream.pipe(transformStage);
  transformed.pipe(sinkStage);
};