1

我正在使用休眠学习 Spring 批处理作业,但我面临一个问题。

TransactionRequiredException:没有正在进行的事务

我已经创建了阅读器、处理器和编写器。我正在更新处理器中的用户,并且在编写器之后我收到此错误。我已经尝试在处理器上使用 @Transacional 方法,但它不起作用。我不确定这里有什么问题。添加我的作业配置文件。在此之前,我面临与事务管理器相关的问题。我也不确定我使用的事务管理器是否合适。请让我知道我在哪里犯错。

import java.util.Properties;
import javax.sql.DataSource;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.orm.hibernate5.HibernateTransactionManager;
import org.springframework.orm.hibernate5.LocalSessionFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class JobConfiguration {

@Bean
public DataSource jobDataSource() {
    DriverManagerDataSource dataSource = new DriverManagerDataSource();
    dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
    dataSource.setUrl("jdbc:mysql://localhost:3306/TestDB");
    dataSource.setUsername("****");
    dataSource.setPassword("****");
    return dataSource;
}

private JobRepository getJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(jobDataSource());
    factory.setTransactionManager(getTransactionManager());
    factory.afterPropertiesSet();
    return (JobRepository) factory.getObject();
}

private PlatformTransactionManager getTransactionManager() {
    HibernateTransactionManager txManager = new HibernateTransactionManager();
    txManager.setSessionFactory(jobSessionFactory().getObject());
    return txManager;
}

@Bean
public JobLauncher myJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(getJobRepository());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

@Bean
public LocalSessionFactoryBean jobSessionFactory() {
    LocalSessionFactoryBean sessionFactory = new LocalSessionFactoryBean();
    sessionFactory.setDataSource(jobDataSource());
    sessionFactory.setPackagesToScan("com.test");
    sessionFactory.setHibernateProperties(jobHibernateProperties());
    return sessionFactory;
}

@Bean
public Properties jobHibernateProperties() {
    Properties hibernateProperties = new Properties();
    hibernateProperties.put("hibernate.dialect", "org.hibernate.dialect.MySQL5Dialect");
    hibernateProperties.put("hibernate.show_sql", false);
    hibernateProperties.put("hibernate.hbm2ddl.auto", "update");
    hibernateProperties.put("hibernate.format_sql", true);
    return hibernateProperties;
}
}

测试作业.Java

import org.hibernate.SessionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import 
org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import 
org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.database.HibernateCursorItemReader;
import org.springframework.batch.item.database.builder.HibernateCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.cadau.jobs.dto.TestDTO;
import com.Test.model.User;

@Configuration
public class TestJob {

@Autowired
private SessionFactory sessionFactory;

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory steps;

@Bean(name = "firstBatchJob")
public Job job(@Qualifier("step1")Step step1) {
    return jobBuilderFactory.get("firstBatchJob")
            .start(step1)
            .build();
}

@Bean(name="step1")
protected Step step1(ItemReader<User> itemReader,
  ItemProcessor<User, User> testProcessor,
  ItemWriter<User> testWriter) {
    return steps.get("step1").<User, User> chunk(10)
      .reader(itemReader)
      .processor(testProcessor)
      .writer(testWriter)
      .build();
}

@Bean(value = "itemReader")
public ItemReader itemReader() throws UnexpectedInputException, ParseException {
    HibernateCursorItemReader<User> reader = new HibernateCursorItemReader<>();
    reader.setQueryString("from User");
    reader.setFetchSize(1000);
    reader.setSessionFactory(sessionFactory);
    reader.setUseStatelessSession(true);
    return reader;
}
}

测试处理器.java

@Transactional
@Component(value = "testProcessor")
@StepScope
public class TestProcessor implements ItemProcessor<User, User> {

@Autowired
private IUserService userService;

@Override
public User process(User item) throws Exception {
    System.out.println("Processor:::  "+item);
    User user = userService.edit(item.getId());
    System.out.println("DB user : "+user);
    user.setFullName("Test User");
    userService.update(user);
    System.out.println("After update user" + user);
    return item;
}
}

Testwriter.java

@StepScope
@Component(value = "testWriter")
public class TestWriter implements ItemWriter<User>{

@Override
public void write(List<? extends User> items) throws Exception {
    System.out.println("Writer called.");
}
}

也添加错误

2020-05-23 14:59:05,098 INFO  [restartedMain] org.springframework.batch.core.launch.support.SimpleJobLauncher$1: Job: [SimpleJob: [name=firstBatchJob]] launched with the following parameters: [{}]
2020-05-23 14:59:06,679 INFO  [restartedMain] org.springframework.batch.core.job.SimpleStepHandler: Executing step: [step1]
2020-05-23 14:59:14,965 INFO  [restartedMain] org.hibernate.hql.internal.QueryTranslatorFactoryInitiator: HHH000397: Using ASTQueryTranslatorFactory
Processor:::  com.cadau.model.User@3eae26f1
DB user : com.cadau.model.User@3edb84f6
After update usercom.cadau.model.User@3edb84f6
Processor:::  com.cadau.model.User@3b237ad2
DB user : com.cadau.model.User@2c527a91
After update usercom.cadau.model.User@2c527a91
Writer called.
2020-05-23 14:59:24,011 INFO  [restartedMain] org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback: Commit failed while step execution data was already updated. Reverting to old version.
2020-05-23 14:59:24,018 ERROR [restartedMain] org.springframework.batch.core.step.AbstractStep: Encountered an error executing step step1 in job firstBatchJob
javax.persistence.TransactionRequiredException: no transaction is in progress
at org.hibernate.internal.AbstractSharedSessionContract.checkTransactionNeededForUpdateOperation(AbstractSharedSessionContract.java:398)
at org.hibernate.internal.SessionImpl.checkTransactionNeededForUpdateOperation(SessionImpl.java:3558)
at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1444)
at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1440)
at org.springframework.orm.hibernate5.SessionFactoryUtils.flush(SessionFactoryUtils.java:147)
at org.springframework.orm.hibernate5.SpringSessionSynchronization.beforeCommit(SpringSessionSynchronization.java:95)
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerBeforeCommit(TransactionSynchronizationUtils.java:96)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerBeforeCommit(AbstractPlatformTransactionManager.java:922)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:730)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy126.commit(Unknown Source)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:399)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy124.run(Unknown Source)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:207)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:181)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:168)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:163)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:780)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:764)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:319)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1214)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1203)
at com.cadau.jobs.app.CadauJobsApplication.main(CadauJobsApplication.java:27)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
2020-05-23 14:59:24,432 INFO  [restartedMain] org.springframework.batch.core.launch.support.SimpleJobLauncher$1: Job: [SimpleJob: [name=firstBatchJob]] completed with the following parameters: [{}] and the following status: [FAILED]
4

2 回答 2

1

由于您使用的是 Hibernate,因此您需要配置 Spring Batch 以使用 aHibernateTransactionManager来驱动事务。如参考文档中所述,实现这一点的方法是定义 aBatchConfigurer和覆盖getTransactionManager()。但是,这不是您的代码所做的:

private PlatformTransactionManager getTransactionManager() {
   HibernateTransactionManager txManager = new HibernateTransactionManager();
   txManager.setSessionFactory(jobSessionFactory().getObject());
   return txManager;
}

您可以进行JobConfiguration扩展DefaultBatchConfigurer

@Configuration
public class JobConfiguration extends DefaultBatchConfigurer {

   @Override
   public HibernateTransactionManager getTransactionManager() {
      HibernateTransactionManager txManager = new HibernateTransactionManager();
      txManager.setSessionFactory(jobSessionFactory().getObject());
      return txManager;
   }

}

如果您使用,这也是执行此操作的方法@EnableBatchProcessing,请参阅以“为了使用自定义事务管理器,应提供自定义 BatchConfigurer。例如:”开头的部分中的Javadoc 。...

于 2020-05-26T08:07:33.080 回答
0

首先,您必须将项目保存在 writer 而不是处理器中。处理器用于可选转换,同时项目从读取器传递到写入器。您不需要处理器上的@Transactional。事务是默认配置的。事务管理器可以选择在 StepBuilderFactory steps.get("step1").transactionManager(transactionManager) 中配置,但这通常是默认完成的。只需将 transactionManager 声明为 bean。

最后,您可以使用 @EnableBatchProcessing 并完全删除 JobConfiguration。

于 2020-05-23T11:27:17.683 回答