Here are my LS-GET flow (SFTP outbound gateway: downloads files from a remote SFTP server) and my MessagingGateway.

    @MessagingGateway
    public interface IntegratedRemoteFileProcessMessagingGateway {
    
      @Gateway(requestChannel = "getFlows.input")
      void getFlows(final String remoteDirectory);
    
      @Gateway(requestChannel = "moveFlows.input")
      void moveFlows(final String remoteDirectory);
    }

    @Bean
    public QueueChannelSpec getOutputChannel() {
        return MessageChannels.queue();
    }
    
    @Bean
    public IntegrationFlow getFlows() {
        return f -> f
                .enrichHeaders(h -> h
                        .headerExpression("originalPayload", "payload.toString()")
                        .headerExpression(FileHeaders.REMOTE_DIRECTORY, "payload.toString()"))
                .log(LoggingHandler.Level.INFO, "eu.haee", "'Header originalPayload=' + headers[originalPayload]")
                .handle(Sftp.outboundGateway(sessionFactory, Command.LS.getCommand(), "payload")
                        .autoCreateDirectory(false)
                        .autoCreateLocalDirectory(false)
                        .charset("UTF-8")
                        .filter(new SftpSimplePatternFileListFilter("*.xml"))
                        .options(Option.NAME_ONLY, Option.RECURSIVE))
                .split()
                .log(LoggingHandler.Level.INFO, "eu.haee", "'LS Payload= ' + payload.toString()")
                .enrichHeaders(h -> h
                        .headerExpression("originalRemoteFile", "payload.toString()")
                        .headerExpression(FileHeaders.REMOTE_FILE, "payload.toString()"))
                .handle(Sftp.outboundGateway(sessionFactory, Command.GET.getCommand(), "headers['originalPayload'] + headers['file_remoteFile']")
                        .autoCreateLocalDirectory(false)
                        .charset("UTF-8")
                        .fileNameExpression("headers['file_remoteFile']")
                        .localDirectory(new File(flowsConfiguration.localDirectory()))
                        .localFilenameExpression(new FunctionExpression<Message<?>>(m -> {
                            IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(m);
                            final String remoteFileName = (String) accessor.getHeader("file_remoteFile");
                            final int extensionIndex = remoteFileName.lastIndexOf('.');
                            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSSSS", Locale.GERMAN);
                            return String.format("%s_MYC(%s-%d)%s", remoteFileName.substring(0, extensionIndex), 
                                    ZonedDateTime.of(LocalDateTime.now(), ZoneId.of("Europe/Berlin")).format(formatter), 
                                    (new SecureRandom()).nextInt(99999), 
                                    remoteFileName.substring(extensionIndex));
                        }))
                        .options(Option.PRESERVE_TIMESTAMP)
                        .remoteFileSeparator("/"))
                .channel("getOutputChannel");
    }
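
The local file name logic inside `localFilenameExpression` can be checked on its own, outside Spring. The following is only a minimal sketch using the JDK alone: the class name `LocalFileNameSketch`, the injectable timestamp and suffix, the `Locale.ROOT` in `String.format`, and the fallback for names without an extension are illustrative additions, not part of the flow above.

```java
import java.security.SecureRandom;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper that mirrors the lambda passed to localFilenameExpression.
final class LocalFileNameSketch {

    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSSSS", Locale.GERMAN);
    private static final SecureRandom RANDOM = new SecureRandom();

    // Deterministic variant: timestamp and numeric suffix are passed in.
    static String localFileName(String remoteFileName, ZonedDateTime now, int suffix) {
        int extensionIndex = remoteFileName.lastIndexOf('.');
        // Assumption: a name without '.' keeps its full name and gets no extension
        // (the lambda in the flow would throw on substring(0, -1) in that case).
        String base = extensionIndex < 0 ? remoteFileName : remoteFileName.substring(0, extensionIndex);
        String extension = extensionIndex < 0 ? "" : remoteFileName.substring(extensionIndex);
        // Locale.ROOT keeps the %d digits independent of the JVM default locale.
        return String.format(Locale.ROOT, "%s_MYC(%s-%d)%s", base, now.format(FORMATTER), suffix, extension);
    }

    // Same inputs as in the flow: current local time labelled as Europe/Berlin, random suffix.
    static String localFileName(String remoteFileName) {
        ZonedDateTime now = ZonedDateTime.of(LocalDateTime.now(), ZoneId.of("Europe/Berlin"));
        return localFileName(remoteFileName, now, RANDOM.nextInt(99999));
    }

    public static void main(String[] args) {
        ZonedDateTime fixed = ZonedDateTime.of(2021, 1, 5, 12, 8, 37, 799_000_000, ZoneId.of("Europe/Berlin"));
        // prints order_MYC(20210105120837.79900-42).xml
        System.out.println(localFileName("order.xml", fixed, 42));
    }
}
```

With the `*.xml` filter on the LS gateway, every remote file name has an extension, so the fallback branch should not be reached in this flow.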

Here are my spring-batch tasklets and the JUnit test. The MessagingGateway is injected through the tasklet constructor.

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        for (Endpoint endpoint : endpoints) {
            final String remoteDirectory = endpoint.getEpConUri();
            logger.info("ProcessRemoteFilesFlowsTasklet {} dealer at {} remote files process starting", 
                    endpoint.getId().getDlrCd(), remoteDirectory);
            flowsMessagingGateway.getFlows(remoteDirectory);
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        controlChannel.send(new GenericMessage<>("@getPoller.start()"));
        logger.info("GetPollerRemoteFilesFlowsTasklet poller starting...");
        return RepeatStatus.FINISHED;
    }

    @Autowired
    private IntegratedRemoteFileProcessMessagingGateway flowsMessagingGateway;
        
    @Autowired
    private EndpointRepository endpointRepository;
        
    @Test
    public void getFlows() {
        flowsMessagingGateway.getFlows("/c07va00011/iris/import/");
        Uninterruptibles.sleepUninterruptibly(60, TimeUnit.SECONDS);
    }

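When I run the getFlows test, I get the exception below, yet the files are still downloaded to my local machine. I don't understand why. I have tried many variations without making any progress.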

    org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.getFlows.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=/c07va00011/iris/import/, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e64ae1a, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e64ae1a, id=bd393cb7-42d0-03b2-674d-40e3cf9211de, timestamp=1609844917799}]
    ...
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
    Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        ... 145 common frames omitted

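Every spring-integration related configuration class is annotated with @EnableIntegration. @IntegrationComponentScan is also placed on my main flow configuration class (with a string array of the package names to scan). What happens if the @EnableIntegration annotation is present on more than one class?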

Should I merge all the spring-batch and spring-integration configuration classes into one?

I also tested the ControlBus (sending a message to the poller from a spring-batch tasklet) and got the same exception.

    11:57:36.481 [main] ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step startGetPollerRemoteFilesStep in job integratedFilesProcessJob2
    org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.controlChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=@getPoller.start(), headers={id=539a27d0-9bce-062d-8664-53aae14b5680, timestamp=1609930656454}]

@Lazy and @DependsOn do not help either. (@Lazy was added to the ControlBus, and @DependsOn to the Spring service class; the spring-batch jobs are also started and stopped manually through REST API calls.)

    @Autowired
    public BatchFileServiceConfiguration(JobBuilderFactory jobBuilderFactory,
            StepBuilderFactory stepBuilderFactory,
            PropertyConfiguration propertyConfiguration,
            @Qualifier("sourceBatchTransactionManager") PlatformTransactionManager sourceBatchTransactionManager,
            @Qualifier("sourceBatchEntityManagerFactory") EntityManagerFactory sourceBatchEntityManagerFactory,
            @Qualifier("processFileTaskExecutor") TaskExecutor processFileTaskExecutor,
            BatchEndpointRepository batchEndpointRepository,
            RemoteFileProcessMessagingGateway remoteFileProcessMessagingGateway,
            @Lazy @Qualifier("controlChannel") MessageChannel controlChannel) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.propertyConfiguration = propertyConfiguration;
        this.sourceBatchTransactionManager = sourceBatchTransactionManager;
        this.sourceBatchEntityManagerFactory = sourceBatchEntityManagerFactory;
        this.processFileTaskExecutor = processFileTaskExecutor;
        this.batchEndpointRepository = batchEndpointRepository;
        this.remoteFileProcessMessagingGateway = remoteFileProcessMessagingGateway;
        this.controlChannel = controlChannel;

    @Service
    @DependsOn({"lsFlows", "getFlows", "moveFlows", "moveFailedFlows", "getPollableFlows"})
    public class FileServiceImpl implements FileService {

None of these exceptions ever occurred in a standalone spring-integration application.
