2015-04-06 138 views
0

我正在使用spring批处理模块处理目录中的csv文件。目录可能包含具有特定扩展名的多个文件,我正在使用MultiResourceItemReader来读取这些文件。 作业将接收3个作业参数,分别为read_from_directory,move_to_directory和default_user_id。所有这些参数对于所有作业运行将保持相同。 read_from_directory将包含 多个csv文件,作业应该一个接一个地处理这些文件。我面临的问题是因为作业参数是相同的,我得到JobInstanceAlreadyCompleteException当第二次运行作业。我知道这个问题可以通过使用额外的时间戳参数来使作业参数唯一来解决。但是由于添加时间戳参数会使每个作业实例都是唯一的,我不希望使用这种方法,因为它会在使作业可重新启动时产生问题。 所以我想一些建议,spring批处理 - multiResourceItemReader:如何使作业参数唯一并重新启动作业

  1. 我怎样才能使各项工作实例是唯一不使用时间戳参数。

  2. 在这种情况下如何使作业可重新启动?加入'restartable =“true”'就足够了,还是需要一些额外的配置/编码。我有点困惑,因为作业会从目录中读取多个文件。所以,如果一项工作失败,例如,由于在一个文件中记录不正确,我怎么能从它离开的地方重新开始相同的工作?我已经将作业配置为在一段时间之后使用调度程序定期运行。因此,如果工作失败,然后我纠正csv文件中的错误,工作将从下次运行时停止的地方开始?

请从我的配置下面相关部分找到:

<batch:job id="testJob" restartable="true"> 
     <batch:step id="step1"> 
      <batch:tasklet> 
       <batch:chunk reader="multiResourceItemReader" writer="fileWriter" 
        commit-interval="1"> 
       </batch:chunk> 
      </batch:tasklet> 
     </batch:step> 
    </batch:job> 

    <bean id="fileWriter" class="com.ivish.TestFileWriter" /> 
    <bean id="multiResourceItemReader" class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step"> 
     <property name="resources" value="file:#{jobParameters['read_from_directory']}/*.csv" /> 
     <property name="delegate" ref="fileReader" /> 
    </bean> 

    <bean id="fileReader" class="com.ivish.TestFileReader" scope="step"> 
     <property name="delegate" ref="delegateFileReader" /> 
     <property name="moveToDirectory" value="#{jobParameters['move_to_directory']}" /> 
    </bean> 
    <bean id="delegateFileReader" class="org.springframework.batch.item.file.FlatFileItemReader"> 
     <property name="lineMapper"> 
      <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> 
       <property name="lineTokenizer" ref="fileTokenizer" /> 
       <property name="fieldSetMapper"> 
        <bean 
         class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper" /> 
       </property> 
      </bean> 
     </property> 
    </bean>   

谢谢。

回答

0

问题1:你可以用你自己的逻辑实现JobParameterIncrementer下一个实例是什么,以及你想如何增加参数。然后,根据您的逻辑,您可以运行下一个作业实例,以便运行下一个实例,否则,只需运行即可重新启动最新的实例。如果您开始使用CommandLineJobRunner,则可以通过-next运行下一个实例,如果以编程方式执行此操作,则可以使用JobOperator#startNextInstance(String jobName)JobParameterIncrementer这里是example

问题2:对于重新启动加入restartable="true"应该做的伎俩。 FlatFileItemReader这是代理读取文件扩展AbstractItemCountingItemStreamItemReader它保存读取文件时的状态。至于MultiResourceItemReader你可以documentation看到,它说:

输入资源使用setComparator(比较),以确保资源的排序工作之间保持有序运行在重启方案。

所以这意味着资源列表是有序的,并且每个都被委托给FlatFileItemReader,它保留了运行之间的顺序和计数。

1

Spring Batch有两个截然不同的概念,涉及作业“运行”,JobInstanceJobExecution

JobInstance是逻辑运行的概念。它由一套独特的识别工作参数来识别。在你的例子中,对于read_from_directory,move_to_directory和default_user_id的每个组合,我希望有一个JobInstance

其他的概念是JobExecution。这代表了一次物理运行。例如,如果运行read_from_directory,move_to_directory和default_user_id的组合,并且它通过,则JobInstance将有一个孩子JobExecution。但是,如果第一次尝试(第一次JobExecution)失败,则可以重新启动该作业。重新启动将在现有的JobInstance(一次逻辑运行下的两次物理运行)下创建新的JobExecution

考虑到上述情况,每个JobInstance将通过read_from_directory,move_to_directory,default_user_id的组合独特的,某种(Spring Batch的的运行ID提供基于一个计数器开箱即用,也可以使用时间戳)。

您可以将文档在这里阅读更多关于的JobInstanceJobExecution的概念:http://docs.spring.io/spring-batch/trunk/reference/html/domain.html#domainJob

相关问题