0

我的场景是一个 Http 出站网关,我在其中向外部服务请求由TransferRequest实体表示的下一个转换。网关是“httpOutRequest”通道的端点。“httpOutRequest”通道的起点是一个 bean IntegrationFlow source(),我在其中发送由轮询器触发的空字符串消息。(顺便说一句:这有必要吗?我可以将轮询器直接添加到出站网关吗?如何?)

然后我安装了 errorHandler 通道端点来捕获任何问题。如果问题(异常)的数量是MAX_COUNT_TO_REDUCE_POLLING- 假设因为外部服务不可访问 - 那么我想在运行时将轮询从最初的 5_000 减少到 60_000。

到目前为止,这是我的代码:

    public static final int MAX_COUNT_TO_REDUCE_POLLING = 3;

    private long period = 5000;
    private int problemCounter = 0;

    @Bean
    public IntegrationFlow outbound() {
        return IntegrationFlows.from("httpOutRequest")
                .handle(Http.outboundGateway("http://localhost:8080/harry-potter-service/next/request")
                        .httpMethod(HttpMethod.GET)
                        .expectedResponseType(TransferRequest.class)
                        )
                .channel("reply")
                .get();
    }

    @Bean
    public IntegrationFlow source() {
        return IntegrationFlows.from(
                () -> new GenericMessage<String>(""),
                        e -> e.poller(p -> p.fixedRate(period)))
                .channel("httpOutRequest")
                .get();
    }

    @Bean
    @ServiceActivator(inputChannel = "reply")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("myHandler: " + message.getPayload());
                System.out.println("myHandler: " + message.getHeaders());
                TransferRequest req = (TransferRequest) message.getPayload();
                System.out.println("myHandler: " + req);
            }
        };
    }

    @Bean
    @ServiceActivator(inputChannel = "errorChannel")
    public MessageHandler errorHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                LOG.error("message.payload: " + message.getPayload());
                MessageHandlingException e = (MessageHandlingException) message.getPayload();
                LOG.error("Exception: " + e);
                LOG.debug("exception counter = " + (++problemCounter));

                if (problemCounter >= MAX_COUNT_TO_REDUCE_POLLING) {
                    LOG.debug("would like to reduce poller frequence or stop");
                    period = 60_000;
                //  outbound().stop()
                }
            }
        };
    }

当遇到异常数量的阈值时,如何减少运行时的轮询频率?

我怎么能停止集成流程?

编辑 1

更具体:如果我有消息传递网关

@Bean
public IntegrationFlow source() {
    return IntegrationFlows.from(
            () -> new GenericMessage<String>(""),
                    e -> e.poller(p -> p.fixedRate(period)))
            .channel("httpOutRequest")
            .get();
}

如何在第二个 Lambda 中访问 p?

如何设置p.fixedRate使用Control Channel

4

1 回答 1

1

我可能已经自己解决了这个问题,阅读手册。

请参阅此处以在运行时更改轮询率。为此,您必须使用DynamicPeriodiyTimer包中的org.springframework.integration.util

要更换延迟轮询器,请执行以下操作:

    private final DynamicPeriodicTrigger dynamicPeriodicTrigger =
            new DynamicPeriodicTrigger(5_000);

    @Bean
    public IntegrationFlow normalStateEntryPoint() {
        return IntegrationFlows.from(
                () -> new GenericMessage<String>(""),
                        e -> e.poller(p -> p.trigger(dynamicPeriodicTrigger))
                        .id("normalStateSourcePollingChannelAdapter")
                        .autoStartup(true))
                .channel("httpOutRequest")
                .get();
    }

要将轮询时间从 5.000 减少到 60.000 毫秒,请执行以下操作:

    dynamicPeriodicTrigger.setPeriod(60_000);

就是这样。

于 2021-09-06T00:52:38.653 回答