0
我正在编写一个 Spring Batch 批处理作业,并希望在需要时能够对其进行扩展(Spring Batch 多线程)。我的 ApplicationContext 配置如下:
/**
 * Java-based Spring configuration for the {@code spring_batch} job.
 *
 * <p>Declares the job, its single chunk-oriented step, the reader/processor/writer
 * beans, and the Hibernate session factory and transaction manager they rely on.
 * Hibernate settings are read from {@code classpath:springbatch.properties}.
 */
@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
@ComponentScan(basePackages = "in.springbatch")
@PropertySource(value = {"classpath:springbatch.properties"})
public class ApplicationConfig {

    @Autowired
    Environment environment;

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Builds the {@code spring_batch} job as a flow consisting of the single step
     * returned by {@link #step()}.
     *
     * @return the configured job
     * @throws Exception if the step cannot be built
     */
    @Bean
    public Job job() throws Exception {
        return jobs.get("spring_batch")
                .flow(step())
                .end()
                .build();
    }

    /**
     * Creates the pooled data source; the pool is closed when the context shuts down.
     *
     * <p>NOTE(review): no driver, URL or credentials are set here - presumably elided
     * from this listing; confirm before use.
     *
     * @return the data source
     */
    @Bean(name = "dataSource", destroyMethod = "close")
    public DataSource dataSource() {
        BasicDataSource basicDataSource = new BasicDataSource();
        return basicDataSource;
    }

    /**
     * Creates the job repository on top of {@link #dataSource()} and
     * {@link #transactionManager()}.
     *
     * @return the job repository produced by the factory bean
     * @throws Exception if the factory bean fails to create the repository
     */
    @Bean
    public JobRepository jobRepository() throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setTransactionManager(transactionManager());
        jobRepositoryFactoryBean.setDataSource(dataSource());
        return jobRepositoryFactoryBean.getObject();
    }

    /**
     * Builds the chunk-oriented step {@code batchstep} with a commit interval of 2.
     * The step may be started again even if it already completed.
     *
     * @return the configured step
     * @throws Exception if the reader cannot be created
     */
    @Bean(name = "batchstep")
    public Step step() throws Exception {
        return stepBuilderFactory.get("batchstep")
                .allowStartIfComplete(true)
                .transactionManager(transactionManager())
                .chunk(2)
                .reader(batchReader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    /**
     * Creates the cursor-based Hibernate reader that runs the HQL query
     * {@code from Source} with a fetch size of 2.
     *
     * <p>The reader is returned unopened. The previous {@code close()} call made here
     * has been removed: nothing has been opened at bean-construction time, and
     * Spring Batch opens and closes item streams as part of step execution.
     *
     * @return the configured reader
     * @throws Exception if the session factory cannot be obtained
     */
    @Bean
    ItemReader<Source> batchReader() throws Exception {
        System.out.println(Thread.currentThread().getName()+"reader");
        HibernateCursorItemReader<Source> hibernateCursorItemReader = new HibernateCursorItemReader<>();
        hibernateCursorItemReader.setQueryString("from Source");
        hibernateCursorItemReader.setFetchSize(2);
        hibernateCursorItemReader.setSessionFactory(sessionFactory().getObject());
        return hibernateCursorItemReader;
    }

    /**
     * @return a new {@code BatchProcessor}
     */
    @Bean
    public ItemProcessor processor() {
        return new BatchProcessor();
    }

    /**
     * @return a new {@code BatchWriter}
     */
    @Bean
    public ItemWriter writer() {
        return new BatchWriter();
    }

    /**
     * Creates an asynchronous task executor whose thread names are prefixed with
     * {@code spring_batch} and whose concurrency is limited to 5.
     *
     * <p>This method is not annotated with {@code @Bean} and is not referenced by
     * {@link #step()} in this class, so each call returns a new executor instance.
     *
     * @return a new task executor
     */
    public TaskExecutor taskExecutor(){
        SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
        asyncTaskExecutor.setConcurrencyLimit(5);
        return asyncTaskExecutor;
    }

    /**
     * Creates the Hibernate session factory bean, scanning
     * {@code in.springbatch.entity} for entities.
     *
     * @return the session factory bean
     */
    @Bean
    public LocalSessionFactoryBean sessionFactory() {
        LocalSessionFactoryBean sessionFactory = new LocalSessionFactoryBean();
        sessionFactory.setDataSource(dataSource());
        sessionFactory.setPackagesToScan(new String[]{"in.springbatch.entity"});
        sessionFactory.setHibernateProperties(hibernateProperties());
        return sessionFactory;
    }

    /**
     * @return the post-processor that enables persistence exception translation
     */
    @Bean
    public PersistenceExceptionTranslationPostProcessor exceptionTranslation() {
        return new PersistenceExceptionTranslationPostProcessor();
    }

    /**
     * Creates the Hibernate transaction manager bound to {@link #sessionFactory()}.
     *
     * @return the transaction manager
     */
    @Bean
    public HibernateTransactionManager transactionManager() {
        HibernateTransactionManager txManager = new HibernateTransactionManager();
        txManager.setSessionFactory(sessionFactory().getObject());
        return txManager;
    }

    /**
     * Collects the Hibernate settings, reading {@code hibernate.hbm2ddl.auto} and
     * {@code hibernate.dialect} from the environment.
     *
     * <p>A plain {@link Properties} instance is used instead of double-brace
     * initialization, which would create an anonymous subclass holding a reference
     * to this configuration object.
     *
     * @return the Hibernate properties
     * @throws NullPointerException if either environment property is missing
     */
    Properties hibernateProperties() {
        Properties properties = new Properties();
        properties.setProperty("hibernate.hbm2ddl.auto", environment.getProperty("hibernate.hbm2ddl.auto"));
        properties.setProperty("hibernate.dialect", environment.getProperty("hibernate.dialect"));
        properties.setProperty("hibernate.globally_quoted_identifiers", "false");
        return properties;
    }
}
- 使用上面的配置,我能够从数据库读取数据、进行处理,然后写入数据库。
- 我将块大小(chunk size)设为 2,使用 HibernateCursorItemReader 通过游标每次读取 2 条记录;从数据库读取数据的查询按日期筛选,只选取当前日期的记录。
- 到目前为止,我已经实现了所需的行为以及可重启能力:重启后作业只处理上次运行因失败而未处理的记录。
现在我的要求是批量使用多线程来处理数据并写入数据库。
我的处理器(Processor)和写入器(Writer)如下所示:
@Component
public class BatchProcessor implements ItemProcessor<Source,DestinationDto>{
@Override
public DestinationDto process(Source source) throws Exception {
System.out.println(Thread.currentThread().getName()+":"+source);
DestinationDto destination=new DestinationDto();
destination.setName(source.getName());
destination.setValue(source.getValue());
destination.setSourceId(source.getSourceId().toString());
return destination;
}
@Component
public class BatchWriter implements ItemWriter<DestinationDto>{
@Autowired
IBatchDao batchDao;
@Override
public void write(List<? extends DestinationDto> list) throws Exception {
System.out.println(Thread.currentThread().getName()+":"+list);
batchDao.saveToDestination((List<DestinationDto>)list);
}
我更新了我的 Step,并添加了一个 ThreadPoolTaskExecutor,如下所示:
/**
 * Builds the chunk-oriented step {@code batchstep} with a commit interval of 1 and
 * hands chunk execution to the executor returned by {@code taskExecutor()}.
 * The step may be started again even if it already completed.
 *
 * NOTE(review): the reader comes from {@code batchReader()}; if that is a
 * cursor-based reader it is presumably not safe to share between the executor's
 * threads - confirm against the Spring Batch documentation before relying on
 * this multi-threaded setup.
 *
 * @return the configured step
 * @throws Exception if one of the collaborating beans cannot be created
 */
@Bean(name = "batchstep")
public Step step() throws Exception {
return stepBuilderFactory.get("batchstep").allowStartIfComplete(true).
transactionManager(transactionManager()).chunk(1).reader(batchReader()).
processor(processor()).writer(writer()).taskExecutor(taskExecutor()).build();
}
在此之后,我的处理器确实被多个线程调用了,但各个线程拿到的是相同的源数据。我还需要做哪些额外的工作?
感谢您抽出时间阅读冗长的问题。我认为首先我应该花一些时间尝试Spring Batch的开箱即用功能来实现多线程 – Amardeep 2014-12-03 20:50:21
实际上,就你的使用场景而言,选项 3 可能会更容易。当整个步骤都可以多线程执行时(即你的读取器可以按块读取数据),Spring Batch 开箱即用的多线程功能效果很好 – 2014-12-05 03:48:15
好的,我可以尝试选项 3。但是如你所说:“当整个步骤都可以多线程执行时(即你的读取器可以按块读取数据),Spring Batch 开箱即用的多线程功能效果很好。”我正在使用 HibernateCursorReader 读取数据,它提供了按块读取数据的选项。如果这样理解是正确的,你能否告诉我该如何让我的步骤多线程执行? – Amardeep 2014-12-05 08:51:37