public void run() {
    // find the metadata about the topic and partition we are interested in
    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    if (metadata == null) {
        System.out.println("Can't find metadata for Topic and Partition. Exiting");
        return;
    }
    if (metadata.leader() == null) {
        System.out.println("Can't find Leader for Topic and Partition. Exiting");
        return;
    }
    String leadBroker = metadata.leader().host();
    String clientName = "Client_" + a_topic + "_" + a_partition;
    SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    // long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); // start from the newest messages instead
    int numErrors = 0;
    while (a_maxReads > 0) {
        if (consumer == null) {
            consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        }
        FetchRequest req = new FetchRequestBuilder()
                .clientId(clientName)
                .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                .build();
        FetchResponse fetchResponse = consumer.fetch(req);
        if (fetchResponse.hasError()) {
            numErrors++;
            // Something went wrong!
            short code = fetchResponse.errorCode(a_topic, a_partition);
            System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
            if (numErrors > 5) break;
            if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                // We asked for an invalid offset. For the simple case, reset to the latest available offset
                readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                continue;
            }
            consumer.close();
            consumer = null;
            try {
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
            } catch (Exception e) {
                e.printStackTrace();
            }
            continue;
        }
        numErrors = 0;
        long numRead = 0;
        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
            long currentOffset = messageAndOffset.offset();
            if (currentOffset < readOffset) {
                System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                continue;
            }
            readOffset = messageAndOffset.nextOffset();
            ByteBuffer payload = messageAndOffset.message().payload();

            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            try {
                dataPoints.add(simpleAPIConsumer.parse(simpleAPIConsumer.deserializing(bytes))); // deserialize and parse the payload, then add the data point to the list
            } catch (Exception e) {
                e.printStackTrace();
            }
            numRead++;
            a_maxReads--;
        }
        if (numRead == 0) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // restore the interrupt flag rather than swallowing it
            }
        }
    }
    simpleAPIConsumer.dataHandle(dataPoints); // handle the collected data
    if (consumer != null) consumer.close();
}

I found this method in the Kafka source code. Should I use it?

/**
 * Commit offsets for a topic to Zookeeper
 * @param request a [[kafka.javaapi.OffsetCommitRequest]] object.
 * @return a [[kafka.javaapi.OffsetCommitResponse]] object.
 */
def commitOffsets(request: kafka.javaapi.OffsetCommitRequest): kafka.javaapi.OffsetCommitResponse = {
  import kafka.javaapi.Implicits._
  underlying.commitOffsets(request.underlying)
}

1 Answer


The purpose of committing the offset after each fetch is to get exactly-once message processing.
You need to make sure the offset is committed only after the message has been processed (where "processing" means whatever you do with a message once you have pulled it from Kafka). Ideally you would wrap the message processing and the offset commit in a single transaction, so that either both succeed or both fail.
That way, if your client crashes, it can resume from the correct offset after a restart.
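
For illustration, a rough sketch of such a post-processing commit against the 0.8.x SimpleConsumer javaapi could look like the following. This is not from the question or the answer: the OffsetCommitRequest constructor argument order, the OffsetMetadataAndError type, the versionId semantics, and the hasError()/errors() methods on the response are assumptions based on the 0.8.1/0.8.2 javaapi, so verify them against the Kafka version you actually run.

import java.util.Collections;
import java.util.Map;

import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// Sketch: commit readOffset for one partition once the fetched batch has been processed.
// groupId and the correlationId of 0 are illustrative values, not part of the question's code.
void commitAfterProcessing(SimpleConsumer consumer, String groupId, String topic,
                           int partition, long readOffset, String clientName) {
    TopicAndPartition tp = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, OffsetMetadataAndError> offsets = Collections.singletonMap(
            tp, new OffsetMetadataAndError(readOffset, "", (short) 0));
    OffsetCommitRequest request = new OffsetCommitRequest(
            groupId,    // consumer group the offset is stored under
            offsets,    // partition -> offset to commit
            (short) 0,  // versionId: 0 is assumed to store offsets in ZooKeeper on 0.8.x
            0,          // correlationId
            clientName);
    OffsetCommitResponse response = consumer.commitOffsets(request);
    if (response.hasError()) {
        // On failure, do not treat the batch as done; after a restart the same
        // messages are fetched and processed again (at-least-once fallback).
        System.out.println("Offset commit failed: " + response.errors());
    }
}

Note that commitOffsets writes to ZooKeeper (as the doc comment in the question says), so the commit and your processing remain two separate steps; for true all-or-nothing behavior the offset has to live in the same transactional store as the processed results.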

answered 2015-11-08T22:02:16.303