1

在我的示例程序中,我尝试发布一个文件并尝试立即使用它。但我的消费者迭代器返回 null。知道我做错了什么吗?

测试

**main(){**

KafkaMessageProducer producer = new KafkaMessageProducer(topic, file);
        producer.generateMessgaes();

        MessageListener listener = new MessageListener(topic);
        listener.start();
}

消息监听器

public void start() {



        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);

        for (KafkaStream<byte[], byte[]> stream : streams) {
            System.out.println("The stream is --"+ stream.iterator().makeNext().topic());
            executor.submit(new ListenerThread(stream));    
        }
        try { // without this wait the subsequent shutdown happens immediately before any messages are delivered
            Thread.sleep(10000);
        } catch (InterruptedException ie) {

        }
        if (consumerConnector != null) {
            consumerConnector.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
        }
    }

监听线程

   public class ListenerThread implements Runnable {
        private KafkaStream<byte[], byte[]> stream;

        public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
            this.stream = msgStream;
            System.out.println("----------" + stream.iterator().makeNext().topic());
        }

public void run() {
        try {

            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                // MessageAndMetadata<byte[], byte[]> messageAndMetadata =
                // it.makeNext();
                // String topic = messageAndMetadata.topic();
                // byte[] message = messageAndMetadata.message();
                System.out.println("111111111111111111111111111");
                FileProcessor processor = new FileProcessor();
                processor.processFile("LOB_TOPIC", it.next().message());
            }

在上面的迭代器中,它不会进入 while 循环,因为迭代器是空的。但我确定我正在向同一个主题发布一条消息,并且消费者会收听该主题。

任何帮助,将不胜感激

4

1 回答 1

1

我昨天也有同样的问题。在尝试使用它一段时间后,我无法从我当前的主题中读取它。所以我采取了以下步骤

一个。停止了我的消费者,

湾。停止了生产者,

C。停止 kafka 服务器 bin/zookeeper-server-stop.sh config/zookeeper.properties

d。停止了 zookeeper bin/zookeeper-server-stop.sh config/zookeeper.properties

之后我删除了我的话题。bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

我还删除了按照“设置多代理集群”创建的文件,但我认为它不会造成问题。

一个。启动 Zookeeper b. 开始了 kafka c. 启动生产者并向 Kafka 发送一些消息

它又开始工作了。我不确定这是否会对您有所帮助。但似乎不知何故,我的制片人一定与消费者脱节了。希望这可以帮助。

于 2016-03-30T16:13:35.520 回答