0

我创建了一个 spring-batch 作业,用于从本地目录读取文件并使用 Camel-spring-batch 通过 ftp 将其上传到远程目录。我正在使用块做同样的事情。我的春季批处理作业配置如下:

<bean id="consumerTemplate" class="org.apache.camel.impl.DefaultConsumerTemplate" init-method="start" destroy-method="stop">
    <constructor-arg ref="camelContext"/>
</bean>

<bean id="producerTemplate" class="org.apache.camel.impl.DefaultProducerTemplate" scope="step" init-method="start" destroy-method="stop">
    <constructor-arg ref="camelContext"/>
</bean>

<bean id="localFileReader" class="com.camel.springbatch.reader.LocalFileReader" scope="step" destroy-method="stop">
    <constructor-arg value="file:#{jobParameters['dirPath']}"/>
    <constructor-arg ref="consumerTemplate"/>
</bean>

<bean id="ftpFileWriter" class="com.camel.springbatch.writer.FtpFileWriter" scope="step">
    <constructor-arg ref="producerTemplate"/>
    <constructor-arg value="ftp://#{jobParameters['host']}?username=#{jobParameters['user']}&amp;password=#{jobParameters['password']}"/>
</bean>

作业配置:

<batch:job id="ftpReadWrite">
    <batch:step id="readFromLocalWriteToFtp" next="readFromFtpWriteToLocal">
        <batch:tasklet>
            <batch:chunk reader="localFileReader" writer="ftpFileWriter"  commit-interval="5" />
        </batch:tasklet>
    </batch:step>

我的“Localfilereader”和“ftpFileWriter”看起来像:

import org.apache.camel.ConsumerTemplate;
import org.apache.camel.component.spring.batch.support.CamelItemReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalFileReader extends CamelItemReader {
private Logger log= LoggerFactory.getLogger(this.getClass());
ConsumerTemplate consumerTemplate;
String endpointUri;

public LocalFileReader(ConsumerTemplate consumerTemplate, String endpointUri) {
    super(consumerTemplate, endpointUri);
    this.consumerTemplate=consumerTemplate;
    this.endpointUri=endpointUri;
}

@Override
public Object read() throws Exception {
    Object item = consumerTemplate.receiveBody(endpointUri);
    return item;
}

}

“FTP 文件编写器”

import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.spring.batch.support.CamelItemWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class FtpFileWriter extends CamelItemWriter {
private Logger log= LoggerFactory.getLogger(this.getClass());
ProducerTemplate producerTemplate;
String endpointUri;
public FtpFileWriter(ProducerTemplate producerTemplate, String endpointUri) {
    super(producerTemplate, endpointUri);
    this.producerTemplate=producerTemplate;
    this.endpointUri=endpointUri;
}

@Override
public void write(List items) throws Exception {
    System.out.println("************************Writing item to ftp "+items);
    for (Object item : items) {
        System.out.println("writing item [{}]..."+item);
        producerTemplate.sendBody(endpointUri, item);
        log.debug("wrote item");
    }
}
}

如果我的本地目录中只有 5 个文件,它工作正常。它从我的本地目录中读取所有 5 个文件,然后将其发送给编写器,然后编写器将其作为我的 commit-interval=5 发送到 ftp 服务器。如果我在本地目录中有 6 个文件,那么它将 5 个文件的第一个块发送给 writer,然后它再次开始读取剩余的文件,这次只剩下一个文件。它读取 1 个文件并开始等待 4 个文件并且从不发送给写入器。我用 commit-interval=1 尝试过,现在它将所有 6 个文件发送到服务器并再次开始等待下一个文件。处理完所有文件后,我需要在这里停止该过程。

请帮我解决这个问题...

4

2 回答 2

1

FromConsumerTemplate的 javadocreceiveBody一直等到有响应;您需要使用超时(在 spring-batch 中检查 TimeoutPolicy)或以不同的方式将阅读器标记为“已用尽”(从阅读器返回 null)以阻止阅读器阅读

于 2013-07-31T06:17:18.170 回答
1

您可以使用receiveBodyNoWait而不是receiveBody。然后你必须检查消费者端点内是否还有文件。我为一个将 big-xml 文件消耗为较小部分的 tasklet 编写了此代码。

小任务:

public class MyCamelTasklet extends ServiceSupport implements Tasklet, InitializingBean{

private static final Logger LOG = LoggerFactory.getLogger(MyCamelTasklet.class);

private final CamelContext camelContext;
private final ConsumerTemplate consumerTemplate;
private final File workingDir;
private final Route xmlSplitRoute;


public MyCamelTasklet(ConsumerTemplate consumerTemplate) {
    super();        
    this.consumerTemplate = consumerTemplate;
    this.camelContext = consumerTemplate.getCamelContext();     
    this.xmlSplitRoute = this.camelContext.getRoutes().get(0);
    this.workingDir = new File(xmlSplitRoute.getRouteContext().getFrom().getUri().replace("file:", ""));
}

@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1)
        throws Exception {

    LOG.debug("reading new item...");

    Endpoint endpointXmlSplitRoute = xmlSplitRoute.getEndpoint();

    while(getNbFilesToConsume(this.workingDir) > 0) {       

     consumerTemplate.receiveBodyNoWait(endpointXmlSplitRoute);

    }       

    return RepeatStatus.FINISHED;
}

private int getNbFilesToConsume(File workingDir){
    return FileUtils.listFiles(workingDir, new String[]{"xml"}, false).size();
}

@Override
protected void doStart() throws Exception {
    ServiceHelper.startService(consumerTemplate);

}


@Override
protected void doStop() throws Exception {
    ServiceHelper.stopService(consumerTemplate);

}


@Override
public void afterPropertiesSet() throws Exception {
     ObjectHelper.notNull(camelContext, "CamelContext", this);         
     camelContext.addService(this);     
}
}

前面的 tasklet 的单元测试:

public class SplitTaskletTest {

@Test public void execute() throws Exception {
    CamelContext camelContext = new DefaultCamelContext();      

    camelContext.addRoutes(new RouteBuilder() {
        public void configure() {

            Namespaces ns = new Namespaces("nsl", "http://www.toto.fr/orders");
            from("file:data/inbox").id("inbox-road").
            split().
            xtokenize("//nsl:order", 'w', ns, 1000).
            streaming().
            to("file:data/outbox?fileName=${file:name.noext}-${exchangeId}.${file:ext}"); 

        }

    });

    camelContext.start();

    ConsumerTemplate consumer =new DefaultConsumerTemplate(camelContext);

    consumer.start();

    MyCamelTasklet tasklet = new MyCamelTasklet(consumer);

    long debutTraitement = System.currentTimeMillis();

    tasklet.execute(null, null);

    long finTraitement = System.currentTimeMillis();

    long total = finTraitement-debutTraitement;

    File outputDir = new File("data/outbox");
    outputDir.mkdir();

    int nbGeneratesFiles = FileUtils.listFiles(outputDir, new String[]{"xml"}, false).size();

    System.out.println("Traitement total en secondes : "+total/1000);

    Assert.assertTrue(nbGeneratesFiles>0);

}   
}
于 2015-10-27T13:50:33.127 回答