2017-02-03 124 views

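Spring Batch: reading from a JMS queue, the step never ends

I have a simple batch job that reads from a JMS queue (ActiveMQ) and writes to a file. The batch job runs as expected and writes to the file according to the commit interval, which is set to 10,000.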

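I have two observations about this:

  1. The batch job reading from the queue never ends.

  2. I can see that all messages in the queue have been consumed, but the last chunk is written to the file only when new messages are pushed to the JMS queue and the commit interval is reached.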

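Is this the expected behavior? I want to schedule the batch job so that, at the time it runs, it consumes and writes all the messages currently in the queue. Any suggestions?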

@Autowired 
private JobBuilderFactory jobBuilderFactory; 

@Bean 
public TransactionAwareConnectionFactoryProxy activeMQConnectionFactory() { 
    ActiveMQConnectionFactory amqConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); 
    TransactionAwareConnectionFactoryProxy activeMQConnectionFactory = new TransactionAwareConnectionFactoryProxy(amqConnectionFactory); 
    return activeMQConnectionFactory; 
} 

@Bean 
public ActiveMQQueue defaultQueue() { 
    return new ActiveMQQueue("firstQueue"); 
} 

@Bean 
public PlatformTransactionManager transactionManager() { 
    return new ResourcelessTransactionManager(); 
} 

@Bean 
public JobRepository jobRepository(PlatformTransactionManager transactionManager) throws Exception { 
    return new MapJobRepositoryFactoryBean(transactionManager).getObject(); 
} 

@Bean 
@DependsOn("jobRepository") 
public SimpleJobLauncher simpleJobLauncher(JobRepository jobRepository) { 
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher(); 
    simpleJobLauncher.setJobRepository(jobRepository); 
    return simpleJobLauncher; 
} 

If I set receiveTimeout to a small number, not all messages are consumed, so I set it to the maximum value.

@Bean 
@DependsOn(value = { "activeMQConnectionFactory", "defaultQueue" }) 
public JmsTemplate firstQueueTemplate(ActiveMQQueue defaultQueue, TransactionAwareConnectionFactoryProxy activeMQConnectionFactory) { 
    JmsTemplate firstQueueTemplate = new JmsTemplate(activeMQConnectionFactory); 
    firstQueueTemplate.setDefaultDestination(defaultQueue); 
    firstQueueTemplate.setSessionTransacted(true); 
    firstQueueTemplate.setReceiveTimeout(Long.MAX_VALUE); 
    return firstQueueTemplate; 
} 

The batch job configuration:

@Bean 
public JmsItemReader<String> jmsItemReader(JmsTemplate firstQueueTemplate) { 
    JmsItemReader<String> jmsItemReader = new JmsItemReader<>(); 
    jmsItemReader.setJmsTemplate(firstQueueTemplate); 
    jmsItemReader.setItemType(String.class); 
    return jmsItemReader; 
} 


@Bean 
public ItemWriter<String> flatFileItemWriter() { 
    FlatFileItemWriter<String> writer = new FlatFileItemWriter<>(); 
    writer.setResource(new FileSystemResource("/mypath/output.csv")); 
    writer.setLineAggregator(new PassThroughLineAggregator<String>()); 
    return writer; 
} 

@Bean 
@DependsOn(value = { "jmsItemReader", "flatFileItemWriter", "jobRepository", "transactionManager" }) 
public Step queueReaderStep(JmsItemReader<String> jmsItemReader, ItemWriter<String> flatFileItemWriter, JobRepository jobRepository, 
     PlatformTransactionManager transactionManager) throws Exception { 
    StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager); 
    AbstractTaskletStepBuilder<SimpleStepBuilder<String, String>> step = stepBuilderFactory.get("queueReaderStep").<String, String> chunk(10000) 
      .reader(jmsItemReader).writer(flatFileItemWriter); 
    return step.build(); 
} 


@Bean 
@DependsOn(value = { "jobRepository", "queueReaderStep" }) 
public Job jsmReaderJob(JobRepository jobRepository, Step queueReaderStep) { 
    return this.jobBuilderFactory.get("jsmReaderJob").repository(jobRepository).incrementer(new RunIdIncrementer()) 
      .flow(queueReaderStep).end().build(); 
} 

Answer


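The JmsItemReader provided by Spring Batch is really meant more as a template or example because, as you note, it never returns null, so the process never ends. You need to write something that recognizes a given message as the signal that the step is complete.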
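
One way to read that suggestion is a thin wrapper around the reader that turns an agreed "end of batch" marker message into null, since a null from ItemReader.read() is what tells a chunk-oriented step that input is exhausted and the final partial chunk can be written. The sketch below is only an illustration and makes several assumptions: SimpleItemReader is a stand-in for Spring Batch's ItemReader so the example runs without Spring, the BlockingQueue stands in for the JMS queue, and the class names and the "##END##" marker are hypothetical. The producer would have to send that marker after the last real message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Stand-in for Spring Batch's ItemReader contract: null means "no more input". */
interface SimpleItemReader<T> {
    T read() throws Exception;
}

/** Hypothetical wrapper that converts an agreed marker message into null. */
final class SentinelTerminatedReader implements SimpleItemReader<String> {
    // Assumed marker; producer and consumer must agree on it.
    static final String END_OF_BATCH = "##END##";

    private final SimpleItemReader<String> delegate;
    private boolean finished;

    SentinelTerminatedReader(SimpleItemReader<String> delegate) {
        this.delegate = delegate;
    }

    @Override
    public String read() throws Exception {
        if (finished) {
            return null; // stay finished; do not block on the queue again
        }
        String item = delegate.read();
        if (item == null || END_OF_BATCH.equals(item)) {
            finished = true;
            return null; // signals end of input to the caller
        }
        return item;
    }
}

final class SentinelReaderDemo {
    /** Reads until null, roughly what a chunk-oriented step does with its reader. */
    static List<String> drain(SimpleItemReader<String> reader) throws Exception {
        List<String> items = new ArrayList<>();
        for (String item = reader.read(); item != null; item = reader.read()) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add(SentinelTerminatedReader.END_OF_BATCH);

        // take() blocks indefinitely, similar to receiveTimeout = Long.MAX_VALUE.
        SimpleItemReader<String> blockingReader = queue::take;
        System.out.println(drain(new SentinelTerminatedReader(blockingReader))); // prints [a, b]
    }
}
```

In the real job, the delegate would be the jmsItemReader bean and the wrapper would be passed to .reader(...) in queueReaderStep. The marker approach only works if exactly one consumer reads the queue and the producer reliably sends the marker.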
