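I have a PHP program that runs 2 PHP processes consuming Kafka messages. Sometimes it reports an error like "Kafka consumer group authorization failed". Is there some interaction between the 2 processes? They use the same topic and group ID, and of course the ACL policy type is GSSAPI. What I really don't understand is why the error only happens occasionally.
Here is the code: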
// Assumes `use RdKafka\Conf;` and `use RdKafka\KafkaConsumer;` at the top of the file.
public static function consumer(string $topic, string $group, $handler, string $instance = 'test')
{
    $conf = new Conf();

    // Rebalance callback: apply the partition assignment or revocation
    // decided for this consumer by the group coordinator.
    $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, ?array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                $kafka->assign($partitions);
                break;
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                $kafka->assign(null);
                break;
            default:
                // $err is an integer error code; convert it to a readable message.
                throw new \Exception(rd_kafka_err2str($err), $err);
        }
    });

    // Configure the group.id. All consumers with the same group.id will consume
    // different partitions.
    $conf->set('group.id', $group);

    // Initial list of Kafka brokers
    $conf->set('metadata.broker.list', config("kafka.$instance"));

    // Set where to start consuming messages when there is no initial offset in
    // the offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    $conf->set('auto.offset.reset', 'smallest');

    // Kafka broker version: 0.10.0
    // %5|1583908574.751|MAXPOLL|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/102: Broker does not support KIP-62 (requires Apache Kafka >= v0.10.1.0): consumer configuration `max.poll.interval.ms` (300000) is effectively limited by `session.timeout.ms` (10000) with this broker version
    // $conf->set('max.poll.interval.ms', '10000');

    $consumer = new KafkaConsumer($conf);

    // Subscribe to the requested topic
    $consumer->subscribe([$topic]);

    // Waiting for partition assignment... (may take some time when
    // quickly re-joining the group after leaving it.)
    while (true) {
        /** @var \RdKafka\Message $message */
        $message = $consumer->consume(120 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                $handler($message->payload);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                // No more messages; will wait for more
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                // Timed out
                break;
            default:
                // Any other error code (including authorization failures) ends the loop.
                throw new \Exception($message->errstr(), $message->err);
        }
    }
}
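Since the failure is intermittent and two processes share one group, it may help to record which process hit the error and with what code before the exception is thrown. The following is only a minimal sketch: `describeConsumeError` is a hypothetical helper (it is not part of php-rdkafka), and the sample values passed to it are made up for illustration.

```php
<?php
// Hypothetical helper (not part of php-rdkafka): builds one diagnostic log
// line for a failed consume() call so the two processes can be told apart.
function describeConsumeError(int $code, string $errstr, string $topic, string $group): string
{
    return sprintf(
        '[pid %d] topic=%s group=%s err=%d (%s)',
        getmypid(),   // process ID of the current PHP process
        $topic,
        $group,
        $code,
        $errstr
    );
}

// Illustrative call with made-up values. In the consume loop above, the code
// and text would come from $message->err and $message->errstr().
echo describeConsumeError(30, 'example error text', 'test', 'my-group'), PHP_EOL;
```

One possible use is to pass the result to `error_log()` in the `default:` branch of the consume loop just before throwing, so that the timestamps from both processes can be compared afterwards.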