16

我正在编写一个春季批处理作业,在我的一个步骤中,我有以下处理器代码:

@Component
public class SubscriberProcessor implements ItemProcessor<NewsletterSubscriber, Account>, InitializingBean {

    @Autowired
    private AccountService service;

    @Override public Account process(NewsletterSubscriber item) throws Exception {
        if (!Strings.isNullOrEmpty(item.getId())) {
            return service.getAccount(item.getId());
        }
        // search with email address
        List<Account> accounts = service.findByEmail(item.getEmail());
        checkState(accounts.size() <= 1, "Found more than one account with email %s", item.getEmail());
        return accounts.isEmpty() ? null : accounts.get(0);
    }

    @Override public void afterPropertiesSet() throws Exception {
        Assert.notNull(service, "account service must be set");
    }
}

上面的代码有效,但我发现在某些极端情况下允许有多个Accountper 。NewsletterSubscriber所以我需要删除状态检查并将多个传递Account给项目编写者。

我发现的一种解决方案是同时更改ItemProcessorItemWriter处理List<Account>类型而不是,Account但这有两个缺点:

  • 由于编写器中的嵌套列表,代码和测试更难编写和维护
  • 最重要的是,同一事务中可能会写入多个Account对象,因为给 writer 的列表可能包含多个帐户,我想避免这种情况。

有什么办法,可能使用监听器,或者替换弹簧批处理使用的一些内部组件以避免处理器中的列表?

更新

我已经为这个问题在 spring Jira 上打开了一个问题。

我正在研究isCompletegetAdjustedOutputs方法,FaultTolerantChunkProcessor其中标记为扩展点,SimpleChunkProcessor看看我是否可以以某种方式使用它们来实现我的目标。

欢迎任何提示。

4

4 回答 4

16

项目处理器接受一件事,并返回一个列表

MyItemProcessor implements ItemProcessor<SingleThing,List<ExtractedThingFromSingleThing>> {
    public List<ExtractedThingFromSingleThing> process(SingleThing thing) {
    //parse and convert to list
    }
}

包裹下游作家以解决问题。这样,该作者下游的东西就不必与列表一起使用。

@StepScope
public class ItemListWriter<T> implements ItemWriter<List<T>> {
    private ItemWriter<T> wrapped;

    public ItemListWriter(ItemWriter<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void write(List<? extends List<T>> items) throws Exception {
        for (List<T> subList : items) {
            wrapped.write(subList);
        }
    }
}
于 2016-02-26T05:08:01.853 回答
5

没有办法在每次调用ItemProcessorSpring Batch 时返回一个以上的项目而不深入杂草。如果你真的想知道 anItemProcessorItemWriterexit 之间的关系在哪里(不推荐),请查看ChunkProcessor接口的实现。虽然简单的案例 ( SimpleChunkProcessor) 并没有那么糟糕,但如果您使用任何容错逻辑(通过 跳过/重试FaultTolerantChunkProcessor),它会变得非常快速。

一个更简单的选择是将此逻辑移动到ItemReader在返回项目之前执行此丰富的逻辑。ItemReader在返回项目之前将您正在使用的任何内容包装在ItemReader执行服务查找的自定义实现中。在这种情况下,NewsletterSubscriber您不是从阅读器返回 a ,而是Account根据先前的信息返回 an 。

于 2014-06-03T00:13:22.060 回答
1

您返回的不是返回帐户,而是创建一个 AccountWrapper 或 Collection。作家显然必须考虑到这一点:)

于 2015-01-23T11:27:11.603 回答
-2

您可以通过制作以下代码使转换器将您的 Pojo(Pojo 对象从文件)转换为您的实体:

public class Intializer {

public static LGInfo initializeEntity() throws Exception {
    Constructor<LGInfo> constr1 = LGInfo.class.getConstructor();
    LGInfo info = constr1.newInstance();
    return info;
}
}

并在您的项目处理器中

public class LgItemProcessor<LgBulkLine, LGInfo> implements ItemProcessor<LgBulkLine, LGInfo> {

private static final Log log = LogFactory.getLog(LgItemProcessor.class);

@SuppressWarnings("unchecked")
@Override
public LGInfo process(LgBulkLine item) throws Exception {
    log.info(item);
    return (LGInfo) Intializer.initializeEntity();
}

}
于 2019-06-26T10:39:48.260 回答