1

嗨,我正在尝试使用批处理框架运行两个作业。我的问题是 SimpleJobLauncher 只运行一个作业列表中的最后一个作业。我在做什么:我的数据库中有两个工作以及工作的步骤。我从数据库中读取作业数据并按以下方式处理它

public class BatchJobScheduler { private static Log sLog = LogFactory.getLog(BatchJobScheduler.class); private ApplicationContext ac; private DataSourceTransactionManager mTransactionManager; private SimpleJobLauncher mJobLauncher; private JobRepository mJobRepository; private SimpleStepFactoryBean stepFactory; private MapJobRegistry mapJobRegistry; private JobDetailBean jobDetail; private CronTriggerBean cronTrigger; private SimpleJob job; private SchedulerFactoryBean schedulerFactory; private static String mDriverClass; private static String mConnectionUrl; private static String mUser; private static String mPassword; public static JobMetaDataFeeder metadataFeeder; static { try { loadProperties(); metadataFeeder = new JobMetaDataFeeder(); metadataFeeder.configureDataSource(mDriverClass, mConnectionUrl, mUser, mPassword); } catch (FileNotFoundException e) { } catch (IOException e) { } catch (SQLException e) { } catch (ClassNotFoundException e) { } }

private static void loadProperties() throws FileNotFoundException,
        IOException {
    Properties properties = new Properties();
    InputStream is;
    if (BatchJobScheduler.class.getClassLoader() != null) {
        is = BatchJobScheduler.class.getClassLoader().getResourceAsStream(
                "batch.properties");
    } else {
        is = System.class.getClassLoader().getResourceAsStream(
                "batch.properties");
    }
    properties.load(is);
    mDriverClass = properties.getProperty("batch.jdbc.driver");
    mConnectionUrl = properties.getProperty("batch.jdbc.url");
    mUser = properties.getProperty("batch.jdbc.user");
    mPassword = properties.getProperty("batch.jdbc.password");
}

public void start(WebApplicationContext wac) throws Exception {
    try {
        ac = new FileSystemXmlApplicationContext("batch-spring.xml");
        mTransactionManager = (DataSourceTransactionManager) ac
                .getBean("mTransactionManager");
        mJobLauncher = (SimpleJobLauncher) ac.getBean("mJobLauncher");
        mJobRepository = (JobRepository) ac.getBean("mRepositoryFactory");
        mJobLauncher.afterPropertiesSet();
        List<JobMetadata> jobsMetaData = getJobsData(mDriverClass,
                mConnectionUrl, mUser, mPassword, null);
        createAndRunScheduler(jobsMetaData);
    } catch (Exception e) {
        e.printStackTrace();
        sLog.error("Exception while starting job", e);
    }
}

@SuppressWarnings("unchecked")
public List<CronTriggerBean> getJobTriggers(List<JobMetadata> jobsMetaData)
        throws Exception {
    List<CronTriggerBean> triggers = new ArrayList<CronTriggerBean>();
    for (JobMetadata jobMetadata : jobsMetaData) {
        job = (SimpleJob) ac.getBean("job");
        job.setName(jobMetadata.getJobName());
        ArrayList<Step> steps = new ArrayList<Step>();
        for (StepMetadata stepMetadata : jobMetadata.getSteps()) {
            // System.err.println(ac.getBean("stepFactory").getClass());
            stepFactory = new SimpleStepFactoryBean<String, Object>();
            stepFactory.setTransactionManager(mTransactionManager);
            stepFactory.setJobRepository(mJobRepository);
            stepFactory.setCommitInterval(stepMetadata.getCommitInterval());
            stepFactory.setStartLimit(stepMetadata.getStartLimit());
            T5CItemReader itemReader = (T5CItemReader) BeanUtils
                    .instantiateClass(Class.forName(stepMetadata
                            .getStepReaderClass()));
            itemReader
                    .setItems(getItemList(jobMetadata.getJobParameters()));
            stepFactory.setItemReader(itemReader);
            stepFactory.setItemProcessor((ItemProcessor) BeanUtils
                    .instantiateClass(Class.forName(stepMetadata
                            .getStepProcessorClass())));
            stepFactory.setItemWriter((ItemWriter) BeanUtils
                    .instantiateClass(Class.forName(stepMetadata
                            .getStepWriterClass())));
            stepFactory.setBeanName(stepMetadata.getStepName());
            steps.add((Step) stepFactory.getObject());
        }
        job.setSteps(steps);
        ReferenceJobFactory jobFactory = new ReferenceJobFactory(job);
        mapJobRegistry = (MapJobRegistry) ac.getBean("jobRegistry");
        mapJobRegistry.register(jobFactory);
        jobDetail = (JobDetailBean) ac.getBean("jobDetail");
        jobDetail.setJobClass(Class.forName(jobMetadata.getMJoblauncher()));
        jobDetail.setGroup(jobMetadata.getJobGroupName());
        jobDetail.setName(jobMetadata.getJobName());
        Map<String, Object> jobDataMap = new HashMap<String, Object>();
        jobDataMap.put("jobName", jobMetadata.getJobName());
        jobDataMap.put("jobLocator", mapJobRegistry);
        jobDataMap.put("jobLauncher", mJobLauncher);
        jobDataMap.put("timestamp", new Date());
        // jobDataMap.put("jobParams", jobMetadata.getJobParameters());
        jobDetail.setJobDataAsMap(jobDataMap);
        jobDetail.afterPropertiesSet();
        cronTrigger = (CronTriggerBean) ac.getBean("cronTrigger");
        cronTrigger.setJobDetail(jobDetail);
        cronTrigger.setJobName(jobMetadata.getJobName());
        cronTrigger.setJobGroup(jobMetadata.getJobGroupName());
        cronTrigger.setCronExpression(jobMetadata.getCronExpression());
        triggers.add(cronTrigger);
    }
    return triggers;
}

private void createAndRunScheduler(List<JobMetadata> jobsMetaData)
        throws Exception {
    // System.err.println(ac.getBean("schedulerFactory").getClass());
    schedulerFactory = new SchedulerFactoryBean();
    List<CronTriggerBean> triggerList = getJobTriggers(jobsMetaData);
    Trigger[] triggers = new Trigger[triggerList.size()];
    int triggerCount = 0;
    for (CronTriggerBean trigger : triggerList) {
        triggers[triggerCount] = trigger;
        triggerCount++;
    }
    schedulerFactory.setTriggers(triggers);
    schedulerFactory.afterPropertiesSet();
}

private List<JobMetadata> getJobsData(String driverClass,
        String connectionURL, String user, String password, String query)
        throws SQLException, ClassNotFoundException {
    metadataFeeder.createJobMetadata(query);
    return metadataFeeder.getJobsMetadata();
}

private List<String> getItemList(String jobParameterString) {
    List<String> itemList = new ArrayList<String>();
    String[] parameters = jobParameterString.split(";");
    for (String string : parameters) {
        String[] mapKeyValue = string.split("=");
        if (mapKeyValue.length == 2) {
            itemList.add(mapKeyValue[0] + ":" + mapKeyValue[1]);
        } else {
            // exception for invalid job parameters
            System.out.println("exception for invalid job parameters");
        }
    }
    return itemList;
}

private Map<String, Object> getParameterMap(String jobParameterString) {
    Map<String, Object> parameterMap = new HashMap<String, Object>();
    String[] parameters = jobParameterString.split(";");
    for (String string : parameters) {
        String[] mapKeyValue = string.split("=");
        if (mapKeyValue.length == 2) {
            parameterMap.put(mapKeyValue[0], mapKeyValue[1]);
        } else {
            // exception for invalid job parameters
            System.out.println("exception for invalid job parameters");
        }
    }
    return parameterMap;
}

}

public class MailJobLauncher extends QuartzJobBean { /** * Special key in job data map for the name of a job to run. */ static final String JOB_NAME = "jobName"; private static Log sLog = LogFactory.getLog(MailJobLauncher.class); private JobLocator mJobLocator; private JobLauncher mJobLauncher;

/**
 * Public setter for the {@link JobLocator}.
 * 
 * @param jobLocator
 *            the {@link JobLocator} to set
 */
public void setJobLocator(JobLocator jobLocator) {
    this.mJobLocator = jobLocator;
}

/**
 * Public setter for the {@link JobLauncher}.
 * 
 * @param jobLauncher
 *            the {@link JobLauncher} to set
 */
public void setJobLauncher(JobLauncher jobLauncher) {
    this.mJobLauncher = jobLauncher;
}

@Override
@SuppressWarnings("unchecked")
protected void executeInternal(JobExecutionContext context) {
    Map<String, Object> jobDataMap = context.getMergedJobDataMap();
    executeRecursive(jobDataMap);
}

private void executeRecursive(Map<String, Object> jobDataMap) {
    String jobName = (String) jobDataMap.get(JOB_NAME);
    JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
    sLog.info("Quartz trigger firing with Spring Batch jobName=" + jobName
            + jobDataMap + jobParameters);
    try {
        mJobLauncher.run(mJobLocator.getJob(jobName), jobParameters);
    } catch (JobInstanceAlreadyCompleteException e) {
        jobDataMap.remove("timestamp");
        jobDataMap.put("timestamp", new Date());
        executeRecursive(jobDataMap);
    } catch (NoSuchJobException e) {
        sLog.error("Could not find job.", e);
    } catch (JobExecutionException e) {
        sLog.error("Could not execute job.", e);
    }
}

/*
 * Copy parameters that are of the correct type over to {@link
 * JobParameters}, ignoring jobName.
 * @return a {@link JobParameters} instance
 */
private JobParameters getJobParametersFromJobMap(
        Map<String, Object> jobDataMap) {
    JobParametersBuilder builder = new JobParametersBuilder();
    for (Entry<String, Object> entry : jobDataMap.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();
        if (value instanceof String && !key.equals(JOB_NAME)) {
            builder.addString(key, (String) value);
        } else if (value instanceof Float || value instanceof Double) {
            builder.addDouble(key, ((Number) value).doubleValue());
        } else if (value instanceof Integer || value instanceof Long) {
            builder.addLong(key, ((Number) value).longValue());
        } else if (value instanceof Date) {
            builder.addDate(key, (Date) value);
        } else {
            sLog
                    .debug("JobDataMap contains values which are not job parameters (ignoring).");
        }
    }
    return builder.toJobParameters();
}

} 我无法弄清楚为什么启动器忽略了所有其他工作,请帮助我。问候

4

1 回答 1

0

确保设置了这些属性:

org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount=3 org.quartz.threadPool.threadPriority=5

这将允许同时运行几个作业。根据需要调整设置。

于 2010-01-16T03:15:26.267 回答