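I'm writing a Spring Boot application that starts up, collects millions of database entries, converts them to a new simplified JSON format, and then sends them all to a GCP PubSub topic. I'm trying to use Spring Batch for this, but I'm having trouble implementing fault tolerance for my process. The database is riddled with data quality issues, and sometimes my JSON conversion fails. When a failure occurs, I don't want the job to exit immediately. I want it to keep processing as many records as possible and, before it completes, report exactly which records failed, so that I and/or my team can examine those problematic database entries.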

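To do this, I tried using Spring Batch's SkipListener interface. But I'm also using an AsyncItemProcessor and an AsyncItemWriter in my process, and even when an exception occurs during processing, it is caught by the SkipListener's onSkipInWrite() method rather than its onSkipInProcess() method. Unfortunately, onSkipInWrite() has no access to the original database entity, so I can't store its ID in my list of problematic database entries.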

Have I misconfigured something? Is there any other way to access the object from the reader that failed the AsyncItemProcessor's processing step?

Here's what I tried...

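I have a singleton Spring component in which I store the number of successfully processed database entries, along with up to 20 problematic database entries.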

import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Component
@Getter //lombok
public class ProcessStatus {
    private int processed;
    private int failureCount;
    private final List<UnexpectedFailure> unexpectedFailures = new ArrayList<>();

    public void incrementProgress() { processed++; }
    public void logUnexpectedFailure(UnexpectedFailure failure) {
        failureCount++;
        unexpectedFailures.add(failure);
    }

    @Getter
    @AllArgsConstructor
    public static class UnexpectedFailure {
        private Throwable error;
        private DbProjection dbData;
    }
}
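One caveat with this component: processed++ and ArrayList.add are not thread-safe. The SkipListener callbacks run on the chunk thread, but if incrementProgress() (or logUnexpectedFailure) is ever called from the processThreadPool workers, for example from inside the delegate processor, updates can be lost. Below is a minimal JDK-only sketch of a thread-safe alternative, assuming that kind of concurrent use. The class name ConcurrentProcessStatus and the MAX_RECORDED constant are hypothetical; the cap of 20 simply mirrors the "up to 20" described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical thread-safe variant of ProcessStatus: counters are atomic, and
// only the first MAX_RECORDED failures are kept while failureCount counts all.
class ConcurrentProcessStatus<T> {
    private static final int MAX_RECORDED = 20; // assumption: mirrors "up to 20"

    private final AtomicInteger processed = new AtomicInteger();
    private final AtomicInteger failureCount = new AtomicInteger();
    private final List<Failure<T>> failures = Collections.synchronizedList(new ArrayList<>());

    public void incrementProgress() {
        processed.incrementAndGet();
    }

    public void logUnexpectedFailure(Throwable error, T item) {
        // Exactly the first MAX_RECORDED callers (across all threads) get to record.
        if (failureCount.incrementAndGet() <= MAX_RECORDED) {
            failures.add(new Failure<>(error, item));
        }
    }

    public int getProcessed() { return processed.get(); }

    public int getFailureCount() { return failureCount.get(); }

    public List<Failure<T>> getUnexpectedFailures() {
        synchronized (failures) {
            return new ArrayList<>(failures); // snapshot copy for the caller
        }
    }

    public static final class Failure<E> {
        private final Throwable error;
        private final E item;

        Failure(Throwable error, E item) {
            this.error = error;
            this.item = item;
        }

        public Throwable getError() { return error; }

        public E getItem() { return item; }
    }
}
```

In the job, T would be DbProjection. Keeping the cap inside the status object (rather than relying only on the skip policy) means the report stays bounded even if the skip limit is raised later.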

I have a Spring Batch skip listener that is supposed to catch failures and update my status component accordingly:

import lombok.AllArgsConstructor;
import org.springframework.batch.core.SkipListener;

import java.util.concurrent.Future;

@AllArgsConstructor
public class ConversionSkipListener implements SkipListener<DbProjection, Future<JsonMessage>> {
    private ProcessStatus processStatus;

    @Override
    public void onSkipInRead(Throwable error) {}

    @Override
    public void onSkipInProcess(DbProjection dbData, Throwable error) {
        processStatus.logUnexpectedFailure(new ProcessStatus.UnexpectedFailure(error, dbData));
    }

    @Override
    public void onSkipInWrite(Future<JsonMessage> messageFuture, Throwable error) {
        //This is getting called instead!! Even though the exception happened during processing :(
        //But I have no access to the original DbProjection data here, and messageFuture.get() gives me null.
    }
}

Then I configured my job like this:

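import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Sort;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.Future;

@Configuration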
public class ConversionBatchJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private TaskExecutor processThreadPool;

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:100}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<DbProjection> dbReader(
            MyDomainRepository myDomainRepository,
            @Value("#{jobParameters[pageSize]}") Integer pageSize,
            @Value("#{jobParameters[limit]}") Integer limit) {
        RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
        myDomainRepositoryReader.setRepository(myDomainRepository);
        myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
        myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
            add("ACTIVE");
        }});
        myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
            put("update_date", Sort.Direction.ASC);
        }});
        myDomainRepositoryReader.setPageSize(pageSize);
        myDomainRepositoryReader.setMaxItemCount(limit);
        // myDomainRepositoryReader.setSaveState(false); <== haven't figured out what this does yet
        return myDomainRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
        //Sometimes throws exceptions when DB data is exceptionally weird, bad, or missing
        return new DbProjectionToJsonMessageConverter(dataRetrievalService);
    }

    @Bean
    @StepScope
    public AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter(
            ItemProcessor<DbProjection, JsonMessage> dataConverter) throws Exception {
        AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter = new AsyncItemProcessor<>();
        asyncDataConverter.setDelegate(dataConverter);
        asyncDataConverter.setTaskExecutor(processThreadPool);
        asyncDataConverter.afterPropertiesSet();
        return asyncDataConverter;
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService);
    }

    @Bean
    @StepScope
    public AsyncItemWriter<JsonMessage> asyncJsonPublisher(ItemWriter<JsonMessage> jsonPublisher) throws Exception {
        AsyncItemWriter<JsonMessage> asyncJsonPublisher = new AsyncItemWriter<>();
        asyncJsonPublisher.setDelegate(jsonPublisher);
        asyncJsonPublisher.afterPropertiesSet();
        return asyncJsonPublisher;
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<DbProjection> dbReader,
                                  AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter,
                                  AsyncItemWriter<JsonMessage> asyncJsonPublisher,
                                  ProcessStatus processStatus,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<DbProjection, Future<JsonMessage>>chunk(processChunkSize)
                .reader(dbReader)
                .processor(asyncDataConverter)
                .writer(asyncJsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures)) // for now this returns true for everything until 20 failures
                .listener(new ConversionSkipListener(processStatus))
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}

1 Answer

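This is because the Future wrapped by the AsyncItemProcessor is only unwrapped in the AsyncItemWriter, so any exception that surfaces at that point is treated as a write exception rather than a processing exception. That's why onSkipInWrite is called instead of onSkipInProcess.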
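The same ordering can be reproduced with plain java.util.concurrent, without Spring Batch. In this sketch (class and method names are hypothetical), submitting the task plays the part of AsyncItemProcessor and calling Future.get() plays the part of AsyncItemWriter: the bad row is submitted without any error, and the exception only appears, wrapped in an ExecutionException, when the result is unwrapped.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK illustration: a failure inside the async task is not visible when the
// task is submitted, only later when the Future is unwrapped with get().
class DeferredFailureDemo {

    // Hypothetical stand-in for the delegate converter: fails on "bad" input.
    static String convert(String row) {
        if (row.contains("bad")) {
            throw new IllegalStateException("cannot convert " + row);
        }
        return "{\"row\":\"" + row + "\"}";
    }

    // Returns a description of where the failure (if any) became visible.
    static String run(String row) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // "Process" phase: submitting succeeds even for bad input.
            Future<String> future = pool.submit(() -> convert(row));
            // "Write" phase: the failure only surfaces here.
            try {
                return "written " + future.get();
            } catch (ExecutionException e) {
                return "failed at get(): " + e.getCause().getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("good-1"));
        System.out.println(run("bad-2"));
    }
}
```

Running main prints `written {"row":"good-1"}` followed by `failed at get(): cannot convert bad-2`. By the time the exception is seen, only the Future is at hand, not the input row, which is the same information gap the skip listener runs into.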

This is actually a known limitation of this pattern, documented in the Javadoc of AsyncItemProcessor. Here is an excerpt:

Because the Future is typically unwrapped in the ItemWriter,
there are lifecycle and stats limitations (since the framework doesn't know 
what the result of the processor is).

While not an exhaustive list, things like StepExecution.filterCount will not
reflect the number of filtered items and 
itemProcessListener.onProcessError(Object, Exception) will not be called.

The Javadoc notes that the list is not exhaustive; the side effect you are seeing with the SkipListener is one of these limitations.
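Since the skip listener only ever sees the Future and the exception, one possible workaround (not part of the answer above, and not a Spring Batch API) is to make the exception itself carry the input item. The sketch below wraps the delegate processor so that any failure is rethrown as a hypothetical ItemProcessingException holding the original item; a listener can then walk the cause chain of the Throwable it receives. To keep it runnable without Spring, Processor is a hypothetical stand-in with the same shape as Spring Batch's ItemProcessor.process. In the real job, ItemTaggingProcessor would instead implement ItemProcessor<DbProjection, JsonMessage> and wrap dataConverter before it is passed to asyncDataConverter.setDelegate(...).

```java
// Hypothetical exception that remembers which input item failed. Java forbids
// generic Throwable subclasses, so the item is stored as Object.
class ItemProcessingException extends RuntimeException {
    private final transient Object item;

    ItemProcessingException(Object item, Throwable cause) {
        super("Processing failed for item " + item, cause);
        this.item = item;
    }

    Object getItem() { return item; }

    // Walks the whole cause chain, so it works whether or not the caller has
    // already unwrapped an ExecutionException.
    static Object findFailedItem(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof ItemProcessingException) {
                return ((ItemProcessingException) t).getItem();
            }
        }
        return null; // no tagged failure found
    }
}

// Stand-in with the same shape as Spring Batch's ItemProcessor#process.
interface Processor<I, O> {
    O process(I item) throws Exception;
}

// Decorator: delegate to the real converter, but tag any failure with its input.
class ItemTaggingProcessor<I, O> implements Processor<I, O> {
    private final Processor<I, O> delegate;

    ItemTaggingProcessor(Processor<I, O> delegate) {
        this.delegate = delegate;
    }

    @Override
    public O process(I item) {
        try {
            return delegate.process(item);
        } catch (Exception e) {
            throw new ItemProcessingException(item, e);
        }
    }
}
```

Inside onSkipInWrite(messageFuture, error), a call like ItemProcessingException.findFailedItem(error) would then return the DbProjection (after a cast), assuming the Throwable handed to the listener is, or wraps, the exception thrown by the delegate. Note that MyCustomConversionSkipPolicy would now see ItemProcessingException rather than the original exception, so any exception-type checks in it should look at getCause().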

answered 2020-05-29T11:12:36.117