-1

我有一个要求,其中需要维护 2 个主题,其中 1 使用同步方法,其他使用异步方法。异步调用消费者记录按预期工作,但是在同步方法中,消费者代码没有被调用。

下面是配置文件中声明的代码

 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
 props.put(ProducerConfig.RETRIES_CONFIG, 3);
 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
 props.put(ProducerConfig.ACKS_CONFIG, "all");
 props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

我在这里启用了 autoFlush true

 @Bean( name="KafkaPayloadSyncTemplate")
    public KafkaTemplate<String, KafkaPayload> KafkaPayloadSyncTemplate() {
        return new KafkaTemplate<String,KafkaPayload>(producerFactory(),true);
 }

在返回 recordMetadataResults 对象后,控件将停止,不再对使用者进行任何调用

  private List<RecordMetadata> sendPayloadToKafkaTopicInSync() throws   InterruptedException, ExecutionException {      
        final List<RecordMetadata> recordMetadataResults = new ArrayList<RecordMetadata>();
        KafkaPayload kafkaPayload = constructKafkaPayload();
        ListenableFuture<SendResult<String,KafkaPayload>> 
future = KafkaPayloadSyncTemplate.send(TestTopic, kafkaPayload);
        SendResult<String, KafkaPayload> results;
        results = future.get();
        recordMetadataResults.add(results.getRecordMetadata());     
        return recordMetadataResults;           
    }

消费者守则

public class KafkaTestListener {    
    @Autowired
    TestServiceImpl TestServiceImpl;    
    public final CountDownLatch countDownLatch = new CountDownLatch(1); 
    @KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
    public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
        countDownLatch.countDown();     
        TestServiceImpl.consumeKafkaMessage(record);        
        System.out.println("Acknowledgment : " + acknowledgment);
        acknowledgment.acknowledge();       
    }
}

基于这个问题,我有2个问题

  1. 当监听器类是同步生产者时,我们是否应该手动调用监听器类中的监听()。如果是,该怎么做?
  2. 如果 listener( @KafkaListener) 被自动调用,我需要添加哪些其他设置/配置才能使其正常工作。

感谢您提前输入

-斯里坎特

4

1 回答 1

1

您应该确保使用consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");消费者属性。

不确定您对同步/异步的含义,但生产和消费是完全不同的操作。而且您不能从生产者方面影响消费者。因为在两者之间有 Kafka Broker。

于 2017-02-21T17:45:48.970 回答