4

在批处理作业步骤配置中,我计划在 writer 中执行 2 个查询,第一个查询是更新表 A 中的记录,然后第二个查询是再次在表 A 中插入新记录。

到目前为止,我认为 CompositeItemWriter 可以实现我上面的目标,即我需要创建2个JdbcBatchItemWriters,一个用于更新,另一个用于插入。

我的第一个问题是 CompositeItemWriter 是否适合上述要求?

如果是,那就引出关于交易的第二个问题。例如,如果第一次更新成功,第二次插入失败。第一次更新事务会自动回滚吗?否则,如何在同一个事务中手动提取两个更新?

提前致谢!

4

1 回答 1

6

我的第一个问题是 CompositeItemWriter 是否适合上述要求?

是的,CompositeItemWriter是要走的路。

如果是,那就引出关于交易的第二个问题。例如,如果第一次更新成功,第二次插入失败。第一次更新事务会自动回滚吗?否则,如何在同一个事务中手动提取两个更新?

好问题!是的,如果在第一个写入器中更新成功,然后在第二个写入器中插入失败,则所有语句将自动回滚。您需要知道的是,事务围绕面向块的 tasklet 步骤的执行(以及围绕write复合项目编写器的方法)。因此,此方法中所有 sql 语句的执行(在委托编写器中执行)将是原子的。

为了说明这个用例,我编写了以下测试:

  • 给定一个people包含两列idname其中只有一条记录的表:1,'foo'
  • 让我们想象一个读取两条记录 ( 1,'foo', 2,'bar') 并尝试更新然后插入foo到表中的作业。这是通过两个项目编写器完成的:和foo!!2,'bar'CompositeItemWriterUpdateItemWriterInsertItemWriter
  • 用例是UpdateItemWriter成功但InsertItemWriter失败(通过抛出异常)
  • 预期的结果是foo没有更新到foo!!bar没有插入到表中(两条sql语句都回滚了,因为出现了异常InsertItemWriter

这是代码(它是独立的,因此您可以尝试一下,看看它是如何工作的,它使用了一个应该在您的类路径中的嵌入式 hsqldb 数据库):

import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
public class TransactionWithCompositeWriterTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Before
    public void setUp() {
        jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
        jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
    }

    @Test
    public void testTransactionRollbackWithCompositeWriter() throws Exception {
        // given
        int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
        int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
        Assert.assertEquals(1, peopleCount);
        Assert.assertEquals(1, fooCount);
        Assert.assertEquals(0, barCount);

        // when
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();

        // then
        Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
        Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
        Assert.assertEquals(0, stepExecution.getCommitCount());
        Assert.assertEquals(1, stepExecution.getRollbackCount());
        Assert.assertEquals(0, stepExecution.getWriteCount());

        peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
        barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
        Assert.assertEquals(1, peopleCount); // bar is not inserted
        Assert.assertEquals(0, barCount); // bar is not inserted
        Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
    }

    @Configuration
    @EnableBatchProcessing
    public static class JobConfiguration {

        @Bean
        public DataSource dataSource() {
            return new EmbeddedDatabaseBuilder()
                    .setType(EmbeddedDatabaseType.HSQL)
                    .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
                    .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                    .build();
        }

        @Bean
        public JdbcTemplate jdbcTemplate(DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }

        @Bean
        public ItemReader<Person> itemReader() {
            Person foo = new Person(1, "foo");
            Person bar = new Person(2, "bar");
            return new ListItemReader<>(Arrays.asList(foo, bar));
        }

        @Bean
        public ItemWriter<Person> updateItemWriter() {
            return new UpdateItemWriter(dataSource());
        }

        @Bean
        public ItemWriter<Person> insertItemWriter() {
            return new InsertItemWriter(dataSource());
        }

        @Bean
        public ItemWriter<Person> itemWriter() {
            CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
            compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
            return compositeItemWriter;
        }

        @Bean
        public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
            return jobBuilderFactory.get("job")
                    .start(stepBuilderFactory
                            .get("step").<Person, Person>chunk(2)
                            .reader(itemReader())
                            .writer(itemWriter())
                            .build())
                    .build();
        }

        @Bean
        public JobLauncherTestUtils jobLauncherTestUtils() {
            return new JobLauncherTestUtils();
        }
    }

    public static class UpdateItemWriter implements ItemWriter<Person> {

        private JdbcTemplate jdbcTemplate;

        public UpdateItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends Person> items) {
            for (Person person : items) {
                if ("foo".equalsIgnoreCase(person.getName())) {
                    jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
                }
            }
        }
    }

    public static class InsertItemWriter implements ItemWriter<Person> {

        private JdbcTemplate jdbcTemplate;

        public InsertItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends Person> items) {
            for (Person person : items) {
                if ("bar".equalsIgnoreCase(person.getName())) {
                    jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
                    throw new IllegalStateException("Something went wrong!");
                }
            }
        }
    }

    public static class Person {

        private long id;

        private String name;

        public Person() {
        }

        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }
}

我的示例使用自定义项目编写器,但这也适用于两个JdbcBatchItemWriters。

我希望这有帮助!

于 2018-08-19T13:52:53.463 回答