只是想找出一个简单的例子,使用 Spring Boot 2.1.12 和 Spring for Apache Kafka 2.2.12 与 KafkaListener 一起工作,以重试最后一条失败的消息。如果消息失败,则应将消息重定向到将进行重试的另一个主题。我们将有 4 个主题。topic, retryTopic, sucessTopic 和 errorTopic 如果 topic 失败,应该重定向到 retryTopic,在那里进行 3 次重试尝试。如果这些尝试失败,则必须重定向到 errorTopic。如果主题和重试主题都成功,则应重定向到成功主题。我需要用JUnit Test 覆盖 90% 的案例。
问问题
505 次
2 回答
0
public Jaxb2Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
// this package must match the package in the <generatePackage>
// specified in
// pom.xml
marshaller.setContextPath("br.com.company.ws");
return marshaller;
}
@Bean
public CountryClient countryClient(Jaxb2Marshaller marshaller) {
CountryClient client = new CountryClient();
client.setDefaultUri(link);
WebServiceTemplate template = client.getWebServiceTemplate();
template.setMessageSender(new WebServiceMessageSenderWithAuth(username, password));
client.setMarshaller(marshaller);
client.setUnmarshaller(marshaller);
return client;
}
@Service 公共类 CountryClient 扩展 WebServiceGatewaySupport {
@Value("${spring.link.consumer.link}")
private String link;
public ZfifNfMaoResponse getCountry(ZfifNfMao zfifNfMao) {
zfifNfMao = new ZfifNfMao();
ZfifNfMaoResponse response = (ZfifNfMaoResponse)getWebServiceTemplate().marshalSendAndReceive(link, zfifNfMao);
return response;
}
}
于 2020-02-13T22:01:27.103 回答
0
2.2.8.发布
那是6个月大。
您应该始终在次要版本中使用最新版本,以确保您已修复所有错误;当前的 2.2.x 版本是 2.2.12。
我已经从这里修改了我的 2.2.x 答案,并举例说明了如何测试这样的应用程序。它使用嵌入式测试 Kafka 代理。
@SpringBootApplication
public class So601723041Application {
public static void main(String[] args) {
SpringApplication.run(So601723041Application.class, args);
}
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<Object, Object>() {
@Override
protected void initializeContainer(ConcurrentMessageListenerContainer<Object, Object> instance,
KafkaListenerEndpoint endpoint) {
super.initializeContainer(instance, endpoint);
customize(instance, template);
}
};
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> retryKafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<Object, Object>() {
@Override
protected void initializeContainer(ConcurrentMessageListenerContainer<Object, Object> instance,
KafkaListenerEndpoint endpoint) {
super.initializeContainer(instance, endpoint);
customize(instance, template);
}
};
configurer.configure(factory, kafkaConsumerFactory);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(5000L);
retryTemplate.setBackOffPolicy(backOffPolicy);
factory.setRetryTemplate(retryTemplate);
return factory;
}
private void customize(ConcurrentMessageListenerContainer<Object, Object> container,
KafkaTemplate<Object, Object> template) {
if (container.getContainerProperties().getTopics()[0].equals("topic")) {
container.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(template,
(cr, ex) -> new TopicPartition("retryTopic", cr.partition())),
0));
}
else if (container.getContainerProperties().getTopics()[0].equals("retryTopic")) {
container.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(template,
(cr, ex) -> new TopicPartition("errorTopic", cr.partition())),
0)); // no retries here - retry template instead.
}
}
}
@Component
public class Listener {
private final KafkaTemplate<String, String> template;
private SomeService service;
public Listener(KafkaTemplate<String, String> template, SomeService service) {
this.template = template;
this.service = service;
}
public void setService(SomeService service) {
this.service = service;
}
@KafkaListener(id = "so60172304.1", topics = "topic")
public void listen1(String in) {
this.service.process(in);
this.template.send("successTopic", in);
}
@KafkaListener(id = "so60172304.2", topics = "retryTopic", containerFactory = "retryKafkaListenerContainerFactory")
public void listen2(String in) {
this.service.retry(in);
this.template.send("successTopic", in);
}
}
public interface SomeService {
void process(String in);
void retry(String in);
}
@Component
public class DefaultService implements SomeService {
@Override
public void process(String in) {
System.out.println("topic: " + in);
}
@Override
public void retry(String in) {
System.out.println("retryTopic: " + in);
}
}
spring.kafka.consumer.auto-offset-reset=earliest
测试用例:
@RunWith(SpringRunner.class)
@SpringBootTest
@TestPropertySource(properties = "spring.kafka.bootstrap-servers = ${spring.embedded.kafka.brokers}")
public class So601723041ApplicationTests {
@ClassRule
public static EmbeddedKafkaRule embedded = new EmbeddedKafkaRule(1, true, 1,
"topic", "retryTopic", "successTopic", "errorTopic");
@Autowired
private Listener listener;
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
TestService testService = spy(new TestService());
this.listener.setService(testService);
template.send("topic", "failAlways");
template.send("topic", "onlyFailFirst");
template.send("topic", "good");
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "false", embedded.getEmbeddedKafka());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
embedded.getEmbeddedKafka().consumeFromEmbeddedTopics(consumer, "successTopic");
List<ConsumerRecord<Integer, String>> received = new ArrayList<>();
int n = 0;
while (received.size() < 2 && n++ < 10) {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));
records.forEach(rec -> received.add(rec));
}
assertThat(received.size() == 2);
consumer.close();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
consumer = new KafkaConsumer<>(props);
embedded.getEmbeddedKafka().consumeFromEmbeddedTopics(consumer, "errorTopic");
n = 0;
while (received.size() < 3 && n++ < 10) {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));
records.forEach(rec -> received.add(rec));
}
assertThat(received.size() == 3);
consumer.close();
verify(testService, times(3)).process(anyString());
verify(testService, times(4)).retry(anyString());
assertThat(received).extracting(rec -> rec.value())
.contains("good", "onlyFailFirst", "failAlways");
}
public static class TestService implements SomeService {
CountDownLatch latch = new CountDownLatch(1);
@Override
public void process(String in) {
System.out.println("topic: " + in);
if (in.toLowerCase().contains("fail")) {
throw new RuntimeException(in);
}
}
@Override
public void retry(String in) {
System.out.println("retryTopic: " + in);
if (in.startsWith("fail")) {
throw new RuntimeException(in);
}
}
}
}
EMMA 显示 97.2% 的覆盖率
聚甲醛:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.12.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>so60172304-1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>so60172304-1</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
于 2020-02-13T15:30:11.667 回答