我的场景是一个 Http 出站网关,我在其中向外部服务请求由TransferRequest
实体表示的下一个转换。网关是“httpOutRequest”通道的端点。“httpOutRequest”通道的起点是一个 bean IntegrationFlow source()
,我在其中发送由轮询器触发的空字符串消息。(顺便说一句:这有必要吗?我可以将轮询器直接添加到出站网关吗?如何?)
然后我安装了 errorHandler 通道端点来捕获任何问题。如果问题(异常)的数量是MAX_COUNT_TO_REDUCE_POLLING
- 假设因为外部服务不可访问 - 那么我想在运行时将轮询从最初的 5_000 减少到 60_000。
到目前为止,这是我的代码:
public static final int MAX_COUNT_TO_REDUCE_POLLING = 3;
private long period = 5000;
private int problemCounter = 0;
@Bean
public IntegrationFlow outbound() {
return IntegrationFlows.from("httpOutRequest")
.handle(Http.outboundGateway("http://localhost:8080/harry-potter-service/next/request")
.httpMethod(HttpMethod.GET)
.expectedResponseType(TransferRequest.class)
)
.channel("reply")
.get();
}
@Bean
public IntegrationFlow source() {
return IntegrationFlows.from(
() -> new GenericMessage<String>(""),
e -> e.poller(p -> p.fixedRate(period)))
.channel("httpOutRequest")
.get();
}
@Bean
@ServiceActivator(inputChannel = "reply")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("myHandler: " + message.getPayload());
System.out.println("myHandler: " + message.getHeaders());
TransferRequest req = (TransferRequest) message.getPayload();
System.out.println("myHandler: " + req);
}
};
}
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler errorHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
LOG.error("message.payload: " + message.getPayload());
MessageHandlingException e = (MessageHandlingException) message.getPayload();
LOG.error("Exception: " + e);
LOG.debug("exception counter = " + (++problemCounter));
if (problemCounter >= MAX_COUNT_TO_REDUCE_POLLING) {
LOG.debug("would like to reduce poller frequence or stop");
period = 60_000;
// outbound().stop()
}
}
};
}
当遇到异常数量的阈值时,如何减少运行时的轮询频率?
我怎么能停止集成流程?
编辑 1
更具体:如果我有消息传递网关
@Bean
public IntegrationFlow source() {
return IntegrationFlows.from(
() -> new GenericMessage<String>(""),
e -> e.poller(p -> p.fixedRate(period)))
.channel("httpOutRequest")
.get();
}
如何在第二个 Lambda 中访问 p?
如何设置p.fixedRate
使用Control Channel
?