问题标签 [itemprocessor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
450 浏览

spring-batch - Spring批处理概念需要澄清

我是 Spring Batch 的新手,我在使用 Spring Batch 实现我的业务用例时遇到了问题。

基本上,我正在从数据库中读取数据,即时事通讯的订阅者列表。然后,我需要向每个订阅者发送一封电子邮件,并将数据插入到数据库中,以便知道电子邮件发送给了哪个订阅者。

我使用了一个 ItemProcessor 实现,它的 process 方法返回一个 MimeMessage 并将订阅者作为参数;与此处理器关联的编写器的类型为:org.springframework.batch.item.mail.javamail.MimeMessageItemWriter

问题是我需要另一个用于数据库插入的编写器(可能使用 CompositeItemWriter),它将订阅者列表作为参数,我所拥有的所有输入都是来自上述 ItemProcessor 的 MimeMessage。

有人可以帮忙吗?

0 投票
5 回答
37875 浏览

spring - Spring Batch:一个阅读器,多个处理器和编写器

在 Spring 批处理中,我需要将 ItemReader 读取的项目传递给两个不同的处理器和编写器。我想要达到的是...

这是必需的,因为与 ItemWriter#2 编写的项目相比,由 ItemWriter#1 编写的项目应该以完全不同的方式处理。此外,ItemReader 从数据库中读取项目,它执行的查询计算量太大,以至于执行两次相同的查询应该被丢弃。

关于如何实现这种设置的任何提示?或者,至少,一个逻辑上等效的设置?

0 投票
4 回答
24340 浏览

java - 从春季批处理 ItemProcessor 返回多个项目

我正在编写一个春季批处理作业,在我的一个步骤中,我有以下处理器代码:

上面的代码有效,但我发现在某些极端情况下允许有多个Accountper 。NewsletterSubscriber所以我需要删除状态检查并将多个传递Account给项目编写者。

我发现的一种解决方案是同时更改ItemProcessorItemWriter处理List<Account>类型而不是,Account但这有两个缺点:

  • 由于编写器中的嵌套列表,代码和测试更难编写和维护
  • 最重要的是,同一事务中可能会写入多个Account对象,因为给 writer 的列表可能包含多个帐户,我想避免这种情况。

有什么办法,可能使用监听器,或者替换弹簧批处理使用的一些内部组件以避免处理器中的列表?

更新

我已经为这个问题在 spring Jira 上打开了一个问题。

我正在研究isCompletegetAdjustedOutputs方法,FaultTolerantChunkProcessor其中标记为扩展点,SimpleChunkProcessor看看我是否可以以某种方式使用它们来实现我的目标。

欢迎任何提示。

0 投票
1 回答
877 浏览

java - Spring Batch:使用 AsyncItemProcessor 的多线程步骤不会并行运行

TL;博士

给定一个包含一百万个记录的文件,其中在文件的每一行上要执行大量逻辑,读取文件并在每一行上完成应用逻辑的最快方法是什么。我使用了文件阅读器的多线程步骤,其read方法是synchronized读取文件,并且还使用了一个AsynItemProcessor以便在其自己的线程中处理记录。

我的期望是,AsynItemProcessor一旦有读者的记录要处理,就应该立即开始。每条记录都应该在自己的线程中处理;但是,在我下面的示例中似乎并非如此


我的 Spring 批处理作业中有一个步骤,它使用TaskExecutor20 个线程和 10000 的提交间隔来读取文件。我也使用AsycnItemProcessorandAsyncItemWriter因为数据处理有时会比从文件中读取一行所需的时间长。

在哪里 :

  1. fileReader是一个扩展的类,FlatFileItemReader方法readsynchronized并且只是在其中调用super.read
  2. asyncProcessor只不过是一个AsyncItemProcessorbean,它从文件中传递每一行并通过一个键对其进行分组,并将其存储到一个包含Map<String,BigDecimal>对象的单例 bean 中。换句话说,处理器只是简单地将文件数据按几列分组并将这些数据存储在内存中。
  3. asyncWriter只不过是在AsyncItemWriter其中包装了 no 操作的 an ItemWriter。换句话说,该作业不需要进行任何类型的写入,因为处理器本身正在执行聚合并将数据存储在内存中。( Map)。
  4. 请注意, theAsynItemProcessor有它自己的ThreadPoolTaskExecutorcorePoolSize=10andmaxPoolSize=20和 theStep有它自己的ThreadPoolTaskExecutorwith a corePoolSize=20andmaxPoolSize=40

通过上述设置,我的预期是读取和处理将并行发生。就像是 :

  1. FileReader 从文件中读取一条记录并将其传递给处理器
  2. AsyncItemProcessor执行聚合。既然是AsyncItemProcessor,那么调用该process方法的线程理想情况下应该可以自由地做其他工作吗?
  3. 最后,AsynItemWriter将获取Future并提取数据但什么也不做,因为委托是无操作的ItemWriter

但是当我添加一些日志时,我没有看到我所期望的:

2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 2500 records 2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 5000 records 2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 7501 records 2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 10000 records 2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 12500 records 2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 15000 records

... 更多这样的行被打印,直到整个文件被读取。只有在读取文件后,我才开始看到处理器开始工作:

2019-09-07 10:06:53 INFO FileProcessor:83 - Finished processing 2500 records 2019-09-07 10:08:28 INFO FileProcessor:83 - Finished processing 5000 records 2019-09-07 10:10:04 INFO FileProcessor:83 - Finished processing 7500 records 2019-09-07 10:11:40 INFO FileProcessor:83 - Finished processing 10000 records 2019-09-07 10:13:16 INFO FileProcessor:83 - Finished processing 12500 records 2019-09-07 10:14:51 INFO FileProcessor:83 - Finished processing 15000 records

底线:为什么在文件完全读取之前处理器不启动?无论ThreadPoolTaskExecutor用于AsynItemProcessor或 整个的参数是什么step,读取总是首先完成,然后才开始处理。

0 投票
1 回答
160 浏览

spring - Spring Batch 分区不起作用复合项目处理器

我有一个 Spring Batch 分区作业。我正在使用CompositeProcessor,从数据库中读取数据并将这些项目保存到CopyOnWriteArrayList. 因为环境是并发的,但我CopyOnWriteArrayList被用于其他线程和混合信息,我不知道为什么以及我做错了什么,以及将它们写入每个线程的文件的输出。

我的批处理 im XML 的配置:

0 投票
0 回答
142 浏览

java - 在批处理作业中的 Spring Batch Processor 中生成新项目/CSV-Rows

摘要:我需要导出一些带有在批量导出期间生成的行的 CSV 文件。

我已经有一个 JdbcItemreader 可以从我的数据库中获取数据。处理器将 OutputData 格式化为所需的格式。我确实有一个编写 CSV 文件的基本 FlatFileItemWriter

这就是我导出的 CSV 行的样子:

我知道这些行看起来很难看,但这是其他 CSV 导入器的要求。现在我需要计算“总和线”并将它们添加到具有相同 ID 的两行之前。计算出的“总和线”应如下所示:

所以最终的输出应该是这样的:

这是我的问题:

  • 是否可以通过 FlatFileItemWriter 在 CSV 文件中的其他行之间添加行?还是我应该在写文件之前生成总和行?
  • 是否有可能访问 ItemWriter / Processor 中的完整数据块?这样我就可以在将新行写入文件之前将它们添加到块中吗?
0 投票
2 回答
296 浏览

spring - 在春季批处理中分离步骤类

我试图找到解决方案,但我不能...... ㅠㅠ

我想在下面的工作中分离步骤。

我之所以如此分裂,是因为我必须在每一步都使用查询。

我的目的是我必须使用step1,step2中的返回数据。但是 jpaItemReader 就像 async ...所以它不会像上面的顺序那样处理。
像这样的调试流程。

这对我来说是个大问题......我
如何才能等待工作中的每一步?包括查询。

0 投票
1 回答
328 浏览

spring-boot - 尝试在春季批处理 ItemProcessor 中读取数据库查询时出现 NullPointerException

启动 ApplicationContext 时出错。要显示条件报告,请在启用“调试”的情况下重新运行您的应用程序。2020-May-05 22:30:13.654 错误 [main] osbSpringApplication - 应用程序运行失败 org.springframework.beans.factory.UnsatisfiedDependencyException:创建名称为“transactionHistoryController”的 bean 时出错:通过字段“transactionHistoryCsvImportJob”表示的依赖关系不满足;嵌套异常是 org.springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“transactionHistoryJob”的 bean 创建错误:通过工厂方法进行 Bean 实例化失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.core.Job]:工厂方法“transactionHistoryJob”抛出异常;嵌套异常是 org.springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“transactionHistoryStep”的 bean 创建错误:通过工厂方法实例化 bean 失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.core.Step]:工厂方法“transactionHistoryStep”抛出异常;嵌套异常是 org.springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“处理器”的 bean 创建错误:通过工厂方法进行的 Bean 实例化失败;嵌套异常是 org.springframework.beans。BeanInstantiationException:无法实例化 [org.springframework.batch.item.ItemProcessor]:工厂方法“处理器”抛出异常;嵌套异常是 org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java: 90) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1411) 在 org.springframework.beans 的 org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessProperties(AutowiredAnnotationBeanPostProcessor.java:374) .factory.support.AbstractAutowireCapableBeanFactory。org.springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“transactionHistoryJob”的 bean 创建错误:通过工厂方法进行 bean 实例化失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.core.Job]:工厂方法“transactionHistoryJob”抛出异常;嵌套异常是 org.springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“transactionHistoryStep”的 bean 创建错误:通过工厂方法实例化 bean 失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch。core.Step]:工厂方法“transactionHistoryStep”抛出异常;嵌套异常是 org.springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“处理器”的 bean 创建错误:通过工厂方法进行的 Bean 实例化失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.item.ItemProcessor]:工厂方法“处理器”抛出异常;嵌套异常是 org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456) 的 org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627) 的 java.lang.NullPointerException在 org.springframework。springframework.batch.core.Job]:工厂方法“transactionHistoryJob”抛出异常;嵌套异常是 org.springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“transactionHistoryStep”的 bean 创建错误:通过工厂方法进行 Bean 实例化失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.core.Step]:工厂方法“transactionHistoryStep”抛出异常;嵌套异常是 org.springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“处理器”的 bean 创建错误:通过工厂方法进行的 bean 实例化失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.item.ItemProcessor]:工厂方法“处理器”抛出异常;嵌套异常是 org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) 处的 java.lang.NullPointerException org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622) ...省略了 32 个常见框架 原因:org.springframework.beans.factory.BeanCreationException:创建类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义的名称为“transactionHistoryStep”的 bean 时出错:通过工厂方法实例化 bean失败的; 嵌套异常是 org.springframework.beans。BeanInstantiationException:无法实例化[org.springframework.batch.core.Step]:工厂方法'transactionHistoryStep'抛出异常;嵌套异常是 org.springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“处理器”的 bean 创建错误:通过工厂方法进行的 bean 实例化失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.item.ItemProcessor]:工厂方法“处理器”抛出异常;嵌套异常是 org.springframework.beans.factory.support.ConstructorResolver 的 org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627) 的 java.lang.NullPointerException。beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ...省略了33个常见框架原因:org.springframework.beans.BeanInstantiationException:无法实例化[org.springframework.batch.core.Step]:工厂方法“transactionHistoryStep”抛出异常;嵌套异常是 org.springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“处理器”的 bean 创建错误:通过工厂方法进行的 bean 实例化失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.item.ItemProcessor]:工厂方法“处理器”抛出异常;嵌套异常是 org 的 java.lang.NullPointerException。springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622) ...省略了56个常见框架原因:org .springframework.beans.factory.BeanCreationException:在类路径资源 [com/datalabsindia/batch/TransactionHistoryCsvImport.class] 中定义名称为“处理器”的 bean 创建错误:通过工厂方法实例化 bean 失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.item.ItemProcessor]:工厂方法“处理器”抛出异常;嵌套异常是 org.springframework.beans.factory.support.ConstructorResolver 中的 java.lang.NullPointerException。invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ... 省略了 57 个常见框架原因: org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.item.ItemProcessor]:工厂方法“处理器”抛出异常;嵌套异常是 org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) 处的 java.lang.NullPointerException org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622) ...省略了 80 个常见帧原因:java.lang.NullPointerException:com.datalabsindia.batch 处为空。

0 投票
1 回答
385 浏览

spring-boot - 为什么 Spring Batch AsycItemProcessor 中的异常被 SkipListener 的 onSkipInWrite 方法捕获?

我正在编写一个 Spring Boot 应用程序,它启动、收集数百万个数据库条目并将其转换为新的简化 JSON 格式,然后将它们全部发送到 GCP PubSub 主题。我正在尝试为此使用 Spring Batch,但是在为我的流程实现容错时遇到了麻烦。数据库中充斥着数据质量问题,有时我的 JSON 转换会失败。发生故障时,我不希望工作立即退出,我希望它继续处理尽可能多的记录,并在完成之前报告哪些确切记录失败,以便我和/或我的团队可以检查这些记录有问题的数据库条目。

为此,我尝试使用 Spring Batch 的 SkipListener 接口。但我也在我的进程中使用了 AsyncItemProcessor 和 AsyncItemWriter,即使在处理过程中发生异常,SkipListener 的onSkipInWrite()方法也会捕获它们 - 而不是onSkipInProcess()方法。不幸的是,该onSkipInWrite()方法无法访问原始数据库实体,因此我无法将其 ID 存储在有问题的数据库条目列表中。

我是否配置错误?是否有任何其他方法可以从未能通过 AsynItemProcessor 处理步骤的读取器访问对象?

这是我尝试过的...

我有一个单例 Spring 组件,我在其中存储了已成功处理的数据库条目数以及多达 20 个有问题的数据库条目。

我有一个 Spring 批处理 Skip Listener,它应该捕获故障并相应地更新我的状态组件:

然后我像这样配置了我的工作:

0 投票
0 回答
351 浏览

mongodb - Spring Batch + MongoItemReader、ItemProcessor、MongoItemWriter + 不读取所有记录

我正在将 Spring Batch 与 Mongo 数据库一起使用。

我需要根据状态(status= PENDING)获取文档,写入 Kafka 队列并使用新状态(status= FILLED)更新文档字段。

所以我使用MongoItemReader, CompositeItemWriter( KafkaItemWriter, MongoItemWriter) 写入 Kafka 队列并更新它。但是当我运行作业时,我可以看到一些文档被跳过,并且跳过的文档数量等于块大小。

例如,我的集合有 15 个文档,块大小等于 5,MongoItemReader读取第 1、2、3、4、5 行,然后读取第 11、12、13、14、15 行(跳过第 6、7、8、9 行,10)。

ItemProcessor对 POJO 实体进行修改。由于 MongoItemReader 在每次读取之间进行了刷新,因此更新了实体。但似乎游标分页也增加了(从日志中可以看出:行 ID 6、7、8、9 和 10 已被跳过)。我已经看到它通过使用“ SqlPagingQueryProviderFactoryBean ”在关系数据库中进行了工作,但它不适合 NoSQL DB。我已尽一切努力寻找解决方案,但没有任何帮助。

那么,我该如何处理这个问题呢?