2016-12-28 126 views
2

我正在使用Spring Boot,Spring Batch和Camel的项目。在完成弹簧批处理作业之前返回作业ID“立即”

批处理过程是通过调用其余端点来启动的。其余的控制器启动一个骆驼路线,启动弹簧批处理作业流程(通过弹簧批量骆驼组件)。

我无法控制调用我的应用程序的外部应用程序。我的应用程序是一个更大的夜间工作流程的一部分。

批处理作业可能需要很长时间才能完成,因此外部应用程序会定期通过另一个休息端点轮询我的批处理作业,询问作业是否完成。它通过轮询一个状态休息端点并使用jobExecution的id来执行此操作。

为了完成这个流程,我实现了一个通过ProducerTemplate启动骆驼路线的休息控制器。我的问题是在开始骆驼路线后立即返回作业执行ID。我不希望剩下的电话等到作业完成后才能返回。

startJobViaRestCall ------> createBatchJob ----> runBatchJobUntilDone 
            | 
            | 
     Return jobExecutionData | 
<---------------------------------- 

我试过使用异步调用和期货,但没有运气。我也试图用骆驼窃听无济于事。问题是只有“onComplete”事件。我需要一个在作业创建完成后立即返回的钩子,但不能运行。

例如,下面的代码会一直等到批量作业完成,然后再返回我想要发回的JobExecution数据(如json)。这是有道理的,因为extractFutureBody将等待响应准备就绪。

@RestController 
@Slf4j 
public class BatchJobController { 

    @Autowired 
    ProducerTemplate producerTemplate; 

    @RequestMapping(value = "/batch/job/start", method = RequestMethod.GET) 
    @ResponseBody 
    public String startBatchJob() { 
     log.info("BatchJob start called..."); 

     String jobExecution = producerTemplate.extractFutureBody(producerTemplate.asyncRequestBody(BatchRoute.ENDPOINT_JOB_START, ""), String.class); 

     return jobExecution; 
    } 

}  

骆驼的路线是春季批次组分

public class BatchRoute<I, O> extends BaseRoute { 

    private static final String ROUTE_START_BATCH = "spring-batch:springBatchJob"; 

    @Override 
    public void configure() { 

     super.configure(); 
     from(ENDPOINT_JOB_START).to(ROUTE_START_BATCH); 

    } 
} 

任何想法,我怎么能尽快它是可用返回JobExecution数据的简单通话?

回答

0

不知道如何在骆驼中做到这一点,但这里是使用春天休息的示例作业执行。

@RestController 
public class KpRest { 

    private static final Logger LOG = LoggerFactory.getLogger(KpRest.class); 
    private static String RUN_ID_KEY = "run.id"; 

    @Autowired 
    private JobLauncher launcher; 

    private final AtomicLong incrementer = new AtomicLong(); 


    @Autowired 
    private Job job; 


    @RequestMapping("/hello") 
    public String sayHello(){ 

     try { 
      JobParameters parameters = new JobParametersBuilder().addLong(RUN_ID_KEY, incrementer.incrementAndGet()).toJobParameters(); 
      JobExecution execution = launcher.run(job, parameters); 
      LOG.info("JobId {}, JobStatus {}", execution.getJobId(), execution.getStatus().getBatchStatus()); 
      return String.valueOf(execution.getJobId()); 
     } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException 
       | JobParametersInvalidException e) { 
      LOG.info("Job execution failed, {}", e); 
     } 
     return "Some Error"; 
    } 
} 

您可以通过修改JobLauncher来使作业异步。

@Bean 
    public JobLauncher simpleJobLauncher(JobRepository jobRepository){ 
     SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); 
     jobLauncher.setJobRepository(jobRepository); 
     jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); 
     return jobLauncher; 
    } 

请参考documentation更多信息

+0

谢谢! 'SimpleAsyncTaskExecutor'是线索。我定义了一个自定义JobLauncher,它删除了Spring Batch Camel组件添加的所有作业参数。我需要将'TaskExecutor'设置为'SimpleAsyncTaskExecutor'。它默认为'SyncTaskExecutor'作为我自定义的JobLauncher扩展'SimpleJobLauncher'。 –

相关问题