Spring Batch JdbcPagingItemReader not called from slaveStep

I created a POC project in which I use a Spring Batch local-partitioning step to move 10 records from the Employee table to a NewEmployee table. The batch is configured to run with 4 threads. When I run it, I can see that the pagingItemReader() method is never invoked by the slave step, and consequently the OraclePagingQueryProvider is never used. I also noticed that the number of missed (unmoved) records equals the number of configured threads. I developed this POC using the following repository as a guide: https://github.com/mminella/LearningSpringBatch/tree/master/src/localPartitioning
Note that when I replace the master/slave step code with a plain reader, processor, and writer (with no multithreading involved), the code works fine.
The BATCH_STEP_EXECUTION table in the DB also shows that only 8 records were moved (again, 2 records were missed, which equals the thread count). The DB records are as follows:
STEP_NAME             STATUS     COMMIT_COUNT  READ_COUNT  WRITE_COUNT  EXIT_CODE
slaveStep:partition1  COMPLETED  1             4           4            COMPLETED
slaveStep:partition0  COMPLETED  1             4           4            COMPLETED
masterStep            COMPLETED  2             8           8            COMPLETED
Code snippet of the configuration class:
@Bean
public JobRegistryBeanPostProcessor jobRegistrar() throws Exception {
    JobRegistryBeanPostProcessor registrar = new JobRegistryBeanPostProcessor();
    registrar.setJobRegistry(this.jobRegistry);
    registrar.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
    registrar.afterPropertiesSet();
    return registrar;
}

@Bean
public JobOperator jobOperator() throws Exception {
    SimpleJobOperator simpleJobOperator = new SimpleJobOperator();
    simpleJobOperator.setJobLauncher(this.jobLauncher);
    simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter());
    simpleJobOperator.setJobRepository(this.jobRepository);
    simpleJobOperator.setJobExplorer(this.jobExplorer);
    simpleJobOperator.setJobRegistry(this.jobRegistry);
    simpleJobOperator.afterPropertiesSet();
    return simpleJobOperator;
}

@Bean
public ColumnRangePartitioner partitioner() {
    ColumnRangePartitioner partitioner = new ColumnRangePartitioner();
    partitioner.setColumn("id");
    partitioner.setDataSource(this.dataSource);
    partitioner.setTable("Employee");
    LOGGER.info("partitioner---->" + partitioner);
    return partitioner;
}

@Bean
public Step masterStep() {
    return stepBuilderFactory.get("masterStep")
            .partitioner(slaveStep().getName(), partitioner())
            .step(slaveStep())
            .gridSize(gridSize)
            .taskExecutor(taskExecutorConfiguration.taskExecutor())
            .build();
}

@Bean
public Step slaveStep() {
    return stepBuilderFactory.get("slaveStep")
            .<Employee, NewEmployee>chunk(chunkSize)
            .reader(pagingItemReader(null, null))
            .processor(employeeProcessor())
            .writer(employeeWriter.customItemWriter())
            .build();
}

@Bean
public Job job() {
    return jobBuilderFactory.get("FR")
            .start(masterStep())
            .build();
}

@Bean
public ItemProcessor<Employee, NewEmployee> employeeProcessor() {
    return new EmployeeProcessor();
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
}
@Bean
@StepScope
public JdbcPagingItemReader<Employee> pagingItemReader(
        @Value("#{stepExecutionContext['minValue']}") Long minvalue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxvalue) {
    JdbcPagingItemReader<Employee> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(this.dataSource);
    // This should be equal to the chunk size for performance reasons.
    reader.setFetchSize(chunkSize);
    reader.setRowMapper((resultSet, i) -> new Employee(resultSet.getLong("id"),
            resultSet.getString("firstName"),
            resultSet.getString("lastName")));
    OraclePagingQueryProvider provider = new OraclePagingQueryProvider();
    provider.setSelectClause("id, firstName, lastName");
    provider.setFromClause("from Employee");
    LOGGER.info("min-->" + minvalue);
    LOGGER.info("max-->" + maxvalue);
    provider.setWhereClause("where id<=" + minvalue + " and id > " + maxvalue);
    Map<String, Order> sortKeys = new HashMap<>(1);
    sortKeys.put("id", Order.ASCENDING);
    provider.setSortKeys(sortKeys);
    reader.setQueryProvider(provider);
    LOGGER.info("reader--->" + reader);
    return reader;
}
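As a side note, the where clause assembled above is plain string concatenation, so it can be inspected in isolation against the bounds the partitioner produces. The sketch below is illustrative only (the class name and standalone method are not part of the POC); it prints the clause generated for a hypothetical partition with minValue=1 and maxValue=3:

```java
public class WhereClauseSketch {

    // Builds the same string as the setWhereClause(...) call above.
    public static String whereClause(Long minvalue, Long maxvalue) {
        return "where id<=" + minvalue + " and id > " + maxvalue;
    }

    public static void main(String[] args) {
        // Hypothetical first partition of ids 1..10 with gridSize 4:
        System.out.println(whereClause(1L, 3L)); // where id<=1 and id > 3
    }
}
```

Printing the clause this way makes it easy to compare which bound each operator is applied to against the minValue/maxValue pairs logged by the partitioner.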
Code snippet of the ColumnRangePartitioner class:

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class);
    int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class);
    int targetSize = (max - min) / gridSize + 1;
    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
    int number = 0;
    int start = min;
    int end = start + targetSize - 1;
    while (start <= max) {
        ExecutionContext value = new ExecutionContext();
        result.put("partition" + number, value);
        if (end >= max) {
            end = max;
        }
        LOGGER.info("Start-->" + start);
        LOGGER.info("end-->" + end);
        value.putInt("minValue", start);
        value.putInt("maxValue", end);
        start += targetSize;
        end += targetSize;
        number++;
    }
    return result;
}
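To make the partitioner's range arithmetic concrete, here is a self-contained sketch (not part of the original POC; the class and method names are illustrative) that reproduces the loop above for this question's values: ids 1 through 10 and a grid size of 4.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionRangeSketch {

    // Mirrors the start/end arithmetic of ColumnRangePartitioner.partition(),
    // returning each partition's range as a "min-max" string.
    public static List<String> ranges(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize + 1; // integer division, as above
        List<String> result = new ArrayList<>();
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max; // clamp the final partition to the real maximum id
            }
            result.add(start + "-" + end);
            start += targetSize;
            end += targetSize;
        }
        return result;
    }

    public static void main(String[] args) {
        // For ids 1..10 and gridSize 4: targetSize = (10 - 1) / 4 + 1 = 3
        System.out.println(ranges(1, 10, 4)); // [1-3, 4-6, 7-9, 10-10]
    }
}
```

With these inputs the partitioner yields four execution contexts, with (minValue, maxValue) pairs of (1, 3), (4, 6), (7, 9), and (10, 10).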
Have you verified that the `Partitioner` is returning partitions? Are there worker step execution records in the job repository? –
Hi Michael - thanks for the reply. I have updated the question with the BATCH_STEP_EXECUTION table records. There I can see that only 8 out of 10 records were read by the reader (the number of missed records equals the number of threads). Also, the reader is not reading the second column's data from the source table; it copies the first column's value into the second column, and the same is being saved in the target table. Note that none of these issues exist when I remove the partitioning logic. Thanks for your help. – Abhilash