0

当我将 AggregatingReplyingKafkaTemplate 与 template.setReturnPartialOnTimeout(true) 一起使用时,似乎存在一个问题,即使消费者可以获得部分结果,它也会返回超时异常。

在下面的示例中,我有 3 个消费者来回复请求主题,并且我将回复超时设置为 10 秒。我已经明确地将消费者 3 的响应延迟到 11 秒,但是,我希望消费者 1 和 2 的响应能够返回,所以我可以返回部分结果。但是,我收到了 KafkaReplyTimeoutException。感谢您的投入。谢谢。

我根据下面的单元测试遵循代码。[回复KafkaTemplateTests][1]

我在下面提供了实际代码:


@RestController
public class SumController {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    public static final String D_REPLY = "dReply";

    public static final String D_REQUEST = "dRequest";

    @ResponseBody
    @PostMapping(value="/sum")
    public String sum(@RequestParam("message") String message) throws InterruptedException, ExecutionException {

        AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
                new TopicPartitionOffset(D_REPLY, 0), 3, new AtomicInteger());
        String resultValue ="";
        String currentValue ="";

        try {
            template.setDefaultReplyTimeout(Duration.ofSeconds(10));
            template.setReturnPartialOnTimeout(true);

            ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, message);

            RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
                    template.sendAndReceive(record);

            future.getSendFuture().get(5, TimeUnit.SECONDS); // send ok
            System.out.println("Send Completed Successfully");

            ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord = future.get(10, TimeUnit.SECONDS);
            System.out.println("Consumer record size "+consumerRecord.value().size());

            Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecord.value().iterator();

            while (iterator.hasNext()) {
                currentValue = iterator.next().value();
                System.out.println("response " + currentValue);
                System.out.println("Record header " + consumerRecord.headers().toString());
                resultValue = resultValue + currentValue + "\r\n";
            }


        } catch (Exception e) {
            System.out.println("Error Message is "+e.getMessage());
        } 

        return resultValue;

    }

    public AggregatingReplyingKafkaTemplate<Integer, String, String> aggregatingTemplate(
            TopicPartitionOffset topic, int releaseSize, AtomicInteger releaseCount) {
        //Create Container Properties
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        //Set the consumer Config
        //Create Consumer Factory with Consumer Config
        DefaultKafkaConsumerFactory<Integer, Collection<ConsumerRecord<Integer, String>>> cf =
                new DefaultKafkaConsumerFactory<>(consumerConfigs());

        //Create Listener Container with Consumer Factory and Container Property
        KafkaMessageListenerContainer<Integer, Collection<ConsumerRecord<Integer, String>>> container =
                new KafkaMessageListenerContainer<>(cf, containerProperties);
        //  container.setBeanName(this.testName);
        AggregatingReplyingKafkaTemplate<Integer, String, String> template =
                new AggregatingReplyingKafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()), container,
                        (list, timeout) -> {
                            releaseCount.incrementAndGet();
                            return list.size() == releaseSize;
                        });
        template.setSharedReplyTopic(true);
        template.start();
        return template;
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_id");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
        return props;
    }

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
        return props;
    }

    public ProducerFactory<Integer,String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @KafkaListener(id = "def1", topics = { D_REQUEST}, groupId = "D_REQUEST1")
    @SendTo  // default REPLY_TOPIC header
    public String dListener1(String in) throws InterruptedException {
        return "First Consumer : "+ in.toUpperCase();
    }

    @KafkaListener(id = "def2", topics = { D_REQUEST}, groupId = "D_REQUEST2")
    @SendTo  // default REPLY_TOPIC header
    public String dListener2(String in) throws InterruptedException {
        return "Second Consumer : "+ in.toLowerCase();
    }

    @KafkaListener(id = "def3", topics = { D_REQUEST}, groupId = "D_REQUEST3")
    @SendTo  // default REPLY_TOPIC header
    public String dListener3(String in) throws InterruptedException {
        Thread.sleep(11000);
        return "Third Consumer : "+ in;
    }

}
'''


  [1]: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java
4

1 回答 1

0

template.setReturnPartialOnTimeout(true)只是意味着模板将在超时时咨询发布策略(使用timeout参数 = true,告诉策略它是超时而不是传递调用)。

它必须返回 true 才能释放部分结果。

这是为了让您查看(并可能修改)列表以决定是否要释放或丢弃。

您的策略忽略timeout参数:

   (list, timeout) -> {
        releaseCount.incrementAndGet();
        return list.size() == releaseSize;
    });

你需要return timeout ? true : { ... }.

于 2020-03-31T13:54:28.743 回答