我正在使用 node-rdkafka ( https://github.com/Blizzard/node-rdkafka ) 从 IBM Bluemix Message Hub 消费和生成消息。只要我每个节点进程最多订阅 2 个主题,我就可以毫无问题地使用和生成消息。一旦我订阅了三个或更多主题,我的消费者就不再收到关于任何订阅主题的任何消息。我没有看到任何错误。
这里有软限制吗?或者我的代码中有什么东西导致了这个问题?增加服务器内存似乎没有任何效果。
生产者代码:
// Send a single message. `topic` is the Kafka topic, `type` is used as the
// message key, and `data` is JSON-serialized into the message payload.
// NOTE(review): a new producer is created and connected on every call and is
// never disconnected — consider caching one producer per process.
events.send = events.produce = (topic, type, data) => {
  log.info('Sending message on topic ' + topic);
  let producer = lib.getProducer(hubConfig);
  // Connect to the broker manually
  producer.connect({}, (err) => {
    if (err) {
      log.error('Producer failed to connect');
      log.error(err);
    }
  });
  // Wait for the ready event before proceeding
  producer.on('ready', () => {
    log.info('Producer ready, sending message');
    try {
      producer.produce(
        topic,
        null, // partition: null lets librdkafka choose
        // Buffer.from(): `new Buffer()` is deprecated and emits warnings on
        // modern Node.js.
        Buffer.from(JSON.stringify(data)),
        type, // message key
        Date.now() // timestamp
      );
    } catch (err) {
      log.error('A problem occurred when sending our message');
      log.error(err);
    }
  });
  producer.on('event.error', (err) => {
    log.error('Error from producer');
    log.error(err);
  });
};
// Build a node-rdkafka Producer wired for SASL_SSL against the brokers in
// `hubConfig` (IBM Message Hub credentials). Delivery reports and events
// are enabled via dr_cb / event_cb.
lib.getProducer = (hubConfig) => {
  const producerConfig = {
    'metadata.broker.list': hubConfig.kafka_brokers_sasl.join(','),
    'security.protocol': 'sasl_ssl',
    'ssl.ca.location': '/etc/ssl/certs',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': hubConfig.user,
    'sasl.password': hubConfig.password,
    'api.version.request': true,
    'dr_cb': true,
    'event_cb': true
  };
  return new Kafka.Producer(producerConfig);
};
消费者:
// Subscribe to one or more topics and return an EventEmitter that emits
// 'message' ({ data, type, topic }) for each new event and 'error' on
// consumer errors. `callback` (optional) fires once on ready or on a
// connection error.
events.listen = events.consume = (topics, callback) => {
  if (!_.isArray(topics)) {
    topics = [topics];
  }
  log.info('Subscribing to ' + topics.join(', ') + ' on test event listener...');
  let consumer,
    emitter = new evt.EventEmitter(),
    // Each consumer has a unique group and client ID
    groupName = 'group-' + uuidv1(),
    clientName = 'client-' + uuidv1();
  consumer = lib.getConsumer(hubConfig, groupName, clientName);
  consumer.connect({}, (err) => {
    if (err) {
      log.error('Consumer failed to connect');
      log.error(err);
      if (callback) callback(err);
    }
  });
  consumer
    .on('ready', function() {
      log.info('Consumer connected, subscribed to ' + topics.join(', '));
      consumer.subscribe(topics);
      consumer.consume();
      if (callback) callback();
    })
    .on('data', function(data) {
      let d = data.value.toString().replace(/"/g, ''),
        dupeKey = d + '-' + data.key;
      if (!duplicateBuffer[dupeKey]) {
        emitter.emit('message', {
          data: d,
          type: data.key,
          topic: data.topic
        });
        // Suppress re-delivery of the same payload+key for DUPE_DELAY ms
        duplicateBuffer[dupeKey] = setTimeout(() => {
          delete duplicateBuffer[dupeKey];
        }, DUPE_DELAY);
      } else {
        // FIX: was `data.type`, which is not a property of node-rdkafka
        // message objects (always logged `undefined`); the event type is
        // carried in the message key, as used in the 'message' emit above.
        log.info('Ignoring duplicate event: ' + d + ' ' + data.key);
      }
    })
    .on('error', (err) => {
      log.error(err);
      emitter.emit('error', err);
    });
  return emitter;
};
// Build a node-rdkafka KafkaConsumer for the given Message Hub config.
// `groupName` / `clientName` become group.id / client.id; the second
// constructor argument (topic-level config) is intentionally empty.
lib.getConsumer = (hubConfig, groupName, clientName) => {
  const consumerConfig = {
    'group.id': groupName,
    'client.id': clientName,
    'metadata.broker.list': hubConfig.kafka_brokers_sasl.join(','),
    'security.protocol': 'sasl_ssl',
    'ssl.ca.location': '/etc/ssl/certs',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': hubConfig.user,
    'sasl.password': hubConfig.password,
    'api.version.request': true,
    'event_cb': true
  };
  const topicConfig = {};
  return new Kafka.KafkaConsumer(consumerConfig, topicConfig);
};
有什么建议吗?