2

我的意图是从各种源/目录创建 IntegrationFlow bean 实例(首先,可能来自 ftp)。因此,在application.properties我想定义这样的东西时,入站目录的数量可能会有所不同:

inbound.file.readPath[0]=source1
inbound.file.processedPath[0]=processed1
inbound.file.failedPath[0]=failed1

inbound.file.readPath[1]=source2
inbound.file.processedPath[1]=processed2
inbound.file.failedPath[1]=failed2

我还想维护源的来源(通过标题丰富),因此不能将所有文件放在 spring 之外的一个目录中。

那么有一个 FilePollingFlow 是否可以从上述属性创建这些 bean 实例?我可以想象这样的事情,但我不确定如何将属性传递给 bean 实例以及如何引用索引:

@Configuration
public class FilePollingIntegrationFlow extends AbstractFactoryBean<IntegrationFlow> {

    @Autowired
    private FilePollingConfiguration config;

    @Override
    public Class<IntegrationFlow> getObjectType() {
        return IntegrationFlow.class;
    }

    @Override
    protected IntegrationFlow createInstance() throws Exception {
      return IntegrationFlows
                .from(s -> /* FIXME config.getReadPath()? instead of inboundReadDirectory, but how to handle indices? */s.file(inboundReadDirectory).preventDuplicates(true).scanEachPoll(true).patternFilter("*.txt"),
                        e -> e.poller(Pollers.fixedDelay(inboundPollingPeriod)
                                .taskExecutor(taskExecutor())
                                .transactionSynchronizationFactory(transactionSynchronizationFactory())
                                .transactional(transactionManager())))
                .log(LoggingHandler.Level.INFO, getClass().getName(), "'Read inbound file: ' .concat(payload)")
                .enrichHeaders(m -> m.headerExpression(FileHeaders.ORIGINAL_FILE, "payload"))
                .transform(Transformers.fileToString())
                .channel(ApplicationConfiguration.FILE_INBOUND_CHANNEL)
                .get();
   }
}
@Component
@ConfigurationProperties("inbound")
public class FilePollingConfiguration {

private List<File> files = new ArrayList<>();

public static class File {
    private String readPath;
    private String processedPath;
    private String failedPath;

    public String getReadPath() {
        return readPath;
    }

    public void setReadPath(String readPath) {
        this.readPath = readPath;
    }

    public String getProcessedPath() {
        return processedPath;
    }

    public void setProcessedPath(String processedPath) {
        this.processedPath = processedPath;
    }

    public String getFailedPath() {
        return failedPath;
    }

    public void setFailedPath(String failedPath) {
        this.failedPath = failedPath;
    }

    @Override
    public String toString() {
        return new ToStringBuilder(this)
                .append("readPath", readPath)
                .append("processedPath", processedPath)
                .append("failedPath", failedPath)
                .toString();
    }


    public List<File> getFiles() {
        return files;
    }

    public void setFiles(List<File> files) {
        this.files = files;
    }
}
4

1 回答 1

2

对于几个类似的流程,框架为您提供了一个解决方案,例如IntegrationFlowContexthttps ://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/java-dsl.html#java-dsl-运行时流程。因此,您需要的只是对这些文件和动态创建流程及其注册的迭代。

关于像你这样的类似属性的列表,File你应该修改 Spring Boot 的建议:https ://docs.spring.io/spring-boot/docs/2.0.0.RELEASE/reference/htmlsingle/#boot-features-external-配置 yaml

注意那里的servers财产。我的意思是,如果您files在 Java 类中调用它,那必须files在属性文件中。

更新

这对我来说是这样的:

我的application.properties

my.source.dirs=/tmp/in1,/tmp/in2

该应用程序是这样的:

@SpringBootApplication
public class So49168720Application {

    public static void main(String[] args) throws IOException {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(So49168720Application.class, args);

        File file1 = new File("/tmp/in1", "foo.txt");
        file1.createNewFile();
        FileCopyUtils.copy("FOO".getBytes(), file1);

        File file2 = new File("/tmp/in2", "bar.txt");
        file2.createNewFile();
        FileCopyUtils.copy("BAR".getBytes(), file2);


        PollableChannel resultChannel = applicationContext.getBean("resultChannel", PollableChannel.class);

        System.out.println(resultChannel.receive(10000));
        System.out.println(resultChannel.receive(10000));

        file1.delete();
        file2.delete();
    }

    @Value("${my.source.dirs}")
    private String[] sourceDirs;

    @Autowired
    private IntegrationFlowContext flowContext;


    @PostConstruct
    private void registerFilePollingFlows() {
        Arrays.asList(this.sourceDirs).forEach(inboundSource -> {
            IntegrationFlow flow =
                    IntegrationFlows
                            .from(Files.inboundAdapter(new File(inboundSource))
                                    .patternFilter("*.txt"))
                            .log(LoggingHandler.Level.INFO, getClass().getName(),
                                    "'Read inbound file: ' .concat(payload)")
                            .transform(Files.toStringTransformer())
                            .channel(resultChannel())
                            .get();

            this.flowContext.registration(flow).register();
        });
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec defaultPoller() {
        return Pollers.fixedDelay(1000);
    }

    @Bean
    public PollableChannel resultChannel() {
        return new QueueChannel();
    }

}

我在日志中有这些消息:

2018-03-13 17:43:05.148  INFO 19676 --- [ask-scheduler-3] ication$$EnhancerBySpringCGLIB$$4fda2b12 : Read inbound file: \tmp\in2\bar.txt
2018-03-13 17:43:05.148  INFO 19676 --- [ask-scheduler-2] ication$$EnhancerBySpringCGLIB$$4fda2b12 : Read inbound file: \tmp\in1\foo.txt
GenericMessage [payload=BAR, headers={file_originalFile=\tmp\in2\bar.txt, id=4a692a68-3871-b708-a28e-c4dc378de7e5, file_name=bar.txt, file_relativePath=bar.txt, timestamp=1520977385150}]
GenericMessage [payload=FOO, headers={file_originalFile=\tmp\in1\foo.txt, id=32597359-6602-3df6-5f6f-dac2f4ad788f, file_name=foo.txt, file_relativePath=foo.txt, timestamp=1520977385150}]

但是,这已经基于 Spring Integration5.0和 Spring Boot 2.0。有什么理由不升级您的项目?

于 2018-03-08T12:29:46.930 回答