1

我有一个 Spring Batch 作业,它从 S3 存储桶中读取一批文件,对其进行处理,然后写入数据库,整个过程以多线程配置运行。application.properties 文件包含以下内容:

# Values must not carry trailing whitespace: the .properties format keeps
# trailing spaces as part of the value (e.g. "accessKey " instead of "accessKey").
cloud.aws.credentials.accessKey=accessKey
cloud.aws.credentials.secretKey=secret
cloud.aws.region.static=us-east-1
cloud.aws.credentials.instanceProfile=true
cloud.aws.stack.auto=false

我的 ItemReader:

/**
 * Builds the item reader bean for the step.
 *
 * <p>Three readers are layered: a {@code FlatFileItemReader} that maps each line
 * to a {@code DataRecord}, a {@code MultiResourceItemReader} that feeds it the
 * resources returned by {@code loadS3Resources(null, null)}, and an outer
 * {@code SynchronizedItemStreamReader} that is the object actually returned.
 * State saving is switched off on both inner readers.
 *
 * @return the synchronized reader wrapping the multi-resource reader
 */
@Bean
ItemReader<DataRecord> itemReader() {
    // Innermost reader: parses a single resource line by line, skipping no lines.
    FlatFileItemReader<DataRecord> lineReader = new FlatFileItemReader<>();
    lineReader.setSaveState(false);
    lineReader.setLineMapper(new DataRecord.DataRecordLineMapper());
    lineReader.setLinesToSkip(0);

    // Middle reader: hands every loaded resource to the line reader.
    MultiResourceItemReader<DataRecord> allResourcesReader = new MultiResourceItemReader<>();
    allResourcesReader.setSaveState(false);
    allResourcesReader.setResources(loadS3Resources(null, null));
    allResourcesReader.setDelegate(lineReader);

    // Outermost reader: presumably added so the step's worker threads can share one reader.
    SynchronizedItemStreamReader<DataRecord> sharedReader = new SynchronizedItemStreamReader<>();
    sharedReader.setDelegate(allResourcesReader);
    return sharedReader;
}

还有我的任务执行器:

/**
 * Creates the task executor bean: a {@code ThreadPoolTaskExecutor} whose core
 * pool size equals the number of processors reported by the JVM runtime.
 * No other executor setting is changed from its default.
 *
 * @return the configured thread-pool task executor
 */
@Bean
TaskExecutor taskExecutor() {
    int processorCount = Runtime.getRuntime().availableProcessors();

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(processorCount);
    return executor;
}

作业只包含一个步骤:从文件中读取数据、进行处理,然后写入数据库。在这种配置下,资源可以正常加载,作业启动后,该步骤处理了第一个资源的前约 24 万行(共有 7 个资源,每个资源有 120 万行)。然后我得到以下异常:

org.springframework.batch.item.file.NonTransientFlatFileException: Unable to read from resource: [Amazon s3 resource [bucket='my-bucket' and object='output/part-r-00000']]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:220) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.read(MultiResourceItemReader.java:108) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.support.SynchronizedItemStreamReader.read(SynchronizedItemStreamReader.java:55) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:157) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:116) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:110) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.3.9.RELEASE.jar!/:4.3.9.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_65]
Caused by: javax.net.ssl.SSLException: SSL peer shut down incorrectly
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:596) ~[na:1.8.0_65]
at sun.security.ssl.InputRecord.read(InputRecord.java:532) ~[na:1.8.0_65]
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973) ~[na:1.8.0_65]
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930) ~[na:1.8.0_65]
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) ~[na:1.8.0_65]
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:198) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135) ~[httpclient-4.5.3.jar!/:4.5.3]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:117) ~[aws-java-sdk-s3-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) ~[na:1.8.0_65]
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) ~[na:1.8.0_65]
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[na:1.8.0_65]
at java.io.InputStreamReader.read(InputStreamReader.java:184) ~[na:1.8.0_65]
at java.io.BufferedReader.fill(BufferedReader.java:161) ~[na:1.8.0_65]
at java.io.BufferedReader.readLine(BufferedReader.java:324) ~[na:1.8.0_65]
at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[na:1.8.0_65]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:201) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
... 23 common frames omitted

我想知道是否有一个简单的方法来解决这个问题。目前我正在考虑只制作文件的本地副本,然后从中读取,但我想知道是否可以通过某些配置来避免此异常。

谢谢!

4

1 回答 1

1

我的猜测是一个线程关闭 SFTP 会话,而另一个线程仍在处理中。

更好的做法是使用 MultiResourcePartitioner 为每个资源(文件)创建一个分区,然后让读取器把每个文件作为独立的分区单独读取。采用这种配置后,您也不再需要 MultiResourceItemReader(可以直接使用它的委托读取器)。

请参阅此处的示例 https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/resources/jobs/partitionFileJob.xml

另请参阅 如何为 MultiResourceItemReader 应用分区计数?

于 2017-08-02T09:09:36.313 回答