1

我有一个 PHP 程序,其中有 2 个消费 Kafka 消息的 PHP 进程。但有时它会报出诸如“Kafka 消费者组授权失败”之类的错误。这 2 个进程之间有什么关联吗?它们使用相同的主题和组 ID,当然 ACL 策略类型是 GSSAPI。但我真的不明白为什么这个错误只是偶尔出现?

这是代码:

    /**
     * Subscribe to a Kafka topic as a member of a consumer group and pass the
     * payload of every consumed message to $handler, looping indefinitely.
     *
     * The consume loop has no exit condition, so this method only ends when an
     * exception is thrown out of it.
     *
     * @param string   $topic    Topic to subscribe to.
     * @param string   $group    Value used for the `group.id` setting.
     * @param callable $handler  Invoked with the payload of each message consumed without error.
     * @param string   $instance Suffix of the `kafka.<instance>` config key that supplies the broker list.
     *
     * @throws \Exception When a consumed message carries an error other than
     *                    partition EOF or timeout; the rebalance callback also
     *                    throws on an unexpected rebalance event.
     */
    public static function consumer(string $topic, string $group, $handler, string $instance = 'test')
    {
        $conf = new Conf();

        // Apply partition assignments / revocations handed out by the group
        // coordinator; anything else is reported as an error.
        $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    $kafka->assign(null);
                    break;

                default:
                    // $err is an integer error code: turn it into a readable
                    // message and keep the code, mirroring how consume errors
                    // are reported in the loop below.
                    throw new \Exception(rd_kafka_err2str($err), $err);
            }
        });

        // Configure the group.id. All consumers with the same group.id will
        // consume different partitions.
        $conf->set('group.id', $group);

        // Initial list of Kafka brokers
        $conf->set('metadata.broker.list', config("kafka.$instance"));

        // Where to start consuming when there is no initial offset in the
        // offset store or the desired offset is out of range.
        // 'smallest': start from the beginning
        $conf->set('auto.offset.reset', 'smallest');

        // Broker version is 0.10.0, which produces this client warning:
        // %5|1583908574.751|MAXPOLL|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/102: Broker does not support KIP-62 (requires Apache Kafka >= v0.10.1.0): consumer configuration `max.poll.interval.ms` (300000) is effectively limited by `session.timeout.ms` (10000) with this broker version
        // Left disabled: $conf->set('max.poll.interval.ms', '10000');

        $consumer = new KafkaConsumer($conf);
        $consumer->subscribe([$topic]);

        // Waiting for partition assignment may take some time when quickly
        // re-joining the group after leaving it.

        while (true) {
            /** @var \RdKafka\Message $message */
            $message = $consumer->consume(120 * 1000); // timeout in milliseconds
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $handler($message->payload);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    // No more messages; keep waiting for more.
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    // Nothing arrived within the timeout; poll again.
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }
4

0 回答 0