我WatchServiceDirectoryScanner/RecursiveLeafOnlyDirectoryScanner
用来处理文件系统中的文件。生成文件事件并在定义的端点接收消息,但有时它会错过处理该目录中的所有文件。
例如,如果有 15 个文件,有时它会处理 10 个文件,有时会处理 5 个文件,其中 Metastore 包含所有 15 个文件的信息"metadata-store.properties"
。
弹簧集成配置.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/integration/mail http://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<int:annotation-config />
<int:channel id="cfpFileIn"></int:channel>
<int-file:inbound-channel-adapter id="cfpFileIn"
directory="${cfp.flight.data.dir}" auto-startup="true" scanner="csvDirScanner">
<int:poller fixed-delay="${cfp.flight.data.dir.polling.delay}"></int:poller>
</int-file:inbound-channel-adapter>
<bean id="csvDirScanner"
class="org.springframework.integration.file.WatchServiceDirectoryScanner">
<constructor-arg index="0" value="${cfp.flight.data.dir}" />
<property name="filter" ref="csvCompositeFilter" />
<property name="autoStartup" value="true" />
</bean>
<bean id="csvCompositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean
class="org.springframework.integration.file.filters.SimplePatternFileListFilter">
<constructor-arg value="*.csv" />
</bean>
<ref bean="persistentFilter" />
</list>
</constructor-arg>
</bean>
<bean id="persistentFilter"
class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="metadataStore" />
<constructor-arg index="1" name="prefix" value="" />
<property name="flushOnUpdate" value="true" />
</bean>
<bean name="metadataStore"
class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore">
<property name="baseDirectory" value="${metadata.dir}"></property>
</bean>
</beans>
批处理作业调度器:
import java.io.File;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
@Component
public class BatchJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(BatchJobScheduler.class);
@Autowired
protected JobLauncher jobLauncher;
@Autowired
@Qualifier(value = "job")
private Job job;
@ServiceActivator(inputChannel = "cfpFileIn")
public void run(File file) {
String fileName = file.getAbsolutePath();
logger.info("BatchJobScheduler Running #################"+fileName);
JobParameters jobParameters = new JobParametersBuilder().addString(
"input.file", fileName).toJobParameters();
try {
JobExecution execution = jobLauncher.run(job,
jobParameters);
logger.info(" BatchJobScheduler Exit Status : " + execution.getStatus() +"::"+execution.getAllFailureExceptions());
} catch (JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
logger.error(" BatchJobScheduler Exit Status : Exception ::",e);
}
}
}
似乎我缺少一些配置或代码。
基于建议:
我已将完整spring-integration.xml
的日志文件放在 git repo - https://github.com/chandaku/spring-integration-issue上。
这里创建了 5 个通道并同时工作我认为当我一次只运行一个通道时,它会成功处理目录中的所有文件,但是如果我打开所有文件通道就会出现问题。