2017-08-17 115 views
I created a POC project in which I use a Spring Batch local-partitioning step to move 10 records from the Employee table to the NewEmployee table. I configured 4 threads to run this batch. When I run the batch process, I can see that the pagingItemReader() method is never invoked by the slave step, and consequently the OraclePagingQueryProvider is never used. I also noticed that the number of missed records (records not moved) equals the number of configured threads. I developed this POC using the following link as a guide: https://github.com/mminella/LearningSpringBatch/tree/master/src/localPartitioning

Note that when I replace the master/slave code with a plain reader, processor, and writer (no multithreading involved), the code below works fine.

The BATCH_STEP_EXECUTION table in the DB also shows that only 8 records were moved (again, 2 records were missed, which equals the number of threads). The DB records look like this:

 STEP_NAME             STATUS     COMMIT_COUNT  READ_COUNT  WRITE_COUNT  EXIT_CODE 
 slaveStep:partition1  COMPLETED  1             4           4            COMPLETED 
 slaveStep:partition0  COMPLETED  1             4           4            COMPLETED 
 masterStep            COMPLETED  2             8           8            COMPLETED 

Code snippet of the configuration class:

  @Bean 
       public JobRegistryBeanPostProcessor jobRegistrar() throws Exception{ 
        JobRegistryBeanPostProcessor registrar=new JobRegistryBeanPostProcessor(); 
        registrar.setJobRegistry(this.jobRegistry); 
        registrar.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory()); 
        registrar.afterPropertiesSet(); 
        return registrar; 
       } 

       @Bean 
       public JobOperator jobOperator() throws Exception{ 
        SimpleJobOperator simpleJobOperator=new SimpleJobOperator(); 
        simpleJobOperator.setJobLauncher(this.jobLauncher); 
        simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter()); 
        simpleJobOperator.setJobRepository(this.jobRepository); 
        simpleJobOperator.setJobExplorer(this.jobExplorer); 
        simpleJobOperator.setJobRegistry(this.jobRegistry); 

        simpleJobOperator.afterPropertiesSet(); 
        return simpleJobOperator; 

       } 

       @Bean 
       public ColumnRangePartitioner partitioner() { 
        ColumnRangePartitioner partitioner = new ColumnRangePartitioner(); 
        partitioner.setColumn("id"); 
        partitioner.setDataSource(this.dataSource); 
        partitioner.setTable("Employee"); 
        LOGGER.info("partitioner---->"+partitioner); 
        return partitioner; 
       } 

       @Bean 
       public Step masterStep() { 
        return stepBuilderFactory.get("masterStep") 
          .partitioner(slaveStep().getName(), partitioner()) 
          .step(slaveStep()) 
          .gridSize(gridSize) 
          .taskExecutor(taskExecutorConfiguration.taskExecutor()) 
          .build(); 
       } 

       @Bean 
       public Step slaveStep() { 
        return stepBuilderFactory.get("slaveStep") 
          .<Employee, NewEmployee>chunk(chunkSize) 
          .reader(pagingItemReader(null,null)) 
          .processor(employeeProcessor()) 
          .writer(employeeWriter.customItemWriter()) 
          .build(); 
       } 

       @Bean 
       public Job job() { 
        return jobBuilderFactory.get("FR") 
          .start(masterStep()) 
          .build(); 
       } 

       @Bean 
       public ItemProcessor<Employee, NewEmployee> employeeProcessor() { 
        return new EmployeeProcessor(); 
       } 

       @Override 
       public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { 
        this.applicationContext=applicationContext; 
       } 

       @Bean 
       @StepScope 
       public JdbcPagingItemReader<Employee> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minvalue, 
         @Value("#{stepExecutionContext['maxValue']}") Long maxvalue) { 

        JdbcPagingItemReader<Employee> reader = new JdbcPagingItemReader<Employee>(); 
        reader.setDataSource(this.dataSource); 
        // this should be equal to chunk size for the performance reasons. 
        reader.setFetchSize(chunkSize); 
        reader.setRowMapper((resultSet, i) -> { 
         return new Employee(resultSet.getLong("id"), 
           resultSet.getString("firstName"), 
           resultSet.getString("lastName")); 
        }); 

        OraclePagingQueryProvider provider = new OraclePagingQueryProvider(); 
        provider.setSelectClause("id, firstName, lastName"); 
        provider.setFromClause("from Employee"); 
        LOGGER.info("min-->"+minvalue); 
        LOGGER.info("max-->"+maxvalue); 
        // inclusive bounds, matching the minValue/maxValue keys set by the partitioner 
        provider.setWhereClause("where id >= " + minvalue + " and id <= " + maxvalue); 

        Map<String, Order> sortKeys = new HashMap<>(1); 
        sortKeys.put("id", Order.ASCENDING); 
        provider.setSortKeys(sortKeys); 

        reader.setQueryProvider(provider); 
        LOGGER.info("reader--->"+reader); 
        return reader; 
       } 
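The reader bean above builds its WHERE clause by string concatenation from the partition's `minValue`/`maxValue` keys. To make the intended semantics concrete, here is a minimal, self-contained sketch (plain JDK, no Spring types; ids 1..10 are an assumption for illustration) showing the inclusive-bounds predicate a partition should produce. Note the comparison operators must be `>=` on the lower bound and `<=` on the upper bound; inverted operators would match no rows at all:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class WhereClauseDemo {

    // Builds the inclusive-bounds predicate the reader bean should concatenate.
    static String whereClause(long minValue, long maxValue) {
        return "where id >= " + minValue + " and id <= " + maxValue;
    }

    // Applies the same inclusive range in-memory to show which ids a
    // partition covers (pretend the Employee table holds ids 1..10).
    static List<Long> idsInPartition(long minValue, long maxValue) {
        return LongStream.rangeClosed(1, 10)
                .filter(id -> id >= minValue && id <= maxValue)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(whereClause(1, 3));    // where id >= 1 and id <= 3
        System.out.println(idsInPartition(1, 3)); // [1, 2, 3]
    }
}
```

In production code, prefer `provider.setWhereClause("where id >= :minValue and id <= :maxValue")` together with the reader's `setParameterValues(...)` over raw string concatenation, so the values are bound as named parameters rather than spliced into the SQL.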


Code snippet of the ColumnRangePartitioner class:

    @Override 
    public Map<String, ExecutionContext> partition(int gridSize) { 
     int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class); 
     int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class); 
     int targetSize = (max - min)/gridSize + 1; 

     Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>(); 
     int number = 0; 
     int start = min; 
     int end = start + targetSize - 1; 

     while (start <= max) { 
      ExecutionContext value = new ExecutionContext(); 
      result.put("partition" + number, value); 

      if (end >= max) { 
       end = max; 
      } 
      LOGGER.info("Start-->" + start); 
      LOGGER.info("end-->" + end); 
      value.putInt("minValue", start); 
      value.putInt("maxValue", end); 
      start += targetSize; 
      end += targetSize; 
      number++; 
     } 

     return result; 
    } 
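To see how this partitioner splits the key range, here is a minimal, self-contained sketch of the same arithmetic (assuming ids 1..10 and gridSize 4; no Spring types, a `long[]{min, max}` pair stands in for the ExecutionContext):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionMathDemo {

    // Same range arithmetic as ColumnRangePartitioner.partition(), with
    // long[]{minValue, maxValue} standing in for Spring's ExecutionContext.
    static Map<String, long[]> partition(long min, long max, int gridSize) {
        long targetSize = (max - min) / gridSize + 1;
        Map<String, long[]> result = new LinkedHashMap<>();
        int number = 0;
        long start = min;
        long end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max;   // clamp the last partition to the real maximum
            }
            result.put("partition" + number, new long[]{start, end});
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 rows (ids 1..10) split for gridSize 4: targetSize = (10-1)/4 + 1 = 3
        partition(1, 10, 4).forEach((name, range) ->
                System.out.println(name + " -> [" + range[0] + ", " + range[1] + "]"));
        // partition0 -> [1, 3], partition1 -> [4, 6],
        // partition2 -> [7, 9], partition3 -> [10, 10]
    }
}
```

So 10 records and gridSize 4 produce four non-overlapping, inclusive ranges that together cover every id; if any records go missing, the cause is in how the ranges are consumed (the reader's WHERE clause and step wiring), not in this arithmetic.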

Have you verified that the `Partitioner` is returning partitions? Are there worker step execution records in the job repository? –


Hi Michael - thanks for the reply. I updated the question with the BATCH_STEP_EXECUTION table records. Here I can see that only 8 of the 10 records are read by the reader (the number of records read equals the number of threads per partition). Also, the reader is not reading the second column's data from the source table; instead the first column's value is copied into the second column, and that is what gets saved in the target table. Note that when I remove the partitioning logic, none of these problems occur. Thanks for your help. – Abhilash

Answer


I found the solution to this problem. After the partitioner, we have to add a partitionHandler to the masterStep. In the partitionHandler we define the slaveStep and the other configuration. The code snippets are below.

masterStep: add the partitionHandler here,

 stepBuilderFactory 
      .get("userMasterStep") 
      .partitioner(userSlaveStep().getName(), userPartitioner()) 
      .partitionHandler(userMasterSlaveHandler()) 
      .build(); 

Define another bean named partitionHandler, which refers to the slave step:

@Bean 
public PartitionHandler userMasterSlaveHandler() throws Exception { 
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler(); 
    handler.setGridSize(gridSize); 
    handler.setTaskExecutor(taskExecutorConfiguration.taskExecutor()); 
    handler.setStep(userSlaveStep()); 
    handler.afterPropertiesSet(); 
    return handler; 
}
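For intuition about why this fixes the problem: TaskExecutorPartitionHandler is the component that actually fans the slave step out, submitting one execution of the step per partition to the configured TaskExecutor and waiting for all of them to finish; without it, the worker steps are never launched. A rough, plain-JDK sketch of that fan-out (names such as `runSlaveStep` are hypothetical stand-ins; the real class lives in spring-batch-core):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FanOutSketch {

    // Hypothetical stand-in for executing the slave step against one partition.
    static String runSlaveStep(String partitionName) {
        return partitionName + ":COMPLETED";
    }

    // Roughly what a partition handler does: submit one task per
    // partition to a bounded pool, then wait for every one to finish.
    static List<String> handle(Map<String, long[]> partitions, int gridSize)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(gridSize);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String name : partitions.keySet()) {
                futures.add(pool.submit(() -> runSlaveStep(name)));
            }
            List<String> statuses = new ArrayList<>();
            for (Future<String> f : futures) {
                statuses.add(f.get()); // block until this partition completes
            }
            return statuses;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, long[]> partitions = Map.of(
                "partition0", new long[]{1, 3},
                "partition1", new long[]{4, 6});
        System.out.println(handle(partitions, 2));
    }
}
```

In the real builder API, `.partitioner(name, partitioner)` followed by `.step(slaveStep)` plus `.taskExecutor(...)` is meant to build an equivalent handler implicitly, so explicitly wiring a `TaskExecutorPartitionHandler` as in the answer makes the grid size, executor, and worker step unambiguous.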