我有一个要求,我从数据库中读取两个不同的Query
. 每个Query
都有自己的SQL
. 它们是相似的SQLs
,并且在大多数情况下都在使用相同的表集,但差异很小。我想检查我是否可以有两个SQLs
或者ItemReader
使用 ajdbctemplate
是可能的?
任何想法,示例代码?
我有一个要求,我从数据库中读取两个不同的Query
. 每个Query
都有自己的SQL
. 它们是相似的SQLs
,并且在大多数情况下都在使用相同的表集,但差异很小。我想检查我是否可以有两个SQLs
或者ItemReader
使用 ajdbctemplate
是可能的?
任何想法,示例代码?
如果您想“重用”现有的JdbcCursorItemReader
(或其他 Spring Batch Jdbc*ItemReaders 之一),您可以通过利用 step 范围动态切换 SQL。下面是一个配置示例,它sqlKey
根据JobParameters
. 声明的来源是一张简单的地图。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.2.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">
<batch:job id="switchSQL">
<batch:step id="switchSQL.step1">
<batch:tasklet>
<batch:chunk reader="sqlItemReader" writer="outItemWriter" commit-interval="10"/>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="sqlItemReader"
class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="#{@sqlStatements[jobParameters['sqlKey']]}"/>
<property name="rowMapper">
<bean class="de.incompleteco.spring.batch.data.ColumnRowMapper"/>
</property>
</bean>
<util:map id="sqlStatements">
<entry key="sql1" value="select * from table_one"/>
<entry key="sql2" value="select * from table_two"/>
</util:map>
<bean id="outItemWriter"
class="org.springframework.batch.item.adapter.ItemWriterAdapter">
<property name="targetObject" ref="outWriter"/>
<property name="targetMethod" value="write"/>
</bean>
<bean id="outWriter" class="de.incompleteco.spring.batch.item.SystemOutItemWriter"/>
</beans>
这是支持类;
(一个简单的项目作家)
package de.incompleteco.spring.batch.item;
public class SystemOutItemWriter {
public void write(Object object) {
System.out.println(object);
}
}
(和一个简单的行映射器)
package de.incompleteco.spring.batch.data;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
public class ColumnRowMapper implements RowMapper<String> {
public String mapRow(ResultSet rs, int rowNum) throws SQLException {
return rs.getString(1);
}
}
这是剩余的配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>
<bean id="jobExplorer"
class="org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean">
<property name="repositoryFactory" ref="&jobRepository"/>
</bean>
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
</bean>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager">
</bean>
<bean class="org.springframework.batch.core.scope.StepScope"/>
</beans>
和
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd">
<jdbc:embedded-database id="dataSource" type="H2">
<jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
<jdbc:script location="classpath:/META-INF/sql/insert-h2.sql"/>
</jdbc:embedded-database>
<bean id="dataSourceTransactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
</beans>
和 sql 的东西
create table table_one (
column_a varchar(50)
);
create table table_two (
column_a varchar(50)
);
--table one
insert into table_one (column_a) values ('hello');
insert into table_one (column_a) values ('world');
--table two
insert into table_two (column_a) values ('hey');
现在终于进行单元测试
package de.incompleteco.spring;
import static org.junit.Assert.assertFalse;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:/META-INF/spring/*-context.xml"})
public class SwitchSQLIntegrationTest {
@Autowired
private Job job;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobExplorer jobExplorer;
@Test
public void test() throws Exception {
//setup the parameters
JobParameters parameters = new JobParametersBuilder().addLong("runtime",System.currentTimeMillis())
.addString("sqlKey", "sql1").toJobParameters();
//run
JobExecution execution = jobLauncher.run(job,parameters);
//test
while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
Thread.sleep(100);
}//end while
//load
execution = jobExplorer.getJobExecution(execution.getId());
//test
assertFalse(execution.getStatus().isUnsuccessful());
//run it again
parameters = new JobParametersBuilder().addLong("runtime",System.currentTimeMillis())
.addString("sqlKey", "sql2").toJobParameters();
//run
execution = jobLauncher.run(job,parameters);
//test
while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
Thread.sleep(100);
}//end while
//load
execution = jobExplorer.getJobExecution(execution.getId());
//test
assertFalse(execution.getStatus().isUnsuccessful());
}
}