2016-03-05 286 views
1

我有一个Spring Batch作业,它通过SFTP从远程Linux服务器检索文件。 远程服务器上的目录是一个价值7天的文件(〜400个文件)的档案。这些文件的大小相对较小。Spring批处理和JobInstanceAlreadyCompleteException:

Spring Batch知道哪些文件已经被处理。

当我启动应用程序。第一次,Spring Batch任务小程序检索文件,Spring Batch为其已经处理的每个文件生成一个例外:

例如,

引起:org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException:作业实例已经存在,并且已经完成了参数= {input.file.url = file:///blahblahblah.txt}。

这会导致处理过程中的巨大延迟。

第一次后,在文件的后续sftp检索没有例外,因此没有延迟(也许春季批生成已经处理的文件的内部哈希列表)。

  1. 在变压器类,我应该检查该文件本地存在,只调用JobLaunchRequest()■对尚未处理的新文件?

    /**

    • 变换BAI文件到Spring Batch的工作起步请求 */ 公共类FileMessageToJobRequestTransformer { 公共静态最终记录仪记录仪= LoggerFactory.getLogger(FileMessageToJobRequestTransformer.class); 私人工作职位;

      private String fileParameterName;

      public void setJob(作业作业){ this.job =作业; (文件参数名称:{}“,fileParameterName);}}}}}} this.fileParameterName = fileParameterName; }

      @Transformer 公共JobLaunchRequest变换(消息信息){ LOGGER.debug( “文件信息:{}”,消息); JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();

      jobParametersBuilder.addString(fileParameterName, 
           "file://" + message.getPayload().getAbsolutePath()); 
      
      LOGGER.debug("Job params: {}", jobParametersBuilder.toJobParameters()); 
      
      return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters()); 
      

      }}

  2. 有没有办法,我可以捕获该异常?

JobInstanceAlreadyCompleteException

  • 我应该设置重试极限= “1” 和=以SftpBaiParserJobBridge-context.hml “1” 跳限制?
  • <!-- When getting a new BAI file, transform into spring batch job request --> <int:transformer id="fileMessageToJobRequestTransformer" input-channel="inboundFileChannel" output-channel="outboundJobRequestChannel" method="transform"> <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer"> <property name="job" ref="baiParseJob"/> <property name="fileParameterName" value="input.file.url"/> </bean> <int:poller fixed-rate="10000"/> </int:transformer>

    追加ADDDATE()已经允许要处理的文件多次。 数据库中现在有了一些蠢话。

    jobParametersBuilder.addString(fileParameterName, 
           "file://" + message.getPayload().getAbsolutePath())).addDate("rundate", new Date()).toJobParameters(); 
    

    enter image description here 谢谢!

  • 我可以检查确定由经由来自内部的JobExplorer接口查询储存库“@Transformer 公共JobLaunchRequest变换(消息信息)”和返回空如果该文件是否已被处理该文件已被处理?
  • 例如,

    public interface JobExplorer { 
    
        List<JobInstance> getJobInstances(String jobName, int start, int count); 
    
        JobExecution getJobExecution(Long executionId); 
    
        StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId); 
    
        JobInstance getJobInstance(Long instanceId); 
    
        List<JobExecution> getJobExecutions(JobInstance jobInstance); 
    
        Set<JobExecution> findRunningJobExecutions(String jobName); 
    } 
    
  • 如何捕获异常?

    原因:org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException:作业实例已经存在,并且已完成参数= {input.file.url = file:/// home/dlaxer/dfc-bank-集成/ MBI的应用程序/白/下载/ BAI_Intraday160302070054471.txt}。如果您想再次运行此作业,请更改参数。 在org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126) 在sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法) 在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java :62) 在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke(Method.java:498) 在org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection (AopUtils.java:302) 在org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 在org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 在org.springframework.transaction.interceptor.TransactionInterceptor $ 1.proceedWithInvocation(TransactionInterceptor.java:99) 在org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281) 在org.springframework.transaction.interceptor .TransactionInterceptor.invoke(TransactionInterceptor.java:96) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean $ 1.invoke( AbstractJobRepositoryFactoryBean.java:172) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) at org.springframework.aop.framework.JdkD ynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) at com.sun.proxy。$ Proxy113.createJobExecution(Unknown Source) at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) at sun.reflect。NativeMethodAccessorImpl.invoke0(本机方法) 在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke (Method.java:498) 在org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302) 在org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 在有机.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration $ PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) 在org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 在org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) 在com.sun.proxy。$ Proxy114。运行(来源不明) 在org.springframework.batch.integration.launch.JobLaunchingMessageHandler.launch(JobLaunchingMessageHandler.java:50) 在org.springframework.batch.integration.launch.JobLaunchingGateway.handleRequestMessage(JobLaunchingGateway.java:76) ...更多

  • 回答

    1

    我看到你只是在作业启动器中传递一个参数作为“文件名”。 作业启动器启动后,它将从存储库BATCH_JOB_EXECUTION表中查询并检查上次处理作业的状态。 在您当前的作业中,执行输入参数与先前执行的作业和批处理状态相同&退出代码=已完成,那么您将获得JobInstanceAlreadyCompleteException。你应该尝试在每次执行时总是传递唯一的Parmae​​ters。仅仅通过现有的时间为参数,并尝试

    JobParameters jobparam = new JobParametersBuilder().addString(fileParameterName, "file://" + message.getPayload().getAbsolutePath()) 
             .addDate("rundate", new Date()).toJobParameters(); 
    
    JobExecution execution = jobLauncher.run(job, jobparam); 
         catch (Exception e) { 
          if (e instanceof JobInstanceAlreadyCompleteException){ 
           System.out.println("Raj*************"); 
          } 
    
    • 2是可以处理这样

      赶上(例外五){ 如果(E的instanceof JobInstanceAlreadyCompleteException){ 的System.out。 println(“需要处理*************”); }

    +0

    好的。我不想再多处理一个文件 - 我只是不希望它花费3-4小时来确定哪些文件已经被所有的JobInstanceAlreadyCompleteException异常处理。不会传入当前时间作为参数允许多次处理一个文件? – dbl001

    +0

    另外,为什么处理300个异常需要3个多小时? – dbl001

    +0

    @ dbl001允许一个文件被多次处理 - 再一次取决于之前作业执行的状态,如果它失败了,肯定会允许你处理相同的文件,或者如果成功完成,你会得到异常行为。是的,它不会允许你处理相同的文件。我无法得到您的问题“3个小时以上处理300个例外”。 –

    1

    3 - 我应该设置retry-limit =“1”和skip-limit =“1”。
    回答取决于您的要求。哪个例外,你想忽略它,次数意味着=跳过限制,并在查克过程中想重复同样的步骤,特别是异常意味着重试。 能用例子更好地理解。如果我想读取一些平面文件,并有可能文件可能有不良输入记录,可能会导致flatFileException 我想忽略它,并顺利处理我的文件,然后我的配置将如下所示。 我可以在作业执行期间跳过最多20条记录,如果批处理收到异常,那么它应该尝试至少2次。

    <chunk reader="flatFileItemReader" writer="itemWriter" 
          commit-interval="1" skip-limit="20" retry-limit="2"> 
         <skippable-exception-classes> 
          <include class="org.springframework.batch.item.file.FlatFileParseException"/> 
         </skippable-exception-classes>