我已经建立了一个简单的 Spring Integration 流程,它由以下步骤组成:
- 然后定期轮询休息 api
- 对有效载荷进行一些处理
- 并将其放在 Kafka 主题上。
请注意以下代码:
@Component
public class MyIntegrationFlow extends IntegrationFlowAdapter {
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(() -> List.of("pathVariable1", "pathVariable2"), c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
.split()
.handle(httpRequest(), c -> c.advice(new RequestHandlerRetryAdvice()))
.transform(Tranformers.fromJson(Foo.class))
.filter(payload -> payload.isValid())
.log()
.transform(Tranformers.toJson())
.channel(Source.OUTPUT); // output channel for kafka topic
}
private HttpMessageHandlerSpec httpRequest() {
return Http.outboundGateway("http://somehost:8080/{pathVariable}")
.httpMethod(GET)
.uriVariable("pathVariable", Message::getPayload)
.expectedResponseType(String.class);
}
}
这非常有效,但是,我正在努力提出一些好的测试。
- 我应该如何模拟外部 REST API?
- 我应该如何测试重试策略是否启动以及是否发出了所需数量的 http 请求?
- 如何更改
MessageSource
定期轮询的流(路径变量列表)? - 如何检查有效负载是否已成功进入 Kafka 主题?