2009-12-23 62 views
1

无法与石英(Quartz)运行多个作业

嗨,我尝试使用批处理框架运行两个作业。我的问题是 SimpleJobLauncher 只运行作业列表中的最后一个作业。我是这样做的:我的数据库里有两个作业以及各作业的步骤。我从数据库中读取作业数据,并按如下方式处理:

/**
 * Reads Spring Batch job definitions (jobs + steps) from a database and
 * schedules each one as a Quartz cron-triggered job.
 *
 * <p>JDBC settings come from {@code batch.properties}; the metadata feeder is
 * configured once in the static initializer.
 */
public class BatchJobScheduler {

    private static Log sLog = LogFactory.getLog(BatchJobScheduler.class);

    // Spring wiring, resolved at runtime from batch-spring.xml (see start()).
    private ApplicationContext ac;
    private DataSourceTransactionManager mTransactionManager;
    private SimpleJobLauncher mJobLauncher;
    private JobRepository mJobRepository;
    private SimpleStepFactoryBean stepFactory;
    private MapJobRegistry mapJobRegistry;
    private JobDetailBean jobDetail;
    private CronTriggerBean cronTrigger;
    private SimpleJob job;
    private SchedulerFactoryBean schedulerFactory;

    // JDBC settings loaded once from batch.properties.
    private static String mDriverClass;
    private static String mConnectionUrl;
    private static String mUser;
    private static String mPassword;

    public static JobMetaDataFeeder metadataFeeder;

    static {
        try {
            loadProperties();
            metadataFeeder = new JobMetaDataFeeder();
            metadataFeeder.configureDataSource(mDriverClass, mConnectionUrl,
                    mUser, mPassword);
        } catch (FileNotFoundException e) {
            // These catches previously swallowed every failure silently,
            // leaving metadataFeeder null with no diagnostic. Log each cause.
            sLog.error("batch.properties not found on the classpath", e);
        } catch (IOException e) {
            sLog.error("Failed to read batch.properties", e);
        } catch (SQLException e) {
            sLog.error("Failed to configure the metadata data source", e);
        } catch (ClassNotFoundException e) {
            sLog.error("JDBC driver class not found: " + mDriverClass, e);
        }
    }

/**
 * Loads the JDBC settings from {@code batch.properties} on the classpath into
 * the static fields used by the static initializer.
 *
 * @throws FileNotFoundException if the resource cannot be found
 * @throws IOException if the resource cannot be read
 */
private static void loadProperties() throws FileNotFoundException,
        IOException {
    Properties properties = new Properties();
    InputStream is;
    ClassLoader loader = BatchJobScheduler.class.getClassLoader();
    if (loader != null) {
        is = loader.getResourceAsStream("batch.properties");
    } else {
        // BUG FIX: the original fell back to System.class.getClassLoader(),
        // which returns null for bootstrap-loaded classes and would NPE.
        // Use the system class loader's lookup instead.
        is = ClassLoader.getSystemResourceAsStream("batch.properties");
    }
    if (is == null) {
        // The original passed null straight into Properties.load -> NPE.
        throw new FileNotFoundException(
                "batch.properties not found on the classpath");
    }
    try {
        properties.load(is);
    } finally {
        // Always release the stream; the original leaked it.
        try {
            is.close();
        } catch (IOException ignored) {
            // best effort close; the properties were already read
        }
    }
    mDriverClass = properties.getProperty("batch.jdbc.driver");
    mConnectionUrl = properties.getProperty("batch.jdbc.url");
    mUser = properties.getProperty("batch.jdbc.user");
    mPassword = properties.getProperty("batch.jdbc.password");
}

/**
 * Bootstraps the batch infrastructure from {@code batch-spring.xml}, loads the
 * job metadata from the database and schedules all jobs with Quartz.
 *
 * @param wac the web application context; currently unused — TODO confirm it
 *            can be removed or should replace the FileSystemXmlApplicationContext
 * @throws Exception declared for callers, although failures during startup are
 *                   caught and logged here rather than propagated
 */
public void start(WebApplicationContext wac) throws Exception {
    try {
        ac = new FileSystemXmlApplicationContext("batch-spring.xml");
        mTransactionManager = (DataSourceTransactionManager) ac
                .getBean("mTransactionManager");
        mJobLauncher = (SimpleJobLauncher) ac.getBean("mJobLauncher");
        mJobRepository = (JobRepository) ac.getBean("mRepositoryFactory");
        mJobLauncher.afterPropertiesSet();
        List<JobMetadata> jobsMetaData = getJobsData(mDriverClass,
                mConnectionUrl, mUser, mPassword, null);
        createAndRunScheduler(jobsMetaData);
    } catch (Exception e) {
        // Log once with the full stack trace; the original additionally
        // called printStackTrace(), duplicating output on stderr.
        sLog.error("Exception while starting job", e);
    }
}

/**
 * Builds one Quartz {@link CronTriggerBean} per job described in the metadata.
 *
 * <p>For each job: assembles its steps via {@link SimpleStepFactoryBean},
 * registers the job in the job registry, and wraps it in a JobDetail + cron
 * trigger pair.
 *
 * <p>NOTE(review): the {@code job}, {@code jobDetail} and {@code cronTrigger}
 * beans MUST be declared with {@code scope="prototype"} in batch-spring.xml.
 * If they are singletons, every loop iteration mutates the SAME instances, so
 * all triggers end up describing the last job only — which matches the
 * reported symptom of only the final job running. TODO confirm the bean scopes.
 *
 * @param jobsMetaData job definitions read from the database
 * @return one configured cron trigger per job
 * @throws Exception if bean lookup, class loading or factory initialization fails
 */
@SuppressWarnings("unchecked")
public List<CronTriggerBean> getJobTriggers(List<JobMetadata> jobsMetaData)
        throws Exception {
    List<CronTriggerBean> triggers = new ArrayList<CronTriggerBean>();
    for (JobMetadata jobMetadata : jobsMetaData) {
        // Locals instead of instance fields: the original stored each of
        // these in fields, so concurrent or repeated calls would clobber
        // in-progress state.
        SimpleJob batchJob = (SimpleJob) ac.getBean("job");
        batchJob.setName(jobMetadata.getJobName());
        ArrayList<Step> steps = new ArrayList<Step>();
        for (StepMetadata stepMetadata : jobMetadata.getSteps()) {
            // A fresh factory per step; reusing a single instance would leak
            // configuration between steps.
            SimpleStepFactoryBean<String, Object> factory =
                    new SimpleStepFactoryBean<String, Object>();
            factory.setTransactionManager(mTransactionManager);
            factory.setJobRepository(mJobRepository);
            factory.setCommitInterval(stepMetadata.getCommitInterval());
            factory.setStartLimit(stepMetadata.getStartLimit());
            // Reader/processor/writer classes are named in the metadata and
            // instantiated reflectively.
            T5CItemReader itemReader = (T5CItemReader) BeanUtils
                    .instantiateClass(Class.forName(stepMetadata
                            .getStepReaderClass()));
            itemReader.setItems(getItemList(jobMetadata.getJobParameters()));
            factory.setItemReader(itemReader);
            factory.setItemProcessor((ItemProcessor) BeanUtils
                    .instantiateClass(Class.forName(stepMetadata
                            .getStepProcessorClass())));
            factory.setItemWriter((ItemWriter) BeanUtils
                    .instantiateClass(Class.forName(stepMetadata
                            .getStepWriterClass())));
            factory.setBeanName(stepMetadata.getStepName());
            steps.add((Step) factory.getObject());
        }
        batchJob.setSteps(steps);
        ReferenceJobFactory jobFactory = new ReferenceJobFactory(batchJob);
        MapJobRegistry registry = (MapJobRegistry) ac.getBean("jobRegistry");
        registry.register(jobFactory);
        JobDetailBean detail = (JobDetailBean) ac.getBean("jobDetail");
        detail.setJobClass(Class.forName(jobMetadata.getMJoblauncher()));
        detail.setGroup(jobMetadata.getJobGroupName());
        detail.setName(jobMetadata.getJobName());
        // The launcher class (e.g. MailJobLauncher) reads these keys from the
        // merged job data map at fire time.
        Map<String, Object> jobDataMap = new HashMap<String, Object>();
        jobDataMap.put("jobName", jobMetadata.getJobName());
        jobDataMap.put("jobLocator", registry);
        jobDataMap.put("jobLauncher", mJobLauncher);
        jobDataMap.put("timestamp", new Date());
        detail.setJobDataAsMap(jobDataMap);
        detail.afterPropertiesSet();
        CronTriggerBean trigger = (CronTriggerBean) ac.getBean("cronTrigger");
        trigger.setJobDetail(detail);
        trigger.setJobName(jobMetadata.getJobName());
        trigger.setJobGroup(jobMetadata.getJobGroupName());
        trigger.setCronExpression(jobMetadata.getCronExpression());
        triggers.add(trigger);
    }
    return triggers;
}

/**
 * Wires a Quartz scheduler with one cron trigger per job and initializes it.
 *
 * <p>NOTE(review): for several jobs to fire concurrently the Quartz thread
 * pool needs more than one worker thread
 * (org.quartz.threadPool.threadCount); with a single thread, overlapping
 * triggers execute one at a time.
 *
 * @param jobsMetaData job definitions read from the database
 * @throws Exception if trigger construction or scheduler initialization fails
 */
private void createAndRunScheduler(List<JobMetadata> jobsMetaData)
        throws Exception {
    schedulerFactory = new SchedulerFactoryBean();
    List<CronTriggerBean> triggerList = getJobTriggers(jobsMetaData);
    // toArray replaces the original manual index-copy loop.
    Trigger[] triggers = triggerList.toArray(new Trigger[triggerList.size()]);
    schedulerFactory.setTriggers(triggers);
    // afterPropertiesSet builds the scheduler; presumably it also starts it
    // via the factory's default autoStartup — TODO confirm for this Spring
    // version, otherwise the scheduler must be started explicitly.
    schedulerFactory.afterPropertiesSet();
}

/**
 * Loads the job metadata via the shared {@code metadataFeeder}.
 *
 * <p>NOTE(review): the driverClass/connectionURL/user/password parameters are
 * ignored — the feeder was already configured in the static initializer.
 * {@code query} is passed through as-is (the caller currently passes null;
 * presumably the feeder then uses a default query — TODO confirm).
 *
 * @return the job definitions produced by the feeder
 * @throws SQLException if reading the metadata fails
 * @throws ClassNotFoundException if the feeder cannot load a referenced class
 */
private List<JobMetadata> getJobsData(String driverClass, 
     String connectionURL, String user, String password, String query) 
     throws SQLException, ClassNotFoundException { 
    metadataFeeder.createJobMetadata(query); 
    return metadataFeeder.getJobsMetadata(); 
} 

/**
 * Converts a {@code "key=value;key=value"} job-parameter string into a list
 * of {@code "key:value"} items for the item reader.
 *
 * @param jobParameterString raw parameter string, e.g. {@code "a=1;b=2"}
 * @return one {@code "key:value"} entry per well-formed pair; malformed pairs
 *         are logged and skipped
 */
private List<String> getItemList(String jobParameterString) {
    List<String> itemList = new ArrayList<String>();
    for (String pair : jobParameterString.split(";")) {
        String[] keyValue = pair.split("=");
        if (keyValue.length == 2) {
            itemList.add(keyValue[0] + ":" + keyValue[1]);
        } else {
            // Use the class logger instead of System.out. Note a value that
            // itself contains '=' also lands here and is dropped — confirm
            // that is intended, otherwise split with a limit of 2.
            sLog.warn("Invalid job parameter token (expected key=value): "
                    + pair);
        }
    }
    return itemList;
}

/**
 * Parses a {@code "key=value;key=value"} job-parameter string into a map.
 *
 * <p>NOTE(review): currently has no callers in this class (the jobParams
 * entry of the job data map is commented out) — kept for the intended
 * parameter-passing feature.
 *
 * @param jobParameterString raw parameter string, e.g. {@code "a=1;b=2"}
 * @return map of well-formed key/value pairs; malformed pairs are logged and
 *         skipped
 */
private Map<String, Object> getParameterMap(String jobParameterString) {
    Map<String, Object> parameterMap = new HashMap<String, Object>();
    for (String pair : jobParameterString.split(";")) {
        String[] keyValue = pair.split("=");
        if (keyValue.length == 2) {
            parameterMap.put(keyValue[0], keyValue[1]);
        } else {
            // Use the class logger instead of System.out.
            sLog.warn("Invalid job parameter token (expected key=value): "
                    + pair);
        }
    }
    return parameterMap;
}

}

/**
 * Quartz job that, when fired, launches the Spring Batch job named in the
 * trigger's job data map via the injected {@link JobLocator} and
 * {@link JobLauncher}.
 */
public class MailJobLauncher extends QuartzJobBean {

    /** Special key in job data map for the name of a job to run. */
    static final String JOB_NAME = "jobName";

    private static Log sLog = LogFactory.getLog(MailJobLauncher.class);

    private JobLocator mJobLocator;
    private JobLauncher mJobLauncher;

/**
 * Injects the {@link JobLocator} used to resolve job names to jobs.
 *
 * @param jobLocator the locator to use at fire time
 */
public void setJobLocator(JobLocator jobLocator) {
    mJobLocator = jobLocator;
}

/**
 * Injects the {@link JobLauncher} that actually runs the resolved job.
 *
 * @param jobLauncher the launcher to use at fire time
 */
public void setJobLauncher(JobLauncher jobLauncher) {
    mJobLauncher = jobLauncher;
}

/**
 * Quartz entry point: pulls the merged job data map from the execution
 * context and delegates to the launch/retry logic.
 */
@Override
@SuppressWarnings("unchecked")
protected void executeInternal(JobExecutionContext context) {
    Map<String, Object> mergedData = context.getMergedJobDataMap();
    executeRecursive(mergedData);
}

/**
 * Launches the batch job named under {@link #JOB_NAME} in the map.
 *
 * <p>If the job instance has already completed, the {@code timestamp} entry
 * is refreshed so the next attempt produces distinct {@link JobParameters},
 * and the launch is retried. The original implementation retried via
 * self-recursion, which could overflow the stack; this version loops instead.
 * Note the retry is still unbounded, as before — it relies on the new
 * timestamp making the parameters unique.
 *
 * @param jobDataMap merged Quartz job data containing the job name and any
 *                   launch parameters
 */
private void executeRecursive(Map<String, Object> jobDataMap) {
    String jobName = (String) jobDataMap.get(JOB_NAME);
    while (true) {
        JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
        sLog.info("Quartz trigger firing with Spring Batch jobName=" + jobName
                + jobDataMap + jobParameters);
        try {
            mJobLauncher.run(mJobLocator.getJob(jobName), jobParameters);
            return;
        } catch (JobInstanceAlreadyCompleteException e) {
            // Map.put replaces any existing value, so the original
            // remove-then-put pair collapses to a single put.
            jobDataMap.put("timestamp", new Date());
        } catch (NoSuchJobException e) {
            sLog.error("Could not find job.", e);
            return;
        } catch (JobExecutionException e) {
            sLog.error("Could not execute job.", e);
            return;
        }
    }
}

/**
 * Copies entries of supported types from the job data map into Spring Batch
 * {@link JobParameters}, skipping the reserved job-name key.
 *
 * @param jobDataMap merged Quartz job data
 * @return the assembled {@link JobParameters}
 */
private JobParameters getJobParametersFromJobMap(
        Map<String, Object> jobDataMap) {
    JobParametersBuilder params = new JobParametersBuilder();
    for (Entry<String, Object> mapEntry : jobDataMap.entrySet()) {
        String name = mapEntry.getKey();
        Object raw = mapEntry.getValue();
        if (raw instanceof String && !name.equals(JOB_NAME)) {
            params.addString(name, (String) raw);
        } else if (raw instanceof Float || raw instanceof Double) {
            params.addDouble(name, ((Number) raw).doubleValue());
        } else if (raw instanceof Integer || raw instanceof Long) {
            params.addLong(name, ((Number) raw).longValue());
        } else if (raw instanceof Date) {
            params.addDate(name, (Date) raw);
        } else {
            // Anything else (including the jobName entry excluded above) is
            // not a job parameter.
            sLog
                .debug("JobDataMap contains values which are not job parameters (ignoring).");
        }
    }
    return params.toJobParameters();
}

} 我不能弄明白为什么启动器忽略所有其他的工作,请帮助我。 问候

回答

0

确保这些属性被设置:

org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 3
org.quartz.threadPool.threadPriority = 5

这将允许几个作业同时运行。根据需要调整设置。