1

我正在与 REST 服务集成,想法是它marketingCategoryOutboundGatewayHttpRequestExecutingMessageHandler. 网关向 REST 服务发出请求并将其响应推送到marketingCategory通道。网关本身由marketingCategoryPollerMessageSource使用makeTriggeringMessage工厂方法创建的消息触发。

问题是服务返回分页结果。除了我已经拥有的服务激活器之外,我会在频道上收听一些东西marketingCategory,检查是否响应并将具有由创建的递增页码的新消息推makeTriggeringMessage送到marketingCategoryPoller频道,以便代码将在循环中旋转,直到它获取来自 REST 服务的所有页面。

Spring Integration 是否允许这样的过滤器在输入通道上接收一条消息,根据条件对其进行测试并在条件为真时将新消息推送到输出通道?

编码:

//Responses from the REST service go to this channel
@Bean("marketingCategory")
MessageChannel marketingCategory() { return new PublishSubscribeChannel();}

//This channel is used to trigger the outbound gateway which makes a request to the REST service
@Bean
MessageChannel marketingCategoryPoller() {return new DirectChannel();}

//An adapter creating triggering messages for the gateway
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<String> marketingCategoryPollerMessageSource() { return () -> makeTriggeringMessage(1);}

//A factory for producing messages which trigger the gateway
private Message<String> makeTriggeringMessage(int page) {
    //make a message for triggering marketingCategoryOutboundGateway
    return MessageBuilder.withPayload("")
            .setHeader("Host", "eclinic")
            .setHeader("page", page)
            .build();
}

//An outbound gateway, makes a request to the REST service and returns the response to marketingCategory channel
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
    //make a request to the REST service and push the response to the marketingCategory channel
}

//handler for REST service responses
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
    return (msg) -> {
        //process the categories returned by marketingCategoryOutboundGateway
    };
}
4

1 回答 1

3

我找到了一个基于此发布的解决方案Read and download from a paginated REST-Services with spring integration

  1. 触发与 REST 服务对话的出站网关,并使用带有轮询器的入站通道适配器将响应推送到通道。入站通道适配器是一个消息源,它最初生成带有标头的消息,该标头指示要从 REST API 获取的页码。出站网关使用页面消息头生成指定所需页面的 url

  2. 出站网关向其推送 REST 服务响应的通道有 2 个订阅者:

    2.1。一个服务激活器,它对获取的数据做一些事情

    2.2. 一个过滤器,它检查这是否是最后一页,如果不是,它将消息进一步发送到标题丰富器使用的另一个通道

  3. 收到消息后,标头丰富器增加其页标头并将消息进一步推送到触发出站网关的通道,网关读取增加的页标头并从 REST 服务获取下一页

  4. 循环一直在旋转,直到 REST 服务返回最后一页。过滤器不允许此消息传递到标头丰富器,从而打破循环。

完整代码:

@Configuration
public class IntegrationConfiguration {

    private final ApiGateConfig apiGateConfig;

    IntegrationConfiguration(ApiGateConfig apiGateConfig) {
        this.apiGateConfig = apiGateConfig;
    }

    @Bean("marketingCategory")
    MessageChannel marketingCategory() {
        return new PublishSubscribeChannel();
    }

    @Bean
    MessageChannel marketingCategoryPoller() {
        return new DirectChannel();
    }

    @Bean
    MessageChannel marketingCategoryPollerNextPage() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
    public MessageSource<RestPageImpl<MarketingCategory>> marketingCategoryPollerMessageSource() {
        return () -> makeTriggeringMessage(0);
    }

    /**
     * Build a gateway triggering message
     */
    private Message<RestPageImpl<MarketingCategory>> makeTriggeringMessage(int page) {
        return MessageBuilder.withPayload(new RestPageImpl<MarketingCategory>())
                .setHeader("Host", "eclinic")
                .setHeader("page", page)
                .build();
    }

    @Bean
    @ServiceActivator(inputChannel = "marketingCategoryPoller")
    public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {

        String uri = apiGateConfig.getUri() + "/marketingCategories?page={page}";

        //the type of the payload
        ParameterizedTypeReference<RestPageImpl<MarketingCategory>> type = new ParameterizedTypeReference<>() {
        };

        //page number comes from the message
        SpelExpressionParser expressionParser = new SpelExpressionParser();
        var uriVariables = new HashMap<String, Expression>();
        uriVariables.put("page", expressionParser.parseExpression("headers.page"));

        HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler(uri);
        handler.setHttpMethod(HttpMethod.GET);
        handler.setExpectedResponseTypeExpression(new ValueExpression<>(type));
        handler.setOutputChannel(channel);
        handler.setUriVariableExpressions(uriVariables);

        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "marketingCategory")
    public MessageHandler marketingCategoryHandler() {
        return (msg) -> {
            var page = (RestPageImpl<MarketingCategory>) msg.getPayload();

            System.out.println("Page #" + page.getNumber());

            page.getContent().forEach(c -> System.out.println(c.getMarketingCategory()));

        };
    }

    @Filter(inputChannel = "marketingCategory", outputChannel = "marketingCategoryPollerNextPage")
    public boolean marketingCategoryPaginationFilter(RestPageImpl<MarketingCategory> page) {
        return !page.isLast();
    }

    @Bean
    @Transformer(inputChannel = "marketingCategoryPollerNextPage", outputChannel = "marketingCategoryPoller")
    HeaderEnricher incrementPage() {
        Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
        Expression expression = new SpelExpressionParser().parseExpression("headers.page+1");

        var valueProcessor = new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, Integer.class);
        valueProcessor.setOverwrite(true);

        headersToAdd.put("page", valueProcessor);
        return new HeaderEnricher(headersToAdd);
    }
}
于 2019-02-08T13:51:58.630 回答