Here is my LS-GET flow (SFTP outbound gateway: downloads files from a remote SFTP server) and my MessagingGateway.
@MessagingGateway
public interface IntegratedRemoteFileProcessMessagingGateway {

    @Gateway(requestChannel = "getFlows.input")
    void getFlows(final String remoteDirectory);

    @Gateway(requestChannel = "moveFlows.input")
    void moveFlows(final String remoteDirectory);
}
@Bean
public QueueChannelSpec getOutputChannel() {
    return MessageChannels.queue();
}

@Bean
public IntegrationFlow getFlows() {
    return f -> f
        .enrichHeaders(h -> h
            .headerExpression("originalPayload", "payload.toString()")
            .headerExpression(FileHeaders.REMOTE_DIRECTORY, "payload.toString()"))
        .log(LoggingHandler.Level.INFO, "eu.haee", "'Header originalPayload=' + headers[originalPayload]")
        .handle(Sftp.outboundGateway(sessionFactory, Command.LS.getCommand(), "payload")
            .autoCreateDirectory(false)
            .autoCreateLocalDirectory(false)
            .charset("UTF-8")
            .filter(new SftpSimplePatternFileListFilter("*.xml"))
            .options(Option.NAME_ONLY, Option.RECURSIVE))
        .split()
        .log(LoggingHandler.Level.INFO, "eu.haee", "'LS Payload= ' + payload.toString()")
        .enrichHeaders(h -> h
            .headerExpression("originalRemoteFile", "payload.toString()")
            .headerExpression(FileHeaders.REMOTE_FILE, "payload.toString()"))
        .handle(Sftp.outboundGateway(sessionFactory, Command.GET.getCommand(), "headers['originalPayload'] + headers['file_remoteFile']")
            .autoCreateLocalDirectory(false)
            .charset("UTF-8")
            .fileNameExpression("headers['file_remoteFile']")
            .localDirectory(new File(flowsConfiguration.localDirectory()))
            .localFilenameExpression(new FunctionExpression<Message<?>>(m -> {
                IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(m);
                final String remoteFileName = (String) accessor.getHeader("file_remoteFile");
                final int extensionIndex = remoteFileName.lastIndexOf('.');
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSSSS", Locale.GERMAN);
                return String.format("%s_MYC(%s-%d)%s", remoteFileName.substring(0, extensionIndex),
                    ZonedDateTime.of(LocalDateTime.now(), ZoneId.of("Europe/Berlin")).format(formatter),
                    (new SecureRandom()).nextInt(99999),
                    remoteFileName.substring(extensionIndex));
            }))
            .options(Option.PRESERVE_TIMESTAMP)
            .remoteFileSeparator("/"))
        .channel("getOutputChannel");
}
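For reference, the local file name logic from the GET gateway can be pulled out into a plain Java helper and run on its own, without Spring. This is only a sketch: the class name LocalFileNames and the overload that takes an explicit time and suffix are assumptions added for illustration, and it also guards against remote names that contain no '.', which the lambda in the flow does not handle.

```java
import java.security.SecureRandom;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper; mirrors the localFilenameExpression lambda of the flow.
final class LocalFileNames {

    // Same pattern as in the flow: timestamp plus five fraction-of-second digits.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSSSS", Locale.GERMAN);

    // One shared generator instead of a new SecureRandom per file.
    private static final SecureRandom RANDOM = new SecureRandom();

    // Deterministic variant (assumption, added so the format can be checked).
    static String localFileName(String remoteFileName, ZonedDateTime now, int suffix) {
        int extensionIndex = remoteFileName.lastIndexOf('.');
        if (extensionIndex < 0) {
            // No extension: treat the whole name as the base name.
            extensionIndex = remoteFileName.length();
        }
        return String.format("%s_MYC(%s-%d)%s",
                remoteFileName.substring(0, extensionIndex),
                now.format(FORMATTER),
                suffix,
                remoteFileName.substring(extensionIndex));
    }

    // Variant using the same clock and random suffix as the flow.
    static String localFileName(String remoteFileName) {
        return localFileName(remoteFileName,
                ZonedDateTime.of(LocalDateTime.now(), ZoneId.of("Europe/Berlin")),
                RANDOM.nextInt(99999));
    }

    public static void main(String[] args) {
        // Prints the base name, a timestamped "_MYC(...)" part, then ".xml".
        System.out.println(localFileName("order.xml"));
    }
}
```

In the flow itself, the lambda could then be reduced to a single call such as LocalFileNames.localFileName(remoteFileName), if extracting it turns out to be useful.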
Here are my Spring Batch tasklets and the JUnit test. The MessagingGateway is injected through the tasklet constructor.
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    for (Endpoint endpoint : endpoints) {
        final String remoteDirectory = endpoint.getEpConUri();
        logger.info("ProcessRemoteFilesFlowsTasklet {} dealer at {} remote files process starting",
            endpoint.getId().getDlrCd(), remoteDirectory);
        flowsMessagingGateway.getFlows(remoteDirectory);
    }
    return RepeatStatus.FINISHED;
}

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    controlChannel.send(new GenericMessage<>("@getPoller.start()"));
    logger.info("GetPollerRemoteFilesFlowsTasklet poller starting...");
    return RepeatStatus.FINISHED;
}

@Autowired
private IntegratedRemoteFileProcessMessagingGateway flowsMessagingGateway;

@Autowired
private EndpointRepository endpointRepository;

@Test
public void getFlows() {
    flowsMessagingGateway.getFlows("/c07va00011/iris/import/");
    Uninterruptibles.sleepUninterruptibly(60, TimeUnit.SECONDS);
}
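When I run the getFlows test, I get the exception below, yet the files are still downloaded to my local machine. I don't understand why. I have tried many variations without making any progress.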
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.getFlows.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=/c07va00011/iris/import/, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e64ae1a, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e64ae1a, id=bd393cb7-42d0-03b2-674d-40e3cf9211de, timestamp=1609844917799}]
...
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 145 common frames omitted
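My understanding of the message itself, sketched in plain Java: a subscribable channel hands each message to a handler that has subscribed to it, and a send fails when no handler is subscribed at that moment. This is not Spring code; ToyDirectChannel and its methods are hypothetical names made up for illustration, and the sketch says nothing about why the subscriber would be missing in my setup.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy model of a point-to-point channel; NOT the Spring Integration classes.
final class ToyDirectChannel {

    private final String name;
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    ToyDirectChannel(String name) {
        this.name = name;
    }

    // In the toy model, "starting" an endpoint means subscribing its handler.
    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // "Stopping" an endpoint removes its handler again.
    void unsubscribe(Consumer<String> handler) {
        subscribers.remove(handler);
    }

    // Delivers to the first subscriber, or fails if there is none right now.
    void send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException(
                    "Dispatcher has no subscribers for channel '" + name + "'.");
        }
        subscribers.get(0).accept(payload);
    }

    public static void main(String[] args) {
        ToyDirectChannel channel = new ToyDirectChannel("getFlows.input");
        try {
            channel.send("/some/remote/dir/"); // nothing subscribed yet
        } catch (IllegalStateException e) {
            System.out.println("send failed: " + e.getMessage());
        }
        channel.subscribe(p -> System.out.println("handled " + p));
        channel.send("/some/remote/dir/"); // now a handler receives the payload
    }
}
```

In this model the outcome depends only on whether a handler is subscribed at the moment of the send, so the same channel can work for one send and fail for another.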
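@EnableIntegration is placed on every Spring Integration related configuration class. @IntegrationComponentScan is also placed on my main flow configuration class (with a String array of the package names to scan). What happens if the @EnableIntegration annotation is present on more than one class?
Should I merge all of the Spring Batch and Spring Integration configuration classes into one?
I also tested a ControlBus (sending a message to the poller from a Spring Batch tasklet) and got the same exception.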
11:57:36.481 [main] ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step startGetPollerRemoteFilesStep in job integratedFilesProcessJob2
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.controlChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=@getPoller.start(), headers={id=539a27d0-9bce-062d-8664-53aae14b5680, timestamp=1609930656454}]
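@Lazy and @DependsOn do not help either. (@Lazy was added to the ControlBus channel, and @DependsOn to a Spring service class. The Spring Batch jobs are also started and stopped manually through REST API calls.)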
@Autowired
public BatchFileServiceConfiguration(JobBuilderFactory jobBuilderFactory,
        StepBuilderFactory stepBuilderFactory,
        PropertyConfiguration propertyConfiguration,
        @Qualifier("sourceBatchTransactionManager") PlatformTransactionManager sourceBatchTransactionManager,
        @Qualifier("sourceBatchEntityManagerFactory") EntityManagerFactory sourceBatchEntityManagerFactory,
        @Qualifier("processFileTaskExecutor") TaskExecutor processFileTaskExecutor,
        BatchEndpointRepository batchEndpointRepository,
        RemoteFileProcessMessagingGateway remoteFileProcessMessagingGateway,
        @Lazy @Qualifier("controlChannel") MessageChannel controlChannel) {
    this.jobBuilderFactory = jobBuilderFactory;
    this.stepBuilderFactory = stepBuilderFactory;
    this.propertyConfiguration = propertyConfiguration;
    this.sourceBatchTransactionManager = sourceBatchTransactionManager;
    this.sourceBatchEntityManagerFactory = sourceBatchEntityManagerFactory;
    this.processFileTaskExecutor = processFileTaskExecutor;
    this.batchEndpointRepository = batchEndpointRepository;
    this.remoteFileProcessMessagingGateway = remoteFileProcessMessagingGateway;
    this.controlChannel = controlChannel;

@Service
@DependsOn({"lsFlows", "getFlows", "moveFlows", "moveFailedFlows", "getPollableFlows"})
public class FileServiceImpl implements FileService {
These exceptions never occurred in a standalone Spring Integration application.