Mule Batch Job

By bfattouh
Jan. 12, 2015

Introduction

In this post I present a demo application that illustrates how a Mule batch job works, along with techniques for handling exceptions and record failures. The demo application code can be cloned from Ref[1].

Use case

The application implements a batch job that queries a database for all users with the status "Approved". If any exist, it fetches each user's account record and generates a CSV file from the account attributes. The resulting CSV file can be saved for later use, sent by email, or uploaded to an FTP server.

Based on this use case, the demo application also shows how to handle special cases that throw exceptions and lead to record failures. For example: How do we handle an approved user with no existing account? How do we keep track of these failed records? And how do we generate a report of approved users with no account, or send them together with their exceptions to a JMS queue?

The post at Ref[2] gives a clear explanation of exception and error handling within Mule batch jobs.

Description

The demo application uses the following configuration. Setting max-failed-records to -1 means the batch job keeps processing all loaded records no matter how many of them fail.

<batch:job name="users-accounts-batch-job" max-failed-records="-1">

In other scenarios, we may need to stop the running batch job once we reach 20 failed records, by setting the attribute max-failed-records="20".
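
As a minimal sketch, the same job element with a limit of 20 failed records might look like the following. The job name is the one used in this demo and the phases are elided. The note on the default value reflects my understanding of Mule 3 batch jobs, not something taken from the demo code.

<!-- Sketch: stop the job once the limit of 20 failed records is hit.
     As I understand it, the default value 0 stops the job on the first failed record,
     while the value -1 (used in the demo) removes the limit. -->
<batch:job name="users-accounts-batch-job" max-failed-records="20">
   <!-- batch:input, batch:process-records and batch:on-complete go here -->
</batch:job>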

Input phase:

In this phase we call a flow that loads all approved users:

<batch:input>
<flow-ref name="get-users-records" doc:name="Flow Reference"/>
</batch:input>

The referenced flow, get-users-records:

<flow name="get-users-records" doc:name="get-users-records" processingStrategy="synchronous">
 <logger message="Start getting users records -connecting to database using URL:${database.url}"
         level="INFO" doc:name="Logger"/>
 <!-- Status 10 is assumed here to be the code for "Approved" users -->
 <db:select config-ref="MySqlDatabase" doc:name="get approved users">
    <db:parameterized-query><![CDATA[SELECT * FROM usermodel.Users WHERE status=10;]]></db:parameterized-query>
 </db:select>
 <logger message="End getting users records #[payload]" level="INFO" doc:name="Logger"/>
</flow>

Process records phase:

In this phase we have two steps. The first, get-user-account-step, gets a user's account record; the second, the failures step, processes failed records. For example, if the current user does not have an account, a NoUserAccountExistException is thrown. This exception is handled by sending the current user, together with the exception message, to a JMS queue for later inspection.

If the first step returns existing account information, the failures step is skipped, because it captures only failures through the setting accept-policy="ONLY_FAILURES". The returned account information can be used to generate the CSV file required by business operations.
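
Putting the phases together, the overall shape of the job can be sketched as follows. This is an outline assembled from the fragments in this post, not a copy of the demo's configuration. The step bodies are reduced to comments, so it would need real message processors before it could be deployed.

<batch:job name="users-accounts-batch-job" max-failed-records="-1">
   <!-- Input phase: load the approved users -->
   <batch:input>
      <flow-ref name="get-users-records" doc:name="Flow Reference"/>
   </batch:input>
   <batch:process-records>
      <!-- Step 1: runs for each loaded user record -->
      <batch:step name="get-user-account-step">
         <!-- look up the account of the current user -->
      </batch:step>
      <!-- Step 2: runs only for records that failed in an earlier step -->
      <batch:step name="step-failures" accept-policy="ONLY_FAILURES">
         <!-- report the failed record -->
      </batch:step>
   </batch:process-records>
   <!-- On-complete phase: runs once at the end of the job -->
   <batch:on-complete>
      <!-- log or report the job result -->
   </batch:on-complete>
</batch:job>

The first step declares no accept-policy, so it falls back to the default, which as far as I know is NO_FAILURES. That is why a record that fails in the first step is only picked up again by the failures step.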

The following depicts the batch step get-user-account-step. Here the enricher stores the current user id in a record variable, referenced with the expression #[recordVars['currentUser']]:

<batch:step name="get-user-account-step">
   <logger message="Start processing step: get-user-account-step"
           level="INFO"/>
   <enricher source="#[payload['id']]" target="#[recordVars['currentUser']]">
      <set-payload value="#[payload]" doc:name="Set Payload"/>
   </enricher>
   <flow-ref name="get-account-record" doc:name="Flow Reference"/>
   <logger message="Account record payload: #[payload]" level="INFO" doc:name="Logger"/>
   <!-- We may transform the record payload here and push it into a CSV file -->
</batch:step>

The flow that gets the current user's account, get-account-record, is shown below. It uses the record variable currentUser to look up the corresponding account.

In this flow we also place a Java component right after the query that returns the account information. The component checks whether the returned account information is empty and, if so, throws a NoUserAccountExistException.

<flow name="get-account-record" doc:name="get-account-record" processingStrategy="synchronous">
 <logger message="Start getting account record for user: #[recordVars['currentUser']]" level="INFO"/>
 <db:select config-ref="MySqlDatabase" doc:name="get user account">
    <db:parameterized-query>
       <![CDATA[SELECT * FROM usermodel.Accounts WHERE user_id=#[recordVars['currentUser']];]]>
    </db:parameterized-query>
 </db:select>
 <component class="com.appnov.batch.AccountVerifier" doc:name="Java"/>
 <logger message="End getting account user: #[recordVars['currentUser']]" level="INFO"/>
</flow> 

The following depicts the failures step (named step-failures in the configuration), which accepts only failed records:

<batch:step name="step-failures" accept-policy="ONLY_FAILURES">
   <logger message="Failed record with user id:  #[recordVars['currentUser']]" level="INFO"/>
   <set-payload value="#[getStepExceptions()]" doc:name="Set Payload"/>
   <foreach collection="#[payload.values()]" doc:name="For Each">
     <logger message="Current user: #[recordVars['currentUser']] record has failed. Exception: #[payload]" level="INFO"/>
     <!-- We may send the Payload here to a JMS queue or use it to create a report file-->
   </foreach>
</batch:step>
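
The comment inside the foreach could be replaced with a JMS endpoint. The following is a rough sketch only, assuming the Mule 3 JMS transport with a connector already configured elsewhere. The connector name Active_MQ and the queue name failed.users are hypothetical and do not come from the demo.

<foreach collection="#[payload.values()]" doc:name="For Each">
   <!-- Inside the loop, payload is one exception raised for the current record -->
   <set-payload value="#['User ' + recordVars['currentUser'] + ' failed: ' + payload.message]"
                doc:name="Set Payload"/>
   <!-- Hypothetical queue and connector names -->
   <jms:outbound-endpoint queue="failed.users" connector-ref="Active_MQ" doc:name="JMS"/>
</foreach>

Sending a short text message keeps the queue consumer independent of the exception classes. If the consumer needs the full stack trace, the exception itself could be sent instead.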

With these two steps we have illustrated how to process records and handle failures in a batch job. Another special case is worth mentioning: if no database connection can be established during the input phase, for instance because of a wrong database URL, the following exception is caught by the default exception strategy, as the log shows:

INFO 2014-12-12 11:04:24,212[[batch-job-demo].start-batch-job.stage1.02]
     com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting input phase
INFO 2014-12-12 11:04:24,222[[batch-job-demo].start-batch-job.stage1.02]
     org.mule.api.processor.LoggerMessageProcessor:
Start getting users records - connecting to database using URL:
ERROR 2014-12-12 11:04:24,263 [[batch-job-demo].start-batch-job.stage1.02]
     org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message   : null (java.lang.NullPointerException).
Message payload is of type: String
Code      : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
org.mule.module.db.internal.domain.connection.DefaultDbConnection:99 (null)
-------------------------------------------------------------------------------- 

In this case, the batch job still runs through to its final phase, the on-complete phase. This is very important if we need to generate a report at the end of the batch process, even when 0 records were processed, that also includes the exception that occurred. The following is the output of the on-complete phase, taken from the log:

INFO  2014-12-12 11:04:24,287 [[batch-job-demo].start-batch-job.stage1.02]
           com.mulesoft.module.batch.engine.DefaultBatchEngine:
Starting execution of onComplete phase for instance 09b38430-8474-11e4-9c5c-0a0027000000
           of job users-accounts-batch-job
INFO  2014-12-12 11:04:24,371 [[batch-job-demo].start-batch-job.stage1.02]
           org.mule.api.processor.LoggerMessageProcessor:
on-complete payload: BatchJobInstanceId:09b38430-8474-11e4-9c5c-0a0027000000
          Number of TotalRecords: 0
          ProcessedRecords: 0
          Number of sucessfull Records: 0
          Number of failed Records: 0
          ElapsedTime in milliseconds: 0
          InpuPhaseException com.mulesoft.module.batch.exception.BatchException:
                 null (java.lang.NullPointerException). Message payload is of type:
                 String (org.mule.api.MessagingException)
          LoadingPhaseException: null
          CompletePhaseException: null

In this phase it is clear that 0 records were processed. This happened because of the database connection exception that occurred during the input phase, as shown by the input phase exception entry (logged as InpuPhaseException). This kind of exception handling is useful if the requirements call for a report at the end of the batch job that gives the number of records processed, along with the failed and successful ones.
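
An on-complete block that logs a summary of this kind could look roughly like the sketch below. I am assuming that the payload in the on-complete phase is the batch job result object (BatchJobResult in Mule 3) and that it exposes the properties named in the expressions. The label text is illustrative, not copied from the demo.

<batch:on-complete>
   <!-- Assumption: payload is the job result object -->
   <logger level="INFO" doc:name="Logger"
           message="Job instance: #[payload.batchJobInstanceId], total: #[payload.totalRecords], processed: #[payload.processedRecords], successful: #[payload.successfulRecords], failed: #[payload.failedRecords]"/>
   <!-- Not null when the input phase itself failed, e.g. on a database connection error -->
   <logger level="INFO" doc:name="Logger"
           message="Input phase exception: #[payload.inputPhaseException]"/>
</batch:on-complete>

Logging the input phase exception separately makes it easy to tell an empty result set apart from a failed input phase, since both report 0 total records.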

To understand the anatomy of a batch job, see the Mule batch processing documentation at Ref[3].

References

Ref[1] https://github.com/fattouh/batch-job-demo

Ref[2] http://blogs.mulesoft.org/handle-errors-batch-job

Ref[3] http://www.mulesoft.org/documentation/display/current/Batch+Processing
