30

在 Spring 批处理中,我需要将 ItemReader 读取的项目传递给两个不同的处理器和编写器。我想要达到的是...

                        +---> ItemProcessor#1 ---> ItemWriter#1
                        |
ItemReader ---> 项目 ---+
                        |
                        +---> ItemProcessor#2 ---> ItemWriter#2

这是必需的,因为与 ItemWriter#2 编写的项目相比,由 ItemWriter#1 编写的项目应该以完全不同的方式处理。此外,ItemReader 从数据库中读取项目,它执行的查询计算量太大,以至于执行两次相同的查询应该被丢弃。

关于如何实现这种设置的任何提示?或者,至少,一个逻辑上等效的设置?

4

5 回答 5

14

如果您的项目应由处理器 #1 和处理器 #2 处理,则此解决方案有效

您必须使用此签名创建一个处理器#0:

class Processor0<Item, CompositeResultBean>

CompositeResultBeanbean 定义在哪里

class CompositeResultBean {
  Processor1ResultBean result1;
  Processor2ResultBean result2;
}

在您的处理器#0 中,只需将工作委托给处理器#1 和#2 并将结果放入CompositeResultBean

CompositeResultBean Processor0.process(Item item) {
  final CompositeResultBean r = new CompositeResultBean();
  r.setResult1(processor1.process(item));
  r.setResult2(processor2.process(item));
  return r;
}

您自己的作家是作家的CompositeItemWriter代表CompositeResultBean.result1CompositeResultBean.result2(查看PropertyExtractingDelegatingItemWriter,也许可以提供帮助)

于 2013-09-25T09:38:46.357 回答
6

我按照 Luca 的建议PropertyExtractingDelegatingItemWriter作为作家使用,并且我能够在一个步骤中与两个不同的实体一起工作。

首先我所做的是定义一个 DTO 来存储来自处理器的两个实体/结果

public class DatabaseEntry {
    private AccessLogEntry accessLogEntry;
    private BlockedIp blockedIp;

    public AccessLogEntry getAccessLogEntry() {
        return accessLogEntry;
    }

    public void setAccessLogEntry(AccessLogEntry accessLogEntry) {
        this.accessLogEntry = accessLogEntry;
    }

    public BlockedIp getBlockedIp() {
        return blockedIp;
    }

    public void setBlockedIp(BlockedIp blockedIp) {
        this.blockedIp = blockedIp;
    }
}

然后我将此 DTO 传递给 writer,在PropertyExtractingDelegatingItemWriter该类中我定义了两个自定义方法将实体写入数据库,请参见下面的 writer 代码:

@Configuration
public class LogWriter extends LogAbstract {
    @Autowired
    private DataSource dataSource;

    @Bean()
    public PropertyExtractingDelegatingItemWriter<DatabaseEntry> itemWriterAccessLogEntry() {
        PropertyExtractingDelegatingItemWriter<DatabaseEntry> propertyExtractingDelegatingItemWriter = new PropertyExtractingDelegatingItemWriter<DatabaseEntry>();
        propertyExtractingDelegatingItemWriter.setFieldsUsedAsTargetMethodArguments(new String[]{"accessLogEntry", "blockedIp"});
        propertyExtractingDelegatingItemWriter.setTargetObject(this);
        propertyExtractingDelegatingItemWriter.setTargetMethod("saveTransaction");
        return propertyExtractingDelegatingItemWriter;
    }

    public void saveTransaction(AccessLogEntry accessLogEntry, BlockedIp blockedIp) throws SQLException {
        writeAccessLogTable(accessLogEntry);
        if (blockedIp != null) {
            writeBlockedIp(blockedIp);
        }

    }

    private void writeBlockedIp(BlockedIp entry) throws SQLException {
        PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO blocked_ips (ip,threshold,startDate,endDate,comment) VALUES (?,?,?,?,?)");
        statement.setString(1, entry.getIp());
        statement.setInt(2, threshold);
        statement.setTimestamp(3, Timestamp.valueOf(startDate));
        statement.setTimestamp(4, Timestamp.valueOf(endDate));
        statement.setString(5, entry.getComment());
        statement.execute();
    }

    private void writeAccessLogTable(AccessLogEntry entry) throws SQLException {
        PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO log_entries (date,ip,request,status,userAgent) VALUES (?,?,?,?,?)");
        statement.setTimestamp(1, Timestamp.valueOf(entry.getDate()));
        statement.setString(2, entry.getIp());
        statement.setString(3, entry.getRequest());
        statement.setString(4, entry.getStatus());
        statement.setString(5, entry.getUserAgent());
        statement.execute();
    }
}

使用这种方法,您可以从单个阅读器获得所需的初始行为,以处理多个实体并在一个步骤中保存它们。

于 2017-11-05T00:14:14.247 回答
3

你可以使用一个CompositeItemProcessorCompositeItemWriter

它看起来与您的架构不完全相同,它将是顺序的,但它会完成这项工作。

于 2013-09-25T09:34:49.230 回答
0

这是我想出的解决方案。

因此,我们的想法是编写一个“包含” ItemProcessor 和 ItemWriter 的新 Writer。只是为了给你一个想法,我们称之为 PreprocessoWriter,这就是核心代码。

private ItemWriter<O> writer;
private ItemProcessor<I, O> processor;

@Override
public void write(List<? extends I> items) throws Exception {
    List<O> toWrite = new ArrayList<O>();
    for (I item : items) {
        toWrite.add(processor.process(item));
    }
    writer.write(toWrite);
}

有很多东西被搁置一旁。例如,ItemStream 的管理。但在我们的特定场景中,这已经足够了。

因此,您可以将多个 PreprocessorWriter 与 CompositeWriter 结合起来。

于 2014-01-24T15:40:31.313 回答
0

如果您有合理数量的项目(例如少于 1 个 Go),还有另一种解决方案:您可以将选择的结果缓存到包装在 Spring bean 中的集合中。

然后你可以免费阅读两次收藏。

于 2015-04-24T07:52:34.830 回答