我有一个Spring Batch作业,它通过SFTP从远程Linux服务器检索文件。 远程服务器上的目录是一个价值7天的文件(〜400个文件)的档案。这些文件的大小相对较小。Spring批处理和JobInstanceAlreadyCompleteException:
Spring Batch知道哪些文件已经被处理。
当我启动应用程序。第一次,Spring Batch任务小程序检索文件,Spring Batch为其已经处理的每个文件生成一个例外:
例如,
引起:org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException:作业实例已经存在,并且已经完成了参数= {input.file.url = file:///blahblahblah.txt}。
这会导致处理过程中的巨大延迟。
第一次后,在文件的后续sftp检索没有例外,因此没有延迟(也许春季批生成已经处理的文件的内部哈希列表)。
在变压器类,我应该检查该文件本地存在,只调用JobLaunchRequest()■对尚未处理的新文件?
/**
变换BAI文件到Spring Batch的工作起步请求 */ 公共类FileMessageToJobRequestTransformer { 公共静态最终记录仪记录仪= LoggerFactory.getLogger(FileMessageToJobRequestTransformer.class); 私人工作职位;
private String fileParameterName;
public void setJob(作业作业){ this.job =作业; (文件参数名称:{}“,fileParameterName);}}}}}} this.fileParameterName = fileParameterName; }
@Transformer 公共JobLaunchRequest变换(消息信息){ LOGGER.debug( “文件信息:{}”,消息); JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName, "file://" + message.getPayload().getAbsolutePath()); LOGGER.debug("Job params: {}", jobParametersBuilder.toJobParameters()); return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}}
有没有办法,我可以捕获该异常?
JobInstanceAlreadyCompleteException
- 我应该设置重试极限= “1” 和=以SftpBaiParserJobBridge-context.hml “1” 跳限制?
- 我可以检查确定由经由来自内部的JobExplorer接口查询储存库“@Transformer 公共JobLaunchRequest变换(消息信息)”和返回空如果该文件是否已被处理该文件已被处理?
如何捕获异常?
原因:org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException:作业实例已经存在,并且已完成参数= {input.file.url = file:/// home/dlaxer/dfc-bank-集成/ MBI的应用程序/白/下载/ BAI_Intraday160302070054471.txt}。如果您想再次运行此作业,请更改参数。 在org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126) 在sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法) 在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java :62) 在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke(Method.java:498) 在org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection (AopUtils.java:302) 在org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 在org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 在org.springframework.transaction.interceptor.TransactionInterceptor $ 1.proceedWithInvocation(TransactionInterceptor.java:99) 在org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281) 在org.springframework.transaction.interceptor .TransactionInterceptor.invoke(TransactionInterceptor.java:96) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean $ 1.invoke( AbstractJobRepositoryFactoryBean.java:172) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) at org.springframework.aop.framework.JdkD ynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) at com.sun.proxy。$ Proxy113.createJobExecution(Unknown Source) at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) at sun.reflect。NativeMethodAccessorImpl.invoke0(本机方法) 在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke (Method.java:498) 在org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302) 在org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 在有机.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration $ PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) 在org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 在org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) 在com.sun.proxy。$ Proxy114。运行(来源不明) 在org.springframework.batch.integration.launch.JobLaunchingMessageHandler.launch(JobLaunchingMessageHandler.java:50) 在org.springframework.batch.integration.launch.JobLaunchingGateway.handleRequestMessage(JobLaunchingGateway.java:76) ...更多
<!-- When getting a new BAI file, transform into spring batch job request --> <int:transformer id="fileMessageToJobRequestTransformer" input-channel="inboundFileChannel" output-channel="outboundJobRequestChannel" method="transform"> <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer"> <property name="job" ref="baiParseJob"/> <property name="fileParameterName" value="input.file.url"/> </bean> <int:poller fixed-rate="10000"/> </int:transformer>
追加ADDDATE()已经允许要处理的文件多次。 数据库中现在有了一些蠢话。
jobParametersBuilder.addString(fileParameterName,
"file://" + message.getPayload().getAbsolutePath())).addDate("rundate", new Date()).toJobParameters();
例如,
public interface JobExplorer {
List<JobInstance> getJobInstances(String jobName, int start, int count);
JobExecution getJobExecution(Long executionId);
StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
JobInstance getJobInstance(Long instanceId);
List<JobExecution> getJobExecutions(JobInstance jobInstance);
Set<JobExecution> findRunningJobExecutions(String jobName);
}
好的。我不想再多处理一个文件 - 我只是不希望它花费3-4小时来确定哪些文件已经被所有的JobInstanceAlreadyCompleteException异常处理。不会传入当前时间作为参数允许多次处理一个文件? – dbl001
另外,为什么处理300个异常需要3个多小时? – dbl001
@ dbl001允许一个文件被多次处理 - 再一次取决于之前作业执行的状态,如果它失败了,肯定会允许你处理相同的文件,或者如果成功完成,你会得到异常行为。是的,它不会允许你处理相同的文件。我无法得到您的问题“3个小时以上处理300个例外”。 –