4

我的问题陈述。读取包含 1000 万条数据的 csv 文件并将其存储在 db 中。用尽可能少的时间。

我使用java的简单多线程执行器实现了它,逻辑几乎类似于spring批处理的块。从 csv 文件中读取预先配置的数据数量,然后创建一个线程,并将数据传递给验证数据的线程,然后写入在多线程中运行的文件。完成所有任务后,我将调用 sql loader 来加载每个文件。现在我想将此代码移动到春季批次(我是春季批次的新手)

这是我的问题
1. 在任务中,是否可以使 ItemReader 到 Item writer 多线程(当我读取文件时,在线程写入数据之前创建一个新线程来处理数据)?如果不是,我需要创建两个步骤,第一步读取单线程文件,另一步多线程写入单个文件,但是如何将数据列表从前一个任务传递给另一个任务。
2. 如果单线程出现故障,如何停止整个批处理作业处理。
3.如何在一定间隔后重试批处理作业,以防失败。我知道在失败的情况下有重试选项,但我找不到在失败的情况下在一定间隔后重试任务的选项。在这里我不是在谈论调度程序,因为我已经在调度程序下运行了批处理作业,但是如果失败,它必须在 3 分钟后重新运行。

4

3 回答 3

2

这是我解决问题的方法。

  1. Read a file and chunk the file( split the file) using Buffered and File Channel reader and writer ( the fastest way of File read/write, even spring batch uses the same). I implemented such that this is executed before job is started( However it can be executed using job as step using method invoker)

  2. Start the Job with directory location as job parameter.

  3. Use multiResourcePartitioner which will get the directory location and for each file a slave step is created in separate thread
  4. In the Slave step get the file passed from Partitioner and use spring batchs itemreader to read the file
  5. Use the Database item writer( I'm using mybatis batch itemwriter) to push the data to Database.
  6. Its better to use the split count equal to commit-count of step.
于 2013-10-06T15:51:21.500 回答
1
  1. 关于多线程阅读Spring Batch 中如何设置多线程?回答; 它会为您指明正确的方向。此外,在示例中,还有一些关于重新启动 CSV 文件的注意事项
  2. 如果线程出现错误,作业应该自动失败:我从未尝试过,但这应该是默认行为
  3. Spring Batch 如何在一个 Chunk tasklet 中设置每次调用之间的时间间隔可以作为一个开始。此外,关于Backoff Policies的官方文档- 在短暂失败后重试时,通常在重试之前稍等片刻会有所帮助,因为通常失败是由一些只能通过等待才能解决的问题引起的。如果 RetryCallback 失败,RetryTemplate 可以根据到位的 BackoffPolicy 暂停执行。

让我知道这是否有帮助或您如何解决问题,因为我对我的(未来)工作感兴趣!
我希望我的指示能有所帮助。

于 2013-09-07T21:05:59.750 回答
-1

You can split your input file to many file , the use Partitionner and load small files with threads, but on error , you must restart all job after DB cleaned.

<batch:job id="transformJob">
    <batch:step id="deleteDir" next="cleanDB">
        <batch:tasklet ref="fileDeletingTasklet" />
    </batch:step>
    <batch:step id="cleanDB" next="split">
        <batch:tasklet ref="countThreadTasklet" />
    </batch:step>
    <batch:step id="split" next="partitionerMasterImporter">
        <batch:tasklet>
            <batch:chunk reader="largeCSVReader" writer="smallCSVWriter" commit-interval="#{jobExecutionContext['chunk.count']}" />
        </batch:tasklet>
    </batch:step>
    <batch:step id="partitionerMasterImporter" next="partitionerMasterExporter">
        <partition step="importChunked" partitioner="filePartitioner">
            <handler grid-size="10" task-executor="taskExecutor" />
        </partition>
    </batch:step>
</batch:job>

Full example code (on Github).

Hope this help.

于 2017-10-05T14:33:57.540 回答