2013-05-02 65 views
1

在我们开发的Spring Batch作业中,我们使用AsyncTaskExecutor在步骤级别进行并行处理,并将并发限制设置为5。我们使用JdbcCursorItemReader来读取需要处理的输入记录,该读取器被配置为从连接池获取连接。执行作业时,我们注意到作业因连接池中的连接耗尽而中止,这使我们认为JdbcCursorItemReader没有将连接释放回连接池。有什么建议吗?

(问题标题:使用AsyncTaskExecutor时ItemReader中的数据库连接泄漏)

下面是我的批处理XML

<?xml version="1.0" encoding="UTF-8"?> 
<beans:beans xmlns="http://www.springframework.org/schema/batch" 
    xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:util="http://www.springframework.org/schema/util" 
    xsi:schemaLocation=" 
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
     http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd 
     http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> 

    <beans:import resource="../launch-context.xml" /> 

    <!--
      Cursor-based reader that selects the STUDENTS rows whose BATCH_ID matches
      the "batchId" job parameter. The bean is step-scoped so the job parameter
      expression can be resolved when the step starts.

      saveState is disabled because this reader is consumed by a step that runs
      with a task-executor: a single persisted item count cannot describe the
      position of several concurrent consumers, so restart state would be wrong.

      NOTE(review): JdbcCursorItemReader is not documented as thread-safe.
      Disabling saveState does not serialize read() calls; confirm whether the
      reader should be wrapped in a synchronizing delegate or replaced by
      JdbcPagingItemReader for the multi-threaded step.
    -->
    <beans:bean id="wsStudentItemReader"
                class="org.springframework.batch.item.database.JdbcCursorItemReader"
                scope="step">
        <beans:property name="dataSource" ref="rptDS" />
        <beans:property name="sql"
                        value="SELECT * FROM STUDENTS WHERE BATCH_ID=?" />
        <!-- Binds the single "?" placeholder of the query above. -->
        <beans:property name="preparedStatementSetter">
            <beans:bean class="com.test.BatchDtSetter" autowire="byName">
                <!-- Quoted map key is the documented form for job parameter lookup. -->
                <beans:property name="batchId" value="#{jobParameters['batchId']}" />
            </beans:bean>
        </beans:property>
        <beans:property name="rowMapper" ref="wsRowMapper" />
        <beans:property name="saveState" value="false" />
    </beans:bean>


<!-- Step writer: a ClassifierCompositeItemWriter whose classifier is the
     "writerClassifier" bean. -->
<beans:bean id="outputWriter"
            class="org.springframework.batch.item.support.ClassifierCompositeItemWriter">
    <beans:property name="classifier" ref="writerClassifier" />
</beans:bean>

<!-- Project classifier wired with the two delegate writers: one for the
     "fail" code path and one for the "pass" code path. -->
<beans:bean id="writerClassifier" class="com.test.WriterClassifier">
    <beans:property name="codeFailWriter" ref="failJdbcBatchItemWriter" />
    <beans:property name="codePassWriter" ref="passJdbcBatchItemWriter" />
</beans:bean>

    <!-- JDBC batch writer used for the "fail" path: runs the DELETE statement
         below against rptDS, with parameters bound by "FailStatusSetter". -->
    <beans:bean id="failJdbcBatchItemWriter"
                class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <beans:property name="dataSource" ref="rptDS" />
        <beans:property name="sql" value="DELETE FROM STUDENTS WHERE BATCH_ID=?" />
        <beans:property name="itemPreparedStatementSetter" ref="FailStatusSetter" />
    </beans:bean>

<!-- Parameter setter referenced by "failJdbcBatchItemWriter". -->
<beans:bean id="FailStatusSetter"
            class="com.test.FailStatusSetter" />

    <!-- Project writer used for the "pass" path; no properties are injected here. -->
    <beans:bean id="passJdbcBatchItemWriter" class="com.test.PassBatchItemWriter" />

    <!-- Listener bean registered on the "wsStudentGenStep" tasklet; it is
         given the rptDS data source. -->
    <beans:bean id="WSListnr" class="com.test.WSBatchListnr">
        <beans:property name="dataSource" ref="rptDS" />
    </beans:bean>


    <!-- Row mapper used by "wsStudentItemReader". -->
    <beans:bean id="wsRowMapper"
                class="com.test.WSReqMapper" />
    <!-- Item processor of the chunk step; it is given the rptDS data source. -->
    <beans:bean id="wsReqPrcsr" class="com.test.WSReqProc">
        <beans:property name="dataSource" ref="rptDS" />
    </beans:bean>
    <!-- NOTE(review): no bean in the visible configuration references this id;
         confirm it is still needed. -->
    <beans:bean id="wsReqPrepStmtSetter"
                class="com.test.wsStudentSetter" />

    <!-- Tasklet step delegating to the "initiateStepTask" bean; used as the
         parent of the first step of the job. -->
    <step id="initiateStep">
        <tasklet ref="initiateStepTask" />
    </step>
    <!-- Chunk-oriented step executed through "taskExecutor": items flow from
         wsStudentItemReader through wsReqPrcsr to outputWriter with a commit
         interval of 4. Any java.lang.Exception is skippable, up to 20 skips. -->
    <step id="wsStudentGenStep">
        <tasklet task-executor="taskExecutor">
            <chunk reader="wsStudentItemReader"
                   processor="wsReqPrcsr"
                   writer="outputWriter"
                   commit-interval="4"
                   skip-limit="20">
                <skippable-exception-classes>
                    <include class="java.lang.Exception" />
                </skippable-exception-classes>
            </chunk>
            <listeners>
                <listener ref="WSListnr" />
            </listeners>
        </tasklet>
    </step>
    <!-- Two-step job: the initial step (inherits from initiateStep) runs first
         and then hands over to the generation step (inherits from
         wsStudentGenStep). -->
    <job id="wsStudent">
        <step id="wsStudentFileGenIntialStep"
              parent="initiateStep"
              next="wsStudentFileGenStep" />
        <step id="wsStudentFileGenStep"
              parent="wsStudentGenStep" />
    </job>


    <!-- Step-scoped tasklet implementation behind "initiateStep". -->
    <beans:bean id="initiateStepTask"
                class="com.test.Initializer"
                scope="step" />

    <!-- Executor used by the chunk step; its concurrency limit is set to 2. -->
    <beans:bean id="taskExecutor"
                class="org.springframework.core.task.SimpleAsyncTaskExecutor">
        <beans:property name="concurrencyLimit" value="2" />
    </beans:bean>

</beans:beans> 

Config.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<beans:beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:batch="http://www.springframework.org/schema/batch" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:beans="http://www.springframework.org/schema/beans" 
    xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache" 
    xsi:schemaLocation=" 
     http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd 
     http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd 
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd 
     http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"> 

    <!-- Scan com.test for annotated components. -->
    <context:component-scan base-package="com.test"/>
    <!-- Resolve ${...} placeholders from batch.properties on the classpath. -->
    <context:property-placeholder location="classpath:batch.properties"/>

     <!-- JNDI lookup of the web site data source, exposed through a
          javax.sql.DataSource proxy; the lookup is not performed on startup
          and the located object is cached. -->
     <beans:bean id="websiteDS"
                 class="org.springframework.jndi.JndiObjectFactoryBean"
                 p:jndiName="java:jboss/datasources/WebSiteDS"
                 p:lookupOnStartup="false"
                 p:cache="true"
                 p:proxyInterface="javax.sql.DataSource" />

<!-- JNDI lookup of the reporting data source, exposed through a
     javax.sql.DataSource proxy; the lookup is not performed on startup and
     the located object is cached. -->
<beans:bean id="rptDS"
            class="org.springframework.jndi.JndiObjectFactoryBean"
            p:jndiName="java:jboss/datasources/ReportingDS"
            p:lookupOnStartup="false"
            p:cache="true"
            p:proxyInterface="javax.sql.DataSource" />


    <!-- Lazily initialised JDBC transaction manager bound to the rptDS data
         source. -->
    <beans:bean id="transactionManager"
                class="org.springframework.jdbc.datasource.DataSourceTransactionManager"
                lazy-init="true"
                p:dataSource-ref="rptDS" />


    <!--
      Job repository configured for database type "db2", creating job
      executions with ISOLATION_REPEATABLE_READ.

      NOTE(review): the repository uses "batchDS", which is not defined in the
      visible configuration, while "transactionManager" is bound to "rptDS".
      Confirm that batchDS is defined elsewhere and that managing repository
      transactions through a manager for a different data source is intended.
    -->
    <beans:bean id="jobRepository"
                class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
                p:dataSource-ref="batchDS"
                p:transactionManager-ref="transactionManager"
                p:databaseType="db2"
                p:isolation-level-for-create="ISOLATION_REPEATABLE_READ" />


    <!-- Job launcher backed by "jobRepository"; no task executor is configured
         on it here. -->
    <beans:bean id="jobLauncher"
                class="org.springframework.batch.core.launch.support.SimpleJobLauncher"
                p:jobRepository-ref="jobRepository" />

    <!-- Job operator wired with the launcher, explorer, repository and
         registry beans of this configuration. -->
    <beans:bean id="jobOperator"
                class="org.springframework.batch.core.launch.support.SimpleJobOperator"
                p:jobLauncher-ref="jobLauncher"
                p:jobExplorer-ref="jobExplorer"
                p:jobRepository-ref="jobRepository"
                p:jobRegistry-ref="jobRegistry" />

    <!-- Job explorer over the "batchDS" data source.
         NOTE(review): batchDS is not defined in the visible configuration;
         confirm it is provided elsewhere. -->
    <beans:bean id="jobExplorer"
                class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean"
                p:dataSource-ref="batchDS" />

    <!-- Map-based job registry, plus the bean post-processor that is wired to
         that registry. -->
    <beans:bean id="jobRegistry"
                class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
    <beans:bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor"
                p:jobRegistry-ref="jobRegistry" />


    <!-- NOTE(review): no bean in the visible configuration references this id;
         confirm it is still needed. -->
    <beans:bean id="completionPolicy"
                class="org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy" />


</beans:beans> 

另请参考「SimpleAsyncTaskExecutor trying to read record after completion of processing」,因为这两个问题可能相关。

+0

“什么建议吗?” - 检查JDBCCursorItemReader是否释放连接?你能给我们一些代码吗? – Fildor 2013-05-02 13:01:40

+0

请发布您的配置文件。 – Cygnusx1 2013-05-02 13:20:13

+0

@Cygnusx1:已添加配置 – 2013-05-06 10:11:17

回答