我有一个异步的 IntegrationFlow,它在不同环境中的行为不一致。在其中一个环境中,我通过网关调用它时,它会立即启动(这正是期望的行为)。但在另一个环境中,当同一项目里另一个由 cron 触发的 IntegrationFlow 正在运行时,这个 IntegrationFlow 要等到下一个整点才会启动。我的问题是:如何配置这个 IntegrationFlow,使它在被调用时立即启动,或者如何手动触发它,以保证它始终立即运行?非常感谢任何帮助。以下是我的代码:
网关
/**
 * Messaging gateway used by application code to submit a search request to the
 * integration flow.
 */
@MessagingGateway
public interface SearchGateway {
/**
 * Sends the given request to the {@code "searchFlow.input"} channel.
 * The method returns {@code void}, so no reply is handed back to the caller.
 *
 * @param flowSearchDto the search request; the downstream flow splits it using the
 *                      expression {@code payload.requestList}
 */
@Gateway(requestChannel = "searchFlow.input")
void mySearch(FlowSearchDto flowSearchDto);
}
集成流
/**
 * Declares the Spring Integration beans for the search pipeline: the default poller,
 * the entry flow that splits and routes requests, and the flow that serves the
 * {@code "searchAbc"} channel.
 *
 * <p>NOTE(review): {@code PAYLOAD}, {@code searchAbcServiceUrl} and
 * {@code restClientHandler} are referenced here but declared outside this excerpt.
 * Flows for the {@code "searchDef"} and {@code "searchGhi"} channels are not shown either.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Component
public class SearchEndpoint {

    /**
     * Registers the bean named {@link PollerMetadata#DEFAULT_POLLER}, built with a
     * fixed delay of 1000 (milliseconds, per the {@code Pollers} API).
     *
     * @return the poller metadata used as the default poller
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(1000).get();
    }

    /**
     * Entry flow fed from the {@code "searchFlow.input"} channel.
     *
     * <p>Steps, in order: split on {@code payload.requestList}; hand off to an executor
     * channel backed by a cached thread pool; enrich headers (content type, error
     * channel, a copy of the payload expression under {@code "restRequestDto"}); route
     * on {@code payload.serviceName} to one of the per-service channels.
     *
     * @return the assembled integration flow
     */
    @Bean(value = "searchFlow")
    public IntegrationFlow searchFlow() {
        return IntegrationFlows
                .from("searchFlow.input")
                // The raw cast picks the (String, Consumer) overload while passing no configurer.
                .split("payload.requestList", (Consumer) null)
                .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
                .enrichHeaders((HeaderEnricherSpec headers) -> {
                    headers.header(HttpHeaders.CONTENT_TYPE, "application/json;charset=utf8");
                    headers.header(MessageHeaders.ERROR_CHANNEL, "searchErrorChannel");
                    headers.headerExpression("restRequestDto", PAYLOAD);
                    headers.defaultOverwrite(true);
                })
                .route("payload.serviceName", (RouterSpec<String, ExpressionEvaluatingRouter> mapping) -> {
                    mapping.channelMapping(ServiceEndpointType.ABC.getServiceName(), "searchAbc")
                            .channelMapping(ServiceEndpointType.DEF.getServiceName(), "searchDef")
                            .channelMapping(ServiceEndpointType.GHI.getServiceName(), "searchGhi");
                })
                .get();
    }

    /**
     * Flow that serves the {@code "searchAbc"} channel, declared here as a queue channel
     * with capacity 10.
     *
     * <p>It adds a {@code "url"} header holding {@code searchAbcServiceUrl}, passes the
     * message to {@code restClientHandler}, and then hands the result to a no-op handler.
     *
     * <p>NOTE(review): because the input is a queue channel, messages presumably wait
     * there until a poller picks them up (likely the default poller declared in this
     * class) - confirm which poller and scheduler apply in each environment.
     *
     * @return the assembled integration flow
     */
    @Bean(value = "searchAbcFlow")
    public IntegrationFlow searchAbcFlow() {
        return IntegrationFlows
                .from(MessageChannels.queue("searchAbc", 10))
                .enrichHeaders((HeaderEnricherSpec headers) -> {
                    headers.header("url", searchAbcServiceUrl);
                    headers.defaultOverwrite(true);
                })
                .handle(restClientHandler)
                // Terminal no-op handler: whatever the previous step produced is discarded.
                .handle(message -> { })
                .get();
    }
}
调用方服务
/**
 * {@code MyService} implementation that triggers the search integration flow through
 * the messaging gateway.
 *
 * <p>NOTE(review): the {@code searchGateway} field is not declared in this excerpt -
 * presumably an injected {@code SearchGateway}; confirm.
 */
@Service
public class MyServiceImpl implements MyService {
/**
 * Hands the request to {@code searchGateway.mySearch(...)} and then reports success.
 *
 * @param flowSearchDto the search request forwarded unchanged to the gateway
 * @return {@code true} whenever the gateway call returns normally; the result of the
 *         flow itself is not inspected
 */
@Override
public boolean myIntegrationSearch(FlowSearchDto flowSearchDto) {
// I NEED THIS CALL TO KICK OFF THE ASYNCHRONOUS FLOW IMMEDIATELY!!!
searchGateway.mySearch(flowSearchDto);
return true;
}
}