当我将 AggregatingReplyingKafkaTemplate 与 template.setReturnPartialOnTimeout(true) 一起使用时,似乎存在一个问题,即使消费者可以获得部分结果,它也会返回超时异常。
在下面的示例中,我有 3 个消费者来回复请求主题,并且我将回复超时设置为 10 秒。我已经明确地将消费者 3 的响应延迟到 11 秒,但是,我希望消费者 1 和 2 的响应能够返回,所以我可以返回部分结果。但是,我收到了 KafkaReplyTimeoutException。感谢您的投入。谢谢。
我根据下面的单元测试遵循代码。[回复KafkaTemplateTests][1]
我在下面提供了实际代码:
@RestController
public class SumController {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
public static final String D_REPLY = "dReply";
public static final String D_REQUEST = "dRequest";
@ResponseBody
@PostMapping(value="/sum")
public String sum(@RequestParam("message") String message) throws InterruptedException, ExecutionException {
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
new TopicPartitionOffset(D_REPLY, 0), 3, new AtomicInteger());
String resultValue ="";
String currentValue ="";
try {
template.setDefaultReplyTimeout(Duration.ofSeconds(10));
template.setReturnPartialOnTimeout(true);
ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, message);
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(5, TimeUnit.SECONDS); // send ok
System.out.println("Send Completed Successfully");
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord = future.get(10, TimeUnit.SECONDS);
System.out.println("Consumer record size "+consumerRecord.value().size());
Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecord.value().iterator();
while (iterator.hasNext()) {
currentValue = iterator.next().value();
System.out.println("response " + currentValue);
System.out.println("Record header " + consumerRecord.headers().toString());
resultValue = resultValue + currentValue + "\r\n";
}
} catch (Exception e) {
System.out.println("Error Message is "+e.getMessage());
}
return resultValue;
}
public AggregatingReplyingKafkaTemplate<Integer, String, String> aggregatingTemplate(
TopicPartitionOffset topic, int releaseSize, AtomicInteger releaseCount) {
//Create Container Properties
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//Set the consumer Config
//Create Consumer Factory with Consumer Config
DefaultKafkaConsumerFactory<Integer, Collection<ConsumerRecord<Integer, String>>> cf =
new DefaultKafkaConsumerFactory<>(consumerConfigs());
//Create Listener Container with Consumer Factory and Container Property
KafkaMessageListenerContainer<Integer, Collection<ConsumerRecord<Integer, String>>> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
// container.setBeanName(this.testName);
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()), container,
(list, timeout) -> {
releaseCount.incrementAndGet();
return list.size() == releaseSize;
});
template.setSharedReplyTopic(true);
template.start();
return template;
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
return props;
}
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
return props;
}
public ProducerFactory<Integer,String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@KafkaListener(id = "def1", topics = { D_REQUEST}, groupId = "D_REQUEST1")
@SendTo // default REPLY_TOPIC header
public String dListener1(String in) throws InterruptedException {
return "First Consumer : "+ in.toUpperCase();
}
@KafkaListener(id = "def2", topics = { D_REQUEST}, groupId = "D_REQUEST2")
@SendTo // default REPLY_TOPIC header
public String dListener2(String in) throws InterruptedException {
return "Second Consumer : "+ in.toLowerCase();
}
@KafkaListener(id = "def3", topics = { D_REQUEST}, groupId = "D_REQUEST3")
@SendTo // default REPLY_TOPIC header
public String dListener3(String in) throws InterruptedException {
Thread.sleep(11000);
return "Third Consumer : "+ in;
}
}
'''
[1]: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java