Entity view (Content)

Polling and Watermarking in Mule

By rcarter
Jan. 28, 2015

When it comes to synchronizing data between many systems, polling an API or resource is an unfortunate inevitability. This results in developers calling these APIs over and over again to get updates, only to find out nothing has changed. This process constantly uses up resources and is not acceptable to either the API consumer or the API provider. In order to most efficiently poll an API, you need to keep track of where you last left off so we dont process the same data over and over again.

The term 'watermarking' is borrowed from floods, whereby you measure the watermarks on a surface to see how high the water rose. This resonates with data synchronization when you need to measure how much of a particular dataset you have already processed. Watermarking allows us to pick up from where we last left off without having to reprocess and filter out the old data that we do not care about anymore.

It sounds reasonably straightforward to pick up from where we last left off; however, we must generate a marker such as a timestamp or the last ID processed, persist the marker between polls, update the marker after each poll, and handle any errors to ensure no data is missed. This logic is quite complicated and verbose to take care of manually. Luckily, MuleSoft development provides a special watermark processor that will automatically handle all of this for us.

Let's look at an example using Twitter.

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
	http://www.mulesoft.org/schema/mule/core/current/mule.xsd ">
	<twitter:config name="twitter" consumerKey="${twitter.consumer.key}"
		accessSecret="${twitter.access.secret}" />
	<flow name="twitterWatermarking"& processingStrategy="synchronous">
		<poll frequency="30000">
			<watermark variable="lastId" default-expression="#[0]"
				update-expression="#[payload.sinceId]" />
			<twitter:search query="mule" sinceId="#[flowVars["lastId"]]" />
		<logger message="#[payload]" level="INFO" />

If you look into the response from the Twitter search operation, you will notice that each Tweet contains a 'sinceId' attribute. The Twitter connector allows us to set this on the search operation to only return results with an Id greater than the specified Id. We will use this as our marker so that new polls pick up only from the last Id we processed.

The 'variable' attribute defines the name of the variable that will be used to store our marker. In this case, we will call the variable 'lastId'.

The 'default-expression' attribute is used to define the default value for our variable. On our very first poll, there will be no marker stored so we need to provide a default one. As we will be using the Twitter sinceId, which is numeric, we set this to 0 because all Tweets will have an Id greater than 0.

The 'update-expression' attribute is used to extract the marker from the data we are processing. As we are interested in the 'sinceId' attribute from each Tweet, we set the value to 'payload.sinceId'. Note that if we executed this expression directly on the Twitter search response, it would fail since the response is a collection of Tweets. However, the watermark processor is clever enough to work this out for us by automatically inspecting each in object in the collection.

After the watermark is defined, you will notice that the Twitter search includes the sinceId attribute and it's set to the lastId flow variable that we previously stored in the watermark.

On running this flow, you will see that the first poll will search for Tweets with an Id greater than '0' and subsequent polls will only search for Tweets with an Id greater than the last one we processed.

By using the watermark processor, Mule takes care of persisting your marker between polls in its internal ObjectStore. It takes care of checking if a watermark value already exists, it automatically gets the next value, and it will update the value when the poll is complete (or instead, leave it untouched if it fails). This ObjectStore is configurable and be customized to your own store backed backed by Redis, Mongo and so on so it can be shared.

Also note that the flow explicitly has an additional attribute named 'processingStrategy'. Processing strategies in MuleSoft development allow you configure the synchronicity and threading profiles of various elements in your Mule applications. The watermark processor dictates that you use the “synchronous” processing strategy as detailed in the previous example. More information on processing strategies can be found in the Flow Processing Strategies documentation.

Post Tags: