1

我正在使用 kafka 2.11 版本来编写消费者。我不断收到超时异常。我不确定我在这里使用了正确的 API

有谁能够帮我?

执行者

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

        public class MessageListener {
            private Properties properties;

            private ConsumerConnector consumerConnector;
            private String topic;
            private ExecutorService executor;

            public MessageListener(String topic) {
                this.topic = topic;

                KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
                try {
                    properties = confLoader.loadConsumerConfig();
                    ConsumerConfig consumerConfig = new ConsumerConfig(properties);
                    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
            }

            public void start(File file) {

                Map<String, Integer> topicCountMap = new HashMap<>();
                topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE));

                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                        .createMessageStreams(topicCountMap);
                List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
                executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);

                for (KafkaStream<byte[], byte[]> stream : streams) {
                    executor.submit(new ListenerThread(stream));

                }
            }


        }

线

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
public class ListenerThread implements Runnable {
    private KafkaStream<byte[], byte[]> stream;;

    public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
        this.stream = msgStream;

    }

    @Override
    public void run() {
        try {

            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.makeNext();
                String topic = messageAndMetadata.topic();
                byte[] message = messageAndMetadata.message();
                System.out.println("111111111111111111111111111");
                FileProcessor processor = new FileProcessor();
                processor.processFile(topic, message);
            }
} catch (ConsumerTimeoutException cte) {
            System.out.println("Consumer timed out");
        }

        catch (Exception ex) {
            ex.printStackTrace();
        }

    }
}
4

1 回答 1

2

您可以设置consumer.timeout.ms=-1是否不希望抛出此异常。

于 2016-03-24T14:26:30.990 回答