2017-10-17 131 views
0

我有一个订阅该主题的 JMS 入站端点。一旦有消息到达，转换器会将有效负载拆分成记录列表，然后使用批量提交（batch commit）将其插入数据库中。如果在插入数据库时出现任何错误，我想将整个有效负载回滚到 JMS。如何使用事务来实现这一点？（标题：Mule：批处理中的事务）

<!--
  Batch job "lockboxBatch" (version posted in the question).
  Flow visible in this snippet:
    input:    a message from the JMS topic "lockbox" is passed to
              transformers.PaymentsTransformer and then logged;
    step 1:   records are inserted into TIB_INT_AR_PAYMENT_IFACE in bulk,
              grouped by a batch commit of size 4;
    step 2:   runs only for failed records (accept-policy ONLY_FAILURES) and
              sends each step exception to the JMS queue "Invalid_Transmission";
    complete: logs "Completed the insert".
-->
<batch:job name="lockboxBatch" max-failed-records="-1">
  <batch:input>
    <jms:inbound-endpoint topic="lockbox" connector-ref="Active_MQ1" doc:name="JMS"/>
    <custom-transformer class="transformers.PaymentsTransformer" doc:name="Java"/>
    <logger level="INFO" doc:name="Logger"/>
  </batch:input>
  <batch:process-records>
    <batch:step name="Batch_Step">
      <!-- Assigns a non-numeric string to batchAmount; NOTE(review): this looks like a
           deliberate bad value used to provoke an insert failure. Confirm before reuse. -->
      <expression-component doc:name="Expression"><![CDATA[payload[2].batchAmount='hghghfghfhgf']]></expression-component>
      <batch:commit size="4" doc:name="Batch Commit">
        <!-- Bulk insert: one parameter set per record in the commit group. -->
        <db:insert config-ref="Oracle_Configuration" doc:name="Database" bulkMode="true">
          <db:parameterized-query><![CDATA[INSERT INTO TIB_INT_AR_PAYMENT_IFACE (TRANSMISSION_REQUEST_ID,DESTINATION_ACCOUNT,ORIGINATION,TRANSMISSION_RECORD_COUNT,TRANSMISSION_AMOUNT,LOCKBOX_NUMBER,LOCKBOX_BATCH_COUNT,LOCKBOX_RECORD_COUNT,LOCKBOX_AMOUNT,BATCH_NAME,BATCH_AMOUNT,BATCH_RECORD_COUNT,ITEM_NUMBER,CURRENCY_CODE,REMITTANCE_AMOUNT,TRANSIT_ROUTING_NUMBER,ACCOUNT,CHECK_NUMBER,CUSTOMER_NUMBER,OVERFLOW_INDICATOR,OVERFLOW_SEQUENCE,INVOICE1,AMOUNT_APPLIED1) VALUES (#[payload.?transmissiosnRequestID],#[payload.?destinastionAccount],#[payload.?origination],#[payload.?transmissionSrecordCount],#[payload.?transmisssionAmount],#[payload.lockboxNumber],#[payload.lockboxBatchCount],#[payload.lockboxRecordCount],#[payload.lockboxAmount],#[payload.batchName],#[payload.batchAmount],#[payload.batchRecordCount],#[payload.itemNumber],#[payload.currencyCode],#[payload.remittanceAmount],#[payload.transitRoutingNumber],#[payload.account],#[payload.checkNumber],#[payload.customerNumber],#[payload.overflowIndicator],#[payload.overflowSequence],#[payload.invoice1],#[payload.amountApplied1])]]></db:parameterized-query>
        </db:insert>
      </batch:commit>
    </batch:step>
    <!-- Failure handling: only records that failed an earlier step reach this step. -->
    <batch:step name="Batch_Step1" accept-policy="ONLY_FAILURES">
      <set-payload value="#[getStepExceptions()]" doc:name="Set Payload"/>
      <!-- Publish every collected exception value to the error queue. -->
      <foreach collection="#[payload.values()]" doc:name="For Each">
        <jms:outbound-endpoint queue="Invalid_Transmission" connector-ref="Active_MQ" doc:name="JMS"/>
      </foreach>
    </batch:step>
  </batch:process-records>
  <batch:on-complete>
    <logger message="Completed the insert" level="INFO" doc:name="Logger"/>
  </batch:on-complete>
</batch:job>

回答

0

你可以尝试两件事。

1. 将 max-failed-records 设为 "0"，这将在批处理步骤失败的情况下回滚。

2. 将 DB 连接器放入事务范围（transactional scope）内，并根据需要处理异常情况。

<!-- Schematic only: wraps the database insert in a transactional scope that always
     begins a new transaction. The db:insert line below is an elided placeholder,
     not well-formed XML; substitute the real db:insert element with its query. -->
<transactional action="ALWAYS_BEGIN" doc:name="Transactional"> 
    <db:insert...>........</db:insert> 
</transactional> 

请考虑下面更新的代码,您可以根据您的要求进行更改。

<!--
  Batch job "lockboxBatch" (version proposed in the answer).
  Compared with the job posted in the question, this snippet sets
  max-failed-records to "0" and wraps the bulk db:insert inside a
  transactional scope (action ALWAYS_BEGIN) within the batch commit.
  Everything else (JMS input, transformer, failure step, on-complete logger)
  is the same configuration.
-->
<batch:job name="lockboxBatch" max-failed-records="0">
  <batch:input>
    <jms:inbound-endpoint topic="lockbox" connector-ref="Active_MQ1" doc:name="JMS"/>
    <custom-transformer class="transformers.PaymentsTransformer" doc:name="Java"/>
    <logger level="INFO" doc:name="Logger"/>
  </batch:input>
  <batch:process-records>
    <batch:step name="Batch_Step">
      <!-- Assigns a non-numeric string to batchAmount; NOTE(review): this looks like a
           deliberate bad value used to provoke an insert failure. Confirm before reuse. -->
      <expression-component doc:name="Expression"><![CDATA[payload[2].batchAmount='hghghfghfhgf']]></expression-component>
      <batch:commit size="4" doc:name="Batch Commit">
        <!-- Each commit group of up to 4 records is inserted inside its own transaction. -->
        <transactional action="ALWAYS_BEGIN" doc:name="Transactional">
          <db:insert config-ref="Oracle_Configuration" bulkMode="true" doc:name="Database">
            <db:parameterized-query><![CDATA[INSERT INTO TIB_INT_AR_PAYMENT_IFACE (TRANSMISSION_REQUEST_ID,DESTINATION_ACCOUNT,ORIGINATION,TRANSMISSION_RECORD_COUNT,TRANSMISSION_AMOUNT,LOCKBOX_NUMBER,LOCKBOX_BATCH_COUNT,LOCKBOX_RECORD_COUNT,LOCKBOX_AMOUNT,BATCH_NAME,BATCH_AMOUNT,BATCH_RECORD_COUNT,ITEM_NUMBER,CURRENCY_CODE,REMITTANCE_AMOUNT,TRANSIT_ROUTING_NUMBER,ACCOUNT,CHECK_NUMBER,CUSTOMER_NUMBER,OVERFLOW_INDICATOR,OVERFLOW_SEQUENCE,INVOICE1,AMOUNT_APPLIED1) VALUES (#[payload.?transmissiosnRequestID],#[payload.?destinastionAccount],#[payload.?origination],#[payload.?transmissionSrecordCount],#[payload.?transmisssionAmount],#[payload.lockboxNumber],#[payload.lockboxBatchCount],#[payload.lockboxRecordCount],#[payload.lockboxAmount],#[payload.batchName],#[payload.batchAmount],#[payload.batchRecordCount],#[payload.itemNumber],#[payload.currencyCode],#[payload.remittanceAmount],#[payload.transitRoutingNumber],#[payload.account],#[payload.checkNumber],#[payload.customerNumber],#[payload.overflowIndicator],#[payload.overflowSequence],#[payload.invoice1],#[payload.amountApplied1])]]></db:parameterized-query>
          </db:insert>
        </transactional>
      </batch:commit>
    </batch:step>
    <!-- Failure handling: only records that failed an earlier step reach this step. -->
    <batch:step name="Batch_Step1" accept-policy="ONLY_FAILURES">
      <set-payload value="#[getStepExceptions()]" doc:name="Set Payload"/>
      <!-- Publish every collected exception value to the error queue. -->
      <foreach collection="#[payload.values()]" doc:name="For Each">
        <jms:outbound-endpoint queue="Invalid_Transmission" connector-ref="Active_MQ" doc:name="JMS"/>
      </foreach>
    </batch:step>
  </batch:process-records>
  <batch:on-complete>
    <logger message="Completed the insert" level="INFO" doc:name="Logger"/>
  </batch:on-complete>
</batch:job>
+0

感谢您的回复。如果我将 DB 连接器放入事务范围内，那么在发生故障时，它是会回滚并重试（再次将消息发送到入站 JMS），还是仅回滚并将其作为失败记录发送到下一步？ – MRavindran