<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
  <property name="targetObject" ref="fooService" />
  <property name="targetMethod" value="generateFoo" />
</bean>

Given this simple setup, fooService is a plain POJO Spring bean.

If fooService implements ItemStream and correctly implements the open and update methods, will my ItemReader be restartable?

Regards

2 Answers

OK, I finally answered my own question.

I find ItemReaderAdapter very useful, because most of the time we already have some DAO or service that accesses the data we need.

But my tests show that ItemReaderAdapter implementations, used out of the box, are not restartable, because they do not implement ItemStream!

So if any of you want to use an ItemReaderAdapter with restart capability, here is my solution.

Tested and working ;-)

1) Create your own ItemReaderAdapter implementation:

package xxx.readers.adapters;

import org.apache.log4j.Logger;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator;


/**
* Invokes a custom method on a delegate plain old Java object which itself
* provides an item.
* 
* Extended to implement the ItemStream interface so the reader can be restarted.
* 
* @author Benoit Campeau
*/
public class MyItemReaderAdapter<T> extends AbstractMethodInvokingDelegator<T>  implements ItemReader<T>, ItemStream {

private static final Logger log = Logger.getLogger(MyItemReaderAdapter.class);

private long currentCount = 0;

private static final String CONTEXT_COUNT_KEY = "ReglementAdapter.count";

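/**
 * Passes the current count to the delegate as its index argument, then advances the count.
 *
 * @return return value of the target method.
 */
@Override
public T read() throws Exception {
    super.setArguments(new Long[]{currentCount++});
    return invokeDelegateMethod();
}
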
@Override
public void open(ExecutionContext executionContext)
        throws ItemStreamException {
    currentCount = executionContext.getLong(CONTEXT_COUNT_KEY,0);
    log.info("Open Stream current count : " + currentCount);

}


@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
    executionContext.putLong(CONTEXT_COUNT_KEY, currentCount);
    log.info("Update Stream current count : " + currentCount);
}


@Override
public void close() throws ItemStreamException {
    // Nothing to release here.

}

}

2) Now configure the reader with your own adapter implementation (MyItemReaderAdapter).

<bean id="MyReader" class="xxx.readers.adapters.MyItemReaderAdapter">
    <property name="targetObject" ref="someAdapter" />
    <property name="targetMethod" value="next" />       
</bean>

3) Finally, create a component that serves as the adapter's delegate class:

package fcdq.iemt.batch.validation.reglement.readers.adapters;

import java.util.List;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component("someAdapter")
public class SomeAdapter {

private static final Logger log = Logger.getLogger(SomeAdapter.class);

@Autowired
private SomeService srv1;


private List<Transaction> listTrx;



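public void init() {
    log.info("Initializing " + SomeAdapter.class.toString());
    // NOTE: `context` is not declared in this snippet; it is assumed to be
    // available in the real class and to hold the cutoff timestamps.
    listTrx = srv1.findByTimestampAndStatus(context.getBeginTSCutoff(), context.getEndTSCutoff(), TransactionTypeEnum.TRANSFER_COMPLETE);
}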

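/**
 * Read method delegate.
 *
 * @param index zero-based position supplied by the adapter
 * @return the item at that position, or null once the list is exhausted
 */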
public Transaction next(Long index) {
    if (listTrx != null && listTrx.size() > index ) {
        return listTrx.get(index.intValue());
    } else {
        return null;
    }

}

}

Notes:

  1. Note the setArguments call in MyItemReaderAdapter. The value of currentCount stored in the execution context must be passed to the delegate's read method (next(Long) in this example).

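  2. Note that MyItemReaderAdapter does not implement the InitializingBean interface directly. I use a StepListener instead, because I want my list of items initialized just in time for reading.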
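
To make the restart mechanics in note 1 concrete, here is a minimal plain-Java sketch. It does not use Spring Batch: a `Map<String, Long>` stands in for the ExecutionContext, and `ListDelegate`, `CountingReader`, `run`, and the key `sketch.count` are hypothetical names made up for this illustration. It only shows the idea that the saved count is restored in open and handed to the delegate as an index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestartableAdapterSketch {

    /** Hypothetical delegate: returns the item at an index, or null past the end. */
    static class ListDelegate {
        private final List<String> items;

        ListDelegate(List<String> items) {
            this.items = items;
        }

        String next(long index) {
            return index < items.size() ? items.get((int) index) : null;
        }
    }

    /** Mirrors the open/update/read contract; the Map is a stand-in for ExecutionContext. */
    static class CountingReader {
        static final String COUNT_KEY = "sketch.count";
        private final ListDelegate delegate;
        private long currentCount = 0;

        CountingReader(ListDelegate delegate) {
            this.delegate = delegate;
        }

        void open(Map<String, Long> context) {
            // Resume from the saved count, or start at 0 on a fresh run.
            currentCount = context.getOrDefault(COUNT_KEY, 0L);
        }

        void update(Map<String, Long> context) {
            context.put(COUNT_KEY, currentCount);
        }

        String read() {
            // Hand the current count to the delegate as its index, then advance.
            return delegate.next(currentCount++);
        }
    }

    /** Reads at most maxItems items with a fresh reader, saving progress after each item. */
    static List<String> run(ListDelegate delegate, Map<String, Long> context, int maxItems) {
        CountingReader reader = new CountingReader(delegate);
        reader.open(context);
        List<String> out = new ArrayList<>();
        String item;
        while (out.size() < maxItems && (item = reader.read()) != null) {
            out.add(item);
            reader.update(context);
        }
        return out;
    }

    public static void main(String[] args) {
        ListDelegate delegate = new ListDelegate(Arrays.asList("a", "b", "c", "d"));
        Map<String, Long> context = new HashMap<>();
        System.out.println(run(delegate, context, 2));  // first run stops early: [a, b]
        System.out.println(run(delegate, context, 10)); // second run resumes: [c, d]
    }
}
```

The second call builds a brand-new reader, so the only thing carried over is the count in the map, which is the role the persisted execution context plays on a real restart.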

Hope it helps someone.

Regards

answered 2013-03-26T18:49:07.807

Another way to achieve the same thing is to extend AbstractItemCountingItemStreamItemReader.

From the documentation:

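Abstract superclass for ItemReaders that supports restart by storing item count in the ExecutionContext (therefore requires item ordering to be preserved between runs). Subclasses are inherently not thread-safe.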
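
The counting behaviour that quote describes can be pictured with a simplified plain-Java stand-in. This is not the Spring Batch source: `CountingReader`, `ListReader`, the `Map<String, Integer>` used in place of an ExecutionContext, and the `.read.count` key suffix are all assumptions for illustration. The point is that the base class increments the count before calling doRead, which is why a subclass indexes with the current count minus one.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountingReaderSketch {

    /** Simplified stand-in for a counting base class (illustration only). */
    abstract static class CountingReader<T> {
        private final String key;
        private int currentItemCount = 0;
        private int maxItemCount = Integer.MAX_VALUE;

        CountingReader(String name) {
            this.key = name + ".read.count"; // assumed key format
        }

        protected abstract void doOpen();

        protected abstract T doRead();

        int getCurrentItemCount() {
            return currentItemCount;
        }

        void setMaxItemCount(int max) {
            this.maxItemCount = max;
        }

        T read() {
            if (currentItemCount >= maxItemCount) {
                return null; // end of input
            }
            currentItemCount++; // incremented BEFORE doRead is called
            return doRead();
        }

        void open(Map<String, Integer> context) {
            doOpen();
            // Restore the saved position so already-read items are skipped.
            currentItemCount = context.getOrDefault(key, 0);
        }

        void update(Map<String, Integer> context) {
            context.put(key, currentItemCount);
        }
    }

    /** Hypothetical subclass backed by an in-memory list. */
    static class ListReader extends CountingReader<String> {
        private final List<String> source;
        private List<String> items;

        ListReader(List<String> source) {
            super("listReader");
            this.source = source;
        }

        @Override
        protected void doOpen() {
            items = source;
            setMaxItemCount(items.size());
        }

        @Override
        protected String doRead() {
            // The count is 1-based by the time doRead runs.
            return items.get(getCurrentItemCount() - 1);
        }
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c");
        Map<String, Integer> context = new HashMap<>();

        ListReader first = new ListReader(data);
        first.open(context);
        System.out.println(first.read()); // a
        first.update(context);            // saves a count of 1

        ListReader second = new ListReader(data);
        second.open(context);
        System.out.println(second.read()); // b
        System.out.println(second.read()); // c
        System.out.println(second.read()); // null
    }
}
```

As in the quote, this only works if the list comes back in the same order on every run; otherwise the saved count points at a different item after a restart.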

Sample code:

package com.***.batch.reader;

import java.util.List;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.mivim.campaignmanager.data.model.custom.CustomCampaignSubscriberEmail;
import com.mivim.campaignmanager.service.CampaignSubscriberService;


@Component
public class CampaignSubscriberItemReader extends
        AbstractItemCountingItemStreamItemReader<CustomCampaignSubscriberEmail> {

    private Logger logger = LogManager.getLogger(CampaignSubscriberItemReader.class);

    @Autowired
    CampaignSubscriberService campaignSubscriberService;

    List<CustomCampaignSubscriberEmail> customCampaignSubscriberEmails;

    final String ecName = "csItemReaderContext";

    public CampaignSubscriberItemReader() {
        setName(ecName);
    }

    @Override
    protected CustomCampaignSubscriberEmail doRead() throws Exception {
        // read() has already incremented the item count, so the current item is at count - 1.
        return customCampaignSubscriberEmails.get(getCurrentItemCount() - 1);
    }

    @Override
    protected void doOpen() throws Exception {
        customCampaignSubscriberEmails = campaignSubscriberService.getPendingCampaignSubscriber();

        setMaxItemCount(customCampaignSubscriberEmails.size());

    }

    @Override
    protected void doClose() throws Exception {
        customCampaignSubscriberEmails.clear();
        setMaxItemCount(0);
        setCurrentItemCount(0);
    }


}
answered 2016-07-19T05:50:18.573