1

我有一个具有以下骨架的卡夫卡消费者:

         private static ConsumerConfig createConsumerConfig(String connStr)
          {  
             Properties externalConsumerProperties = ConsumerProperties.getConsumerProperties();

             Properties internalConsumerProperties = new Properties();

             // Properties below must not be changeable externally
             internalConsumerProperties.put("zookeeper.connect", connStr);
             internalConsumerProperties.put("group.id", "sm-publisher");
             internalConsumerProperties.put("zookeeper.session.timeout.ms", "400");
             internalConsumerProperties.put("zookeeper.sync.time.ms", "200");
             internalConsumerProperties.put("auto.commit.enable", "false");
             internalConsumerProperties.put("consumer.timeout.ms", "15000");
             internalConsumerProperties.put("auto.offset.reset", "smallest");

             Properties props = new Properties();

             if(externalConsumerProperties != null)
             {  
                props.putAll(externalConsumerProperties);
             }

             props.putAll(internalConsumerProperties);

             return new ConsumerConfig(props);
          }

    main()
   {
        ConsumerConnector connector = null;  
        connector=kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(connectionString));
        kafkaStream = createStream(connector);
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
        while(it.hasNext())
        {
            //process message 
            // ...............
            connector.commitOffsets();
        }
   }

这个消费者在正常情况下工作得很好。但是在获取消息之后并且在提交偏移量之前,如果与 Zookeeper 服务器的连接中断,并且消费者尝试提交偏移量,那么它就会陷入无限循环。有没有办法让它出来??

4

0 回答 0