2016-08-15 108 views
0

我有一个春天的批处理作业,做以下...春季批次执行动态生成的步骤,一个tasklet

步骤1.创建一个需要处理的

第2步:创建对象的列表取决于有多少项目是在步骤1中创建

步骤3.试图从在步骤2中

的执行X的步骤中创建步骤列表执行步骤的对象的列表的步骤的列表在executeDynamicStepsTasklet()中完成。虽然代码运行没有任何错误,但它似乎没有做任何事情。我在该方法中看到的是否正确?

感谢

/* * * /

@Configuration 
public class ExportMasterListCsvJobConfig { 

public static final String JOB_NAME = "exportMasterListCsv"; 
@Autowired 
public JobBuilderFactory jobBuilderFactory; 

@Autowired 
public StepBuilderFactory stepBuilderFactory; 

@Value("${exportMasterListCsv.generateMasterListRows.chunkSize}") 
public int chunkSize; 

@Value("${exportMasterListCsv.generateMasterListRows.masterListSql}") 
public String masterListSql; 

@Autowired 
public DataSource onlineStagingDb; 

@Value("${out.dir}") 
public String outDir; 

@Value("${exportMasterListCsv.generatePromoStartDateEndDateGroupings.promoStartDateEndDateSql}") 
private String promoStartDateEndDateSql; 


private List<DivisionIdPromoCompStartDtEndDtGrouping> divisionIdPromoCompStartDtEndDtGrouping; 

private List<Step> dynamicSteps = Collections.synchronizedList(new ArrayList<Step>()) ; 


@Bean 
public Job exportMasterListCsvJob(
     @Qualifier("createJobDatesStep") Step createJobDatesStep, 
     @Qualifier("createDynamicStepsStep") Step createDynamicStepsStep, 
     @Qualifier("executeDynamicStepsStep") Step executeDynamicStepsStep) { 

    return jobBuilderFactory.get(JOB_NAME) 
      .flow(createJobDatesStep) 
      .next(createDynamicStepsStep) 
      .next(executeDynamicStepsStep) 
      .end().build(); 
} 


@Bean 
public Step executeDynamicStepsStep(
     @Qualifier("executeDynamicStepsTasklet") Tasklet executeDynamicStepsTasklet) { 

    return stepBuilderFactory 
       .get("executeDynamicStepsStep") 
       .tasklet(executeDynamicStepsTasklet) 
       .build();    
} 

@Bean 
public Tasklet executeDynamicStepsTasklet() { 

    return new Tasklet() { 

     @Override 
     public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { 
      FlowStep flowStep = new FlowStep(createParallelFlow()); 
      SimpleJobBuilder jobBuilder = jobBuilderFactory.get("myNewJob").start(flowStep); 
      return RepeatStatus.FINISHED; 
     } 
    }; 
} 

public Flow createParallelFlow() { 
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); 
    taskExecutor.setConcurrencyLimit(1); 

    List<Flow> flows = dynamicSteps.stream() 
      .map(step -> new FlowBuilder<Flow>("flow_" + step.getName()).start(step).build()) 
      .collect(Collectors.toList()); 

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow") 
      .split(taskExecutor) 
      .add(flows.toArray(new Flow[flows.size()])) 
      .build(); 
} 

@Bean 

public Step createDynamicStepsStep(
     @Qualifier("createDynamicStepsTasklet") Tasklet createDynamicStepsTasklet) { 

    return stepBuilderFactory 
       .get("createDynamicStepsStep") 
       .tasklet(createDynamicStepsTasklet) 
       .build();    
} 

@Bean 
@JobScope 
public Tasklet createDynamicStepsTasklet() { 

    return new Tasklet() { 

     @Override 
     public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { 
      for (DivisionIdPromoCompStartDtEndDtGrouping grp: divisionIdPromoCompStartDtEndDtGrouping){ 

       System.err.println("grp: " + grp); 

       String stepName = "stp_" + grp; 

       String fileName = grp + FlatFileConstants.EXTENSION_CSV; 

       Step dynamicStep = 
         stepBuilderFactory.get(stepName) 
         .<MasterList,MasterList> chunk(10) 
         .reader(queryStagingDbReader(
           grp.getDivisionId(), 
           grp.getRpmPromoCompDetailStartDate(), 
           grp.getRpmPromoCompDetailEndDate())) 
         .writer(masterListFileWriter(fileName))         
         .build(); 

       dynamicSteps.add(dynamicStep); 

      } 
      System.err.println("createDynamicStepsTasklet dynamicSteps: " + dynamicSteps); 
      return RepeatStatus.FINISHED; 
     } 
    }; 
} 


public FlatFileItemWriter<MasterList> masterListFileWriter(String fileName) { 
    FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>(); 
    writer.setResource(new FileSystemResource(new File(outDir, fileName))); 
    writer.setHeaderCallback(masterListFlatFileHeaderCallback()); 
    writer.setLineAggregator(masterListFormatterLineAggregator()); 
    return writer; 
} 

所以现在我有需要执行的动态步骤的列表,我相信他们是在StepScope。有人可以告诉我如何执行它们吗

回答

0

这是行不通的。您的Tasklet只是使用FlowStep创建一个作业作为第一步。使用jobBuilderfactory只是创建作业。它不会启动它。方法名称“开始”可能会引起误解,因为这仅仅定义了第一步。但它没有启动这项工作。

一旦它开始,您就不能更改作业的结构(其步骤和子步骤)。因此,根据步骤1中计算的内容,不可能在步骤2中配置流程步骤(当然,您可以在弹簧结构内部进行更深入的修改,并直接修改bean等等,不想这样做)。

我建议你使用一种“SetupBean”和一个适当的postConstruct方法,这个方法被注入你的类来配置你的工作。这个“SetupBean”负责计算正在处理的对象列表。

@Component 
public class SetUpBean { 

    private List<Object> myObjects; 

    @PostConstruct 
    public afterPropertiesSet() { 
    myObjects = ...; 
    } 

    public List<Object> getMyObjects() { 
    return myObjects; 
    } 
} 

@Configuration 
public class JobConfiguration { 

    @Autowired 
    private JobBuilderFactory jobBuilderFactory; 

    @Autowired 
    private StepBuilderFactory stepBuilderFactory; 

    @Autowired 
    private SetUpBean setup; 

    ... 
} 
+0

谢谢。在setupBean中,是否可以使用JobScope注释访问jobParameters? – Richie

+0

作业未运行,因此没有JobScope,因此JobScope注释将不起作用。你提供jobParameters作为commandLine-Arguments吗?你在用SpringBoot来启动你的工作吗? –

+0

我是,但我的用户正在通过休息端点执行作业,所以命令行参数不是一个选项。我可以选择你的大脑一分钟,问你是否必须根据数据中的某个关键字将读取器数据分成多个输出文件,你会怎么做? – Richie