1

我有一个异步 IntegrationFlow,它在不同环境中的行为不一致。在其中一个环境中,我通过网关调用它时,它会立即启动(这是期望的行为)。但在另一个环境中,当同一个项目里另一个基于 cron 的 IntegrationFlow 正在运行时,这个 IntegrationFlow 要等到下一个整点才会启动。我的问题是:如何配置这个 IntegrationFlow,使其在被调用时立即启动,或者如何手动触发它,以保证它始终立即运行?非常感谢任何帮助。以下是我的代码:

网关

/**
 * Messaging gateway used by application code to submit a search request
 * to the integration flow.
 */
@MessagingGateway
public interface SearchGateway {

    /**
     * Sends the given search DTO to the {@code "searchFlow.input"} request channel.
     * The method returns {@code void}, so no reply value is handed back to the caller.
     *
     * @param flowSearchDto the search request to send as the message payload
     */
    @Gateway(requestChannel = "searchFlow.input")
    void mySearch(FlowSearchDto flowSearchDto);

}

集成流

/**
 * Declares the default poller and the integration flows that make up the search pipeline.
 *
 * <p>NOTE(review): {@code PAYLOAD}, {@code restClientHandler} and {@code searchAbcServiceUrl}
 * are referenced here but declared outside this snippet.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Component
public class SearchEndpoint {

  /**
   * Poller registered under the {@link PollerMetadata#DEFAULT_POLLER} bean name,
   * configured with a fixed delay of {@code 1000}.
   *
   * @return the poller metadata built by {@code Pollers.fixedDelay(1000)}
   */
  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata poller() {
    final PollerMetadata fixedDelayPoller = Pollers.fixedDelay(1000).get();
    return fixedDelayPoller;
  }

  /**
   * Entry flow: reads from {@code "searchFlow.input"}, splits on the expression
   * {@code payload.requestList}, hands each item to an executor channel backed by a
   * cached thread pool, enriches the headers and finally routes on
   * {@code payload.serviceName}.
   *
   * @return the assembled flow
   */
  @Bean(value = "searchFlow")
  public IntegrationFlow searchFlow() {
    return IntegrationFlows
        .from("searchFlow.input")
        // The raw cast picks the (String, Consumer) overload while passing no configurer.
        .split("payload.requestList", (Consumer) null)
        .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
        .enrichHeaders(searchRequestHeaders())
        .route("payload.serviceName", serviceNameRoutes())
        .get();
  }

  /**
   * Flow consuming the {@code "searchAbc"} queue channel (capacity {@code 10}): sets the
   * {@code "url"} header, passes the message to {@code restClientHandler} and then to a
   * no-op handler.
   *
   * <p>NOTE(review): because the input is a queue channel, messages are presumably picked
   * up by a poller (likely the default one declared in this class) rather than on the
   * sender's thread - confirm when investigating start-up latency.
   *
   * @return the assembled flow
   */
  @Bean(value = "searchAbcFlow")
  public IntegrationFlow searchAbcFlow() {
    return IntegrationFlows
        .from(MessageChannels.queue("searchAbc", 10))
        .enrichHeaders(abcUrlHeader())
        .handle(restClientHandler)
        .handle(message -> {})
        .get();
  }

  /** Header configuration applied to every split item in {@link #searchFlow()}. */
  private Consumer<HeaderEnricherSpec> searchRequestHeaders() {
    return enricher -> {
      enricher.header(HttpHeaders.CONTENT_TYPE, "application/json;charset=utf8");
      enricher.header(MessageHeaders.ERROR_CHANNEL, "searchErrorChannel");
      enricher.headerExpression("restRequestDto", PAYLOAD);
      enricher.defaultOverwrite(true);
    };
  }

  /** Maps each service name to the channel that {@link #searchFlow()} routes it to. */
  private Consumer<RouterSpec<String, ExpressionEvaluatingRouter>> serviceNameRoutes() {
    return router -> router
        .channelMapping(ServiceEndpointType.ABC.getServiceName(), "searchAbc")
        .channelMapping(ServiceEndpointType.DEF.getServiceName(), "searchDef")
        .channelMapping(ServiceEndpointType.GHI.getServiceName(), "searchGhi");
  }

  /** Header configuration that sets the target URL in {@link #searchAbcFlow()}. */
  private Consumer<HeaderEnricherSpec> abcUrlHeader() {
    return enricher -> {
      enricher.header("url", searchAbcServiceUrl);
      enricher.defaultOverwrite(true);
    };
  }
}

调用服务

/**
 * Service implementation that forwards search requests to the messaging gateway.
 *
 * <p>NOTE(review): {@code searchGateway} is used below but declared outside this snippet.
 */
@Service
public class MyServiceImpl implements MyService {
  /**
   * Submits the search request through {@code searchGateway.mySearch}.
   *
   * @param flowSearchDto the search request passed to the gateway
   * @return always {@code true}; the return value is not derived from the gateway call
   */
  @Override
  public boolean myIntegrationSearch(FlowSearchDto flowSearchDto) {

    // Requirement: this call should kick off the asynchronous flow immediately.
    searchGateway.mySearch(flowSearchDto);

    return true;
  }
}
4

0 回答 0