3

API 文档在这里: http: //kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html

但是当我运行以下代码时,异常是%d 格式:需要一个数字,而不是 NoneType

    client = KafkaClient("localhost:9092")
    consumer = SimpleConsumer(client, "test-group", "test")
    consumer.seek(0, whence=None)# (0,2) and (0,0)
    run = True
    while( run ):
        message = consumer.get_message(block=False, timeout=4000)

    except Exception as e:
        print "Exception while trying to read msg:", str(e)

当我使用以下代码时,异常是seek() got an unexpected keyword argument 'partition'

consumer.seek(0, whence=None, partition=None)# (0,2) and (0,0)

任何想法?谢谢。

4

1 回答 1

0

在 Kafka Definitive Guide 中,有一个seek()用 Java 编写的示例代码(不是用 Python 编写的,但我希望您能大致了解)。

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {

         public void onPartitionsRevoked (Collection <TopicPartition> partitions) {
                  commitDBTransaction();
         }

         public void onPartitionsAssigned(Collection <TopicPartiton> partitions) {
              for(TopicPartition partition : partitions)
                  consumer.seek(partition, getOffsetFromDB(partition));
         }

     }
}   // these brackets are exactly the same as the book. I didn't change anything. You might want to though.    

   consumer.subscribe (topics, new SaveOffsetOnRebalance(consumer));
   consumer.poll(0);

   for ( TopicPartition partition : consumer.assignment())
       consumer.seek(partition, getOffsetFromDB(partition));

   while (true) {
         ConsumerRecords <String, String> records = consumer.poll(100);
         for (ConsumerRecord <String, String> record : records)
         { 
               processRecord(record);
               storeRecordInDB(record);
               storeOffsetInDB(record.topic(), record.partition(), record.offset());
         }
         commitDBTransaction();
   }
于 2019-08-23T08:42:54.547 回答