0
我有一个简单的bach作业,它从JMS队列(ActiveMQ)读取并写入文件。批处理作业按预期方式运行,并写入文件以遵守已设置为10,000的提交间隔。Spring批处理:从JMS队列中读取,步骤并未结束
有这方面的
批处理作业队列中读取并没有结束2个观察值。
我看到,队列中的所有消息都已被使用,但只有当新消息被推送到JMS队列并且满足提交间隔时,才会将最后一个块写入文件。
它是预期的行为吗?我想安排批处理作业,并在该时间点消耗并写入队列中的所有消息。有任何建议吗?
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Bean
public TransactionAwareConnectionFactoryProxy activeMQConnectionFactory() {
ActiveMQConnectionFactory amqConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
TransactionAwareConnectionFactoryProxy activeMQConnectionFactory = new TransactionAwareConnectionFactoryProxy(amqConnectionFactory);
return activeMQConnectionFactory;
}
@Bean
public ActiveMQQueue defaultQueue() {
return new ActiveMQQueue("firstQueue");
}
@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public JobRepository jobRepository(PlatformTransactionManager transactionManager) throws Exception {
return new MapJobRepositoryFactoryBean(transactionManager).getObject();
}
@Bean
@DependsOn("jobRepository")
public SimpleJobLauncher simpleJobLauncher(JobRepository jobRepository) {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
return simpleJobLauncher;
}
如果我设置receiveTimeout为较小的数字,所有消息都没有被消耗,从而设定为上限。
@Bean
@DependsOn(value = { "activeMQConnectionFactory", "defaultQueue" })
public JmsTemplate firstQueueTemplate(ActiveMQQueue defaultQueue, TransactionAwareConnectionFactoryProxy activeMQConnectionFactory) {
JmsTemplate firstQueueTemplate = new JmsTemplate(activeMQConnectionFactory);
firstQueueTemplate.setDefaultDestination(defaultQueue);
firstQueueTemplate.setSessionTransacted(true);
firstQueueTemplate.setReceiveTimeout(Long.MAX_VALUE);
return firstQueueTemplate;
}
配置批处理作业。
@Bean
public JmsItemReader<String> jmsItemReader(JmsTemplate firstQueueTemplate) {
JmsItemReader<String> jmsItemReader = new JmsItemReader<>();
jmsItemReader.setJmsTemplate(firstQueueTemplate);
jmsItemReader.setItemType(String.class);
return jmsItemReader;
}
@Bean
public ItemWriter<String> flatFileItemWriter() {
FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource("/mypath/output.csv"));
writer.setLineAggregator(new PassThroughLineAggregator<String>());
return writer;
}
@Bean
@DependsOn(value = { "jmsItemReader", "jmsItemWriter", "jobRepository", "transactionManager" })
public Step queueReaderStep(JmsItemReader<String> jmsItemReader, ItemWriter<String> flatFileItemWriter, JobRepository jobRepository,
PlatformTransactionManager transactionManager) throws Exception {
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
AbstractTaskletStepBuilder<SimpleStepBuilder<String, String>> step = stepBuilderFactory.get("queueReaderStep").<String, String> chunk(10000)
.reader(jmsItemReader).writer(flatFileItemWriter);
return step.build();
}
@Bean
@DependsOn(value = { "jobRepository", "queueReaderStep" })
public Job jsmReaderJob(JobRepository jobRepository, Step queueReaderStep) {
return this.jobBuilderFactory.get("jsmReaderJob").repository(jobRepository).incrementer(new RunIdIncrementer())
.flow(queueReaderStep).end().build();
}