2014-12-03 54 views
0

我正在写一个Spring批处理,并在需要时调整它的大小。 我的ApplicationContext看起来像这样Spring批次多线程

@Configuration 
@EnableBatchProcessing 
@EnableTransactionManagement 
@ComponentScan(basePackages = "in.springbatch") 
@PropertySource(value = {"classpath:springbatch.properties"}) 

public class ApplicationConfig { 

@Autowired 
Environment environment; 

@Autowired 
private JobBuilderFactory jobs; 

@Autowired 
private StepBuilderFactory stepBuilderFactory; 

@Bean 
public Job job() throws Exception { 

    return jobs.get("spring_batch") 
      .flow(step()).end() 
      .build(); 
} 

@Bean(name = "dataSource", destroyMethod = "close") 
public DataSource dataSource() { 

    BasicDataSource basicDataSource = new BasicDataSource(); 



    return basicDataSource; 
} 

@Bean 
public JobRepository jobRepository() throws Exception { 
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); 
    jobRepositoryFactoryBean.setTransactionManager(transactionManager()); 
    jobRepositoryFactoryBean.setDataSource(dataSource()); 
    return jobRepositoryFactoryBean.getObject(); 
} 

@Bean(name = "batchstep") 
public Step step() throws Exception { 

    return stepBuilderFactory.get("batchstep").allowStartIfComplete(true). 
    transactionManager(transactionManager()). 
      chunk(2).reader(batchReader()).processor(processor()).writer(writer()).build(); 

    } 



@Bean 
ItemReader batchReader() throws Exception { 
    System.out.println(Thread.currentThread().getName()+"reader"); 
    HibernateCursorItemReader<Source> hibernateCursorItemReader = new HibernateCursorItemReader<>(); 
    hibernateCursorItemReader.setQueryString("from Source"); 
    hibernateCursorItemReader.setFetchSize(2); 
    hibernateCursorItemReader.setSessionFactory(sessionFactory().getObject()); 


    hibernateCursorItemReader.close(); 
    return hibernateCursorItemReader; 
} 

@Bean 
public ItemProcessor processor() { 
    return new BatchProcessor(); 
} 

@Bean 
public ItemWriter writer() { 
    return new BatchWriter(); 
} 

public TaskExecutor taskExecutor(){ 

    SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch"); 
    asyncTaskExecutor.setConcurrencyLimit(5); 
    return asyncTaskExecutor; 


} 
@Bean 
public LocalSessionFactoryBean sessionFactory() { 
    LocalSessionFactoryBean sessionFactory = new LocalSessionFactoryBean(); 
    sessionFactory.setDataSource(dataSource()); 
    sessionFactory.setPackagesToScan(new String[]{"in.springbatch.entity"}); 
    sessionFactory.setHibernateProperties(hibernateProperties()); 

    return sessionFactory; 
} 

@Bean 
public PersistenceExceptionTranslationPostProcessor exceptionTranslation() { 
    return new PersistenceExceptionTranslationPostProcessor(); 
} 

@Bean 
@Autowired 
public HibernateTransactionManager transactionManager() { 
    HibernateTransactionManager txManager = new HibernateTransactionManager(); 
    txManager.setSessionFactory(sessionFactory().getObject()); 

    return txManager; 
} 

Properties hibernateProperties() { 
    return new Properties() { 
     { 
      setProperty("hibernate.hbm2ddl.auto",  environment.getProperty("hibernate.hbm2ddl.auto")); 
      setProperty("hibernate.dialect", environment.getProperty("hibernate.dialect")); 
      setProperty("hibernate.globally_quoted_identifiers", "false"); 

     } 
    }; 
} 

}

  1. 有了上面的配置,我能够从DB,进程读取数据并写入到数据库。
  2. 我使用块大小为2并从光标读取2条记录使用 HibernateCusrsorItem读取器和我从DB读取的查询是基于 日期来选择当前日期记录。
  3. 到目前为止,我能够实现所需的行为,并重新启动 能力,但只能处理由于上次运行失败而没有处理的作业记录 。

现在我的要求是批量使用多线程来处理数据并写入数据库。

我的处理器和作家看起来像这样

@Component 
public class BatchProcessor implements ItemProcessor<Source,DestinationDto>{ 

@Override 
public DestinationDto process(Source source) throws Exception { 

     System.out.println(Thread.currentThread().getName()+":"+source); 
     DestinationDto destination=new DestinationDto(); 
     destination.setName(source.getName()); 
     destination.setValue(source.getValue()); 
     destination.setSourceId(source.getSourceId().toString()); 

    return destination; 
} 
@Component 
public class BatchWriter implements ItemWriter<DestinationDto>{ 

@Autowired 
IBatchDao batchDao; 

@Override 
public void write(List<? extends DestinationDto> list) throws Exception { 
    System.out.println(Thread.currentThread().getName()+":"+list); 
    batchDao.saveToDestination((List<DestinationDto>)list); 
} 

我更新了我的步骤,并且增加了一个ThreadPoolTask​​Executor类如下

@Bean(name = "batchstep") 
public Step step() throws Exception { 

    return stepBuilderFactory.get("batchstep").allowStartIfComplete(true). 
    transactionManager(transactionManager()).chunk(1).reader(batchReader()). 
    processor(processor()).writer(writer()).taskExecutor(taskExecutor()).build(); 

    } 

在这之后我的处理器是越来越被多个线程,但有相同的源数据称为。 有什么额外的我需要做的?

回答

0

这是一个大问题

  1. 在得到一个好的答案你最好的选择是通过在Spring Batch的文档的缩放和并行处理章节看(Here

  2. 可能存在在弹簧批处理示例中是一些多线程样本(Here

  3. 一个简单的方法来创建未来的处理器 - 您将一个未来的处理器在未来对象中处理逻辑,而你的弹簧处理器类只会将对象添加到未来。然后,您的作家课在执行写入过程之前等待未来完成。对不起,我没有一个样本可以为你指出 - 但如果你有特定的问题,我可以试着回答!

+0

感谢您抽出时间阅读冗长的问题。我认为首先我应该花一些时间尝试Spring Batch的开箱即用功能来实现多线程 – Amardeep 2014-12-03 20:50:21

+1

实际上对你来说使用案例 - 选项3可能会更容易。 Spring批量开箱功能可以实现多线程工作,当你可以多线程完成整个步骤时,即你的阅读器可以读取大块信息 – 2014-12-05 03:48:15

+0

好吧,我可以尝试选项3.但是如你所说“Spring Batch out of当您可以对整个步骤进行多线程处理时,该功能可以很好地实现多线程,即您的阅读器可以以块的形式读取信息。我正在读取HibernateCursorReader中的数据,该数据提供了一个以块为单位读取数据的选项。如果这是正确的,那么你能指出我,我怎么能多线程我的步骤? – Amardeep 2014-12-05 08:51:37