
I am calling my batch service from the command line and passing several parameters to the job. I need access to these parameters while the job is being created, because I have to look up data in the database for the "site name" supplied as a parameter and dynamically create a number of steps from it. The problem occurs in the createJob method: I have hardcoded the site ID for now, but I currently get the exception below for the itemizedReader method. In short, I need access to the job launcher parameters at the time the job is created:

Error creating bean with name 'scopedTarget.itemizedReader' defined in billing.BillingConfig: Unsatisfied dependency expressed through method 'itemizedReader' parameter 1: No qualifying bean of type [java.lang.String] 

Spring configuration

package billing; 

import billing.components.BillingFieldSetter; 
import billing.components.BillingPrepStatementSetter; 
import billing.components.SummaryProcessor; 
import billing.mapper.ItemizedCostingMapper; 
import billing.model.BillingItem; 
import billing.model.ItemizedCosting; 
import billing.tasklet.SummaryOutputTasklet; 
import billing.batch.common.AppProps; 
import billing.batch.common.SqlConst; 
import billing.batch.common.model.ItemizedPartner; 
import billing.batch.common.repo.PartnerBillingRepo; 
import com.zaxxer.hikari.HikariConfig; 
import com.zaxxer.hikari.HikariDataSource; 
import java.sql.Timestamp; 
import java.text.SimpleDateFormat; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import javax.sql.DataSource; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
import org.springframework.batch.core.Job; 
import org.springframework.batch.core.Step; 
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; 
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; 
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; 
import org.springframework.batch.core.configuration.annotation.StepScope; 
import org.springframework.batch.core.job.builder.SimpleJobBuilder; 
import org.springframework.batch.core.launch.support.RunIdIncrementer; 
import org.springframework.batch.item.ItemReader; 
import org.springframework.batch.item.ItemWriter; 
import org.springframework.batch.item.database.JdbcBatchItemWriter; 
import org.springframework.batch.item.database.JdbcCursorItemReader; 
import org.springframework.batch.item.file.FlatFileItemReader; 
import org.springframework.batch.item.file.FlatFileItemWriter; 
import org.springframework.batch.item.file.mapping.DefaultLineMapper; 
import org.springframework.batch.item.file.transform.DelimitedLineAggregator; 
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; 
import org.springframework.batch.item.file.transform.FieldExtractor; 
import org.springframework.batch.repeat.RepeatStatus; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.ComponentScan; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.context.annotation.EnableAspectJAutoProxy; 
import org.springframework.context.annotation.Profile; 
import org.springframework.context.annotation.PropertySource; 
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; 
import org.springframework.core.io.FileSystemResource; 
import org.springframework.jdbc.core.JdbcTemplate; 
import org.springframework.jdbc.datasource.DataSourceTransactionManager; 

@ComponentScan(basePackages = {"billing", "billing.batch.common"}) 
@Configuration 
@EnableBatchProcessing 
@EnableAspectJAutoProxy 
@PropertySource("classpath:/app.properties") 
public class BillingConfig { 

    private static final Logger LOG = LogManager.getLogger(); 

    @Autowired 
    private AppProps appProps; 

    @Autowired 
    private PartnerBillingRepo billingRepo; 

    @Bean 
    @Profile("prod") 
    public DataSource datasource() { 
     final HikariConfig cfg = new HikariConfig(); 
     cfg.setJdbcUrl(appProps.getPartnerBillingUrl()); 
     cfg.setUsername(appProps.getPartnerBillingUsername()); 
     cfg.setPassword(appProps.getPartnerBillingPassword()); 
     cfg.addDataSourceProperty("cachePrepStmts", appProps.getCachePrepStatements()); 
     cfg.addDataSourceProperty("prepStmtCacheSize", appProps.getPrepStatementCacheSize()); 
     cfg.addDataSourceProperty("prepStmtCacheSqlLimit", appProps.getPrepStatementCacheSqlLimit()); 

     HikariDataSource ds = new HikariDataSource(cfg); 

     return ds; 
    } 

    @Bean 
    public JdbcTemplate template(DataSource ds) { 
     return new JdbcTemplate(ds); 
    } 

    @Bean 
    @StepScope 
    public FlatFileItemReader billingFileReader(@Value("#{jobParameters['input.file']}") String inputFile) { 
     DefaultLineMapper lineMapper = new DefaultLineMapper(); 
     lineMapper.setFieldSetMapper(new BillingFieldSetter()); 
     lineMapper.setLineTokenizer(new DelimitedLineTokenizer()); 

     FlatFileItemReader reader = new FlatFileItemReader(); 
     reader.setLineMapper(lineMapper); 
     reader.setResource(new FileSystemResource(inputFile)); 

     return reader; 
    } 

    @Bean 
    @StepScope 
    public JdbcBatchItemWriter billingWriter(DataSource ds, BillingPrepStatementSetter setter) { 
     JdbcBatchItemWriter writer = new JdbcBatchItemWriter(); 
     writer.setDataSource(ds); 
     writer.setItemPreparedStatementSetter(setter); 
     writer.setSql(SqlConst.INSERT_INTO_BILLING); 

     return writer; 
    } 

    @Bean 
    @StepScope 
    public BillingPrepStatementSetter prepStatementSetter() { 
     return new BillingPrepStatementSetter(); 
    } 

    @Bean 
    @StepScope 
    public SummaryProcessor summaryProc() { 
     return new SummaryProcessor(); 
    } 

    @Bean 
    @StepScope 
    public SummaryOutputTasklet summaryTask() { 
     return new SummaryOutputTasklet(); 
    } 

    @Bean 
    @StepScope 
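    // Note: the 'No qualifying bean of type [java.lang.String]' exception refers to 
    // parameter index 1 (zero-based) of this method, i.e. 'accountCodes', which has no 
    // @Value binding, so Spring cannot resolve it when it creates the step-scoped target bean. 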
    public ItemReader<ItemizedCosting> itemizedReader(@Value("#{jobParameters['site.id']}") Integer siteId, String accountCodes, 
      @Value("#{jobParameters['start.date']}") String startDate, @Value("#{jobParameters['end.date']}") String endDate) { 

     JdbcCursorItemReader reader = new JdbcCursorItemReader(); 
     reader.setDataSource(datasource()); 
     reader.setSql(SqlConst.SELECT_ITEMIZED_BILLING_FOR_ACCOUNT_CODES); 
     reader.setRowMapper(new ItemizedCostingMapper()); 
     reader.setPreparedStatementSetter((ps) -> { 
      try { 
       SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd"); 

       // JDBC parameter indices are 1-based 
       ps.setTimestamp(1, new Timestamp(formatter.parse(startDate).getTime())); 
       ps.setTimestamp(2, new Timestamp(formatter.parse(endDate).getTime())); 
      } catch (Exception err) { 
       LOG.error("Unable to parse dates, start: {} end: {}", startDate, endDate); 
      } 
      ps.setString(3, accountCodes); 
      ps.setInt(4, siteId); 
     }); 

     return reader; 
    } 

    @Bean 
    @StepScope 
    public ItemWriter<ItemizedCosting> itemizedWriter(@Value("#{jobParameters['start.date']}") String startDate, 
      String partnerName) { 

     DelimitedLineAggregator lineAgg = new DelimitedLineAggregator(); 
     FieldExtractor<ItemizedCosting> extractor = (f) -> { 
      Object[] output = new Object[9]; 
      output[0] = f.getExtension(); 
      output[1] = f.getPbxCallTime(); 
      output[2] = f.getDuration(); 
      output[3] = f.getAccountCode(); 
      output[4] = f.getDigits(); 
      output[5] = f.getCost(); 
      output[6] = f.getDestination(); 
      output[7] = f.getCarrier(); 
      output[8] = f.getAttribute(); 

      return output; 
     }; 
     lineAgg.setFieldExtractor(extractor); 

     Timestamp start = null; 

     try { 
      SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd"); 

      start = new Timestamp(formatter.parse(startDate).getTime()); 
     } catch (Exception e) { 
      LOG.error("Unable to parse date: {}", startDate); 
     } 

     FlatFileItemWriter<ItemizedCosting> writer = new FlatFileItemWriter<>(); 
     writer.setEncoding("UTF-8"); 
     writer.setLineAggregator(lineAgg); 
     writer.setResource(new FileSystemResource(String.format("%1$s/%2$tY-%2$tm_%3$s_", 
       appProps.getItemizedBillingOutputPath(), start, partnerName))); 

     return writer; 
    } 

    @Bean 
    public Job createJob(JobBuilderFactory jobBuilder, StepBuilderFactory stepBuilders, DataSource ds, FlatFileItemReader reader) 
      throws Exception { 
     Step findSiteIdStep = stepBuilders.get("find.site.id").tasklet((contribution, chunkContext) -> { 
      String siteName 
        = (String) chunkContext.getStepContext().getJobParameters().get(BillingConst.PARAM_SITE); 

      Integer siteId = billingRepo.findSiteIdByName(siteName); 

      chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().put(
        BillingConst.SITE_ID, siteId); 

      return RepeatStatus.FINISHED; 
     }).build(); 

     Step processFileStep = stepBuilders.get("process.file").<BillingItem, BillingItem>chunk(appProps.getChunkSize()) 
       .reader(reader) 
       .processor(summaryProc()) 
        .writer(billingWriter(ds, prepStatementSetter())).build(); 

     Step outputSummary = stepBuilders.get("output.summary").tasklet(summaryTask()).build(); 

     SimpleJobBuilder builder = jobBuilder.get("process.aspivia").incrementer(new RunIdIncrementer()) 
       .start(findSiteIdStep) 
       .next(processFileStep) 
       .next(outputSummary); 

     List<ItemizedPartner> partners = billingRepo.findPartnersForSite("CPT"); 
     Integer siteId = billingRepo.findSiteIdByName("CPT"); 

     Map<String, String> partnerAccCodes = new HashMap<>(); 

     partners.stream().forEach(i -> { 
      if (!partnerAccCodes.containsKey(i.getPartnerName())) { 
       partnerAccCodes.put(i.getPartnerName(), ""); 
      } 

      String accCodes = partnerAccCodes.get(i.getPartnerName()); 
      accCodes += i.getAccountCode().toString() + ", "; 

      partnerAccCodes.put(i.getPartnerName(), accCodes); 
     }); 

     partnerAccCodes.forEach((k, v) -> { 
      Step itemizedReport = stepBuilders.get("itemized." + k).<ItemizedCosting, ItemizedCosting>chunk(appProps.getChunkSize()) 
        .reader(itemizedReader(siteId, v, null, null)) 
        .writer(itemizedWriter(null, k)).build(); 

      builder.next(itemizedReport); 
     }); 

     return builder.build(); 
    } 

    @Bean 
    public static PropertySourcesPlaceholderConfigurer propCfg() { 
     return new PropertySourcesPlaceholderConfigurer(); 
    } 

    @Bean 
    public DataSourceTransactionManager transactionManager(DataSource datasource) { 
     return new DataSourceTransactionManager(datasource); 
    } 
} 

Answer


This problem came down to how the Spring Batch lifecycle works: if a bean is annotated with @StepScope, the job parameters only become available once the job has actually been launched. The job is launched like this:

final Job loadBillingDataJob = context.getBean(Job.class); 
final JobLauncher launcher = context.getBean(JobLauncher.class); 

JobParametersBuilder paramBuilder = new JobParametersBuilder(); 
paramBuilder.addString(BillingConst.PARAM_INPUT_FILE, inputFile); 
paramBuilder.addString(BillingConst.PARAM_SITE, site); 
paramBuilder.addString(BillingConst.PARAM_OUTPUT_FILE_PATH, summaryFile); 

JobExecution runStatus = launcher.run(loadBillingDataJob, paramBuilder.toJobParameters());

In the code above we retrieve the Job that was built by the createJob @Bean method in my configuration. At the point where createJob runs, the job parameters are not yet available.
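
To illustrate the lifecycle point, here is a minimal sketch (the bean name, the 'site.name' parameter key and the extra Tasklet import are placeholders, not code from my project): any @StepScope bean that binds job parameters with @Value is only resolved when its step actually executes, i.e. after launcher.run(...) has supplied the parameters.

@Bean 
@StepScope 
public Tasklet logSiteTasklet(@Value("#{jobParameters['site.name']}") String siteName) { 
    // The SpEL expression above is evaluated only when the owning step starts executing, 
    // never while createJob() is still assembling the Job; at that time no job execution 
    // (and therefore no JobParameters) exists yet. 
    return (contribution, chunkContext) -> { 
        LOG.info("Running for site {}", siteName); 
        return RepeatStatus.FINISHED; 
    }; 
} 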

What I did to get hold of the values I need was the following:

  • Added an additional @PropertySource("classpath:cli-runtime.properties") to the configuration class. 
  • The Application.java that launches the Spring Batch job writes the properties we need to cli-runtime.properties. When the job is created in the @Configuration class, the values are loaded from that property file and the additional steps can be created on the job that needs them (a sketch of this follows below).
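
Roughly, the approach looks like this (a minimal sketch with placeholder property keys and file paths; the properties file has to end up on the classpath for @PropertySource to see it, and imports are omitted):

// Application.java: persist the command-line values before the Spring context is refreshed 
Properties runtimeProps = new Properties(); 
runtimeProps.setProperty("site.name", site); 
try (OutputStream out = Files.newOutputStream(Paths.get("target/classes/cli-runtime.properties"))) { 
    runtimeProps.store(out, "values needed while the job is being built"); 
} 

// BillingConfig: with @PropertySource("classpath:cli-runtime.properties") on the class, 
// the value is available as an ordinary property while createJob() builds its steps 
@Value("${site.name}") 
private String siteName; 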