In a previous post, we introduced the Mule ESB Batch module. In this post we’re going to take a deeper dive into some of its other features.
Support for non-Serializable Objects
A limitation in the first release of batch was that all records needed to have a Serializable payload. This is because batch uses persistent queues to buffer the records, making it possible to process “larger than memory” sets of data. However, we found that non-Serializable payloads were way more common than we initially thought. So, we decided to have batch use the Kryo serializer instead of Java’s standard one. Kryo is a very cool serialization library that allows:
- Serializing objects that do not implement the Serializable interface
- Serializing objects that do not have (nor inherit) a default constructor
- Serializing way faster than the Java serializer while producing smaller output
Introducing Kryo into the project not only made batch more versatile by removing limitations, it also had a great impact on performance. During our testing, we saw performance improvements of up to 40% from doing nothing but switching to Kryo. Of course, the speed boost depends on the job’s characteristics: if you have a batch job that spends 90% of its time doing IO, the impact on performance won’t be as visible as in one that juggles between IO and CPU processing.
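To make the original limitation concrete, here is a minimal sketch that uses only the JDK (it does not touch Kryo or the batch module). The `Contact` and `SerializableContact` classes and the `javaSerializerAccepts` helper are hypothetical names made up for this illustration. It shows that the standard Java serializer rejects a payload that does not implement Serializable, which is exactly the kind of payload the first release could not handle:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializablePayloadCheck {

    // Hypothetical payload that does NOT implement Serializable.
    static class Contact {
        final String name;
        final String email;

        Contact(String name, String email) {
            this.name = name;
            this.email = email;
        }
    }

    // Same fields, but this one opts in to standard Java serialization.
    static class SerializableContact implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final String email;

        SerializableContact(String name, String email) {
            this.name = name;
            this.email = email;
        }
    }

    // Returns true if the standard Java serializer can write the payload.
    static boolean javaSerializerAccepts(Object payload) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(payload);
            return true;
        } catch (NotSerializableException e) {
            return false; // the payload (or one of its fields) is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // prints "true"
        System.out.println(javaSerializerAccepts(new SerializableContact("Ann", "ann@example.com")));
        // prints "false"
        System.out.println(javaSerializerAccepts(new Contact("Bob", "bob@example.com")));
    }
}
```

With a serializer that does not depend on the Serializable interface, the second payload stops being a problem.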
Another great feature comes from improving the <batch:commit> functionality. If you remember from the last post, the <batch:commit> block allows you to group records to perform bulk operations over them. The most typical example would be to read contacts from a CSV file, group records in blocks of 100, use DataMapper to transform them and then integrate with a system that supports bulk updates such as a database or Salesforce. This is an example of how it looks when reading contacts from Salesforce and doing a bulk insert into Google Contacts:
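The Studio flow itself isn’t reproduced here, but the grouping idea behind a fixed-size commit can be sketched in plain Java. This is only an illustration of the concept, not the batch module’s implementation: `commitInBlocks`, its parameters and the consumer standing in for the bulk operation are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class FixedSizeCommitSketch {

    // Drains the record iterator and hands the bulk operation one block of at
    // most blockSize records at a time. Returns the number of blocks handed over.
    static <T> int commitInBlocks(Iterator<T> records, int blockSize, Consumer<List<T>> bulkOperation) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        int blocks = 0;
        List<T> block = new ArrayList<>(blockSize);
        while (records.hasNext()) {
            block.add(records.next());
            if (block.size() == blockSize) {
                bulkOperation.accept(block);
                blocks++;
                block = new ArrayList<>(blockSize); // start a fresh block
            }
        }
        if (!block.isEmpty()) {
            bulkOperation.accept(block); // last, possibly smaller, block
            blocks++;
        }
        return blocks;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            records.add(i);
        }
        int blocks = commitInBlocks(records.iterator(), 100,
                block -> System.out.println("bulk insert of " + block.size() + " records"));
        System.out.println(blocks + " blocks"); // prints "3 blocks"
    }
}
```

Each block is small enough to fit in memory, and the target system receives one bulk call per block instead of one call per record.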
This is all good, but what about the reverse use case? What if I’m reading 1 million records from Salesforce or Google and I want to write them into a CSV file? I can’t transform all records at once because I’d run out of memory. Grouping records in blocks of 100 is inefficient because I would have to perform 10K transformations and write a block to disk 10K times (not to mention that I would have to actually handle the file access myself).
For this kind of use case we have now added the concept of a streaming commit. The difference between the fixed-size commit (what you first saw in the December release) and the streaming commit is that instead of a List of elements you receive a streaming iterator that will give you access to ALL the records in the job. Yes, all one million of them, without the risk of running out of memory. This means that by combining batch streaming with DataMapper streaming, you can transform the whole dataset in one single operation and in one single write to disk. Let’s see an example of that!
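Before going through the Studio example, here is a rough plain-Java sketch of why an iterator makes this possible. It is not how batch or DataMapper are implemented: `writeCsv`, `escape` and the `String[]` record shape are assumptions made for this illustration. The point is that each record is written out as soon as the iterator yields it, so only one record needs to be in memory at a time:

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Iterator;
import java.util.stream.IntStream;

class StreamingCommitSketch {

    // Writes a header plus one CSV line per record, pulling records from the
    // iterator one at a time. Returns the number of records written.
    static long writeCsv(Iterator<String[]> records, Writer out) {
        try {
            long written = 0;
            out.write("name,email\n");
            while (records.hasNext()) {
                String[] record = records.next(); // assumed shape: {name, email}
                out.write(escape(record[0]) + "," + escape(record[1]) + "\n");
                written++;
            }
            out.flush();
            return written;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Quotes a field when it contains a comma, a quote or a line break.
    static String escape(String field) {
        if (field.contains(",") || field.contains("\"") || field.contains("\n")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }

    public static void main(String[] args) {
        // Lazily generated records stand in for the streaming iterator.
        Iterator<String[]> records = IntStream.range(0, 3)
                .mapToObj(i -> new String[] {"Contact " + i, "contact" + i + "@example.com"})
                .iterator();
        StringWriter out = new StringWriter(); // a file or network writer in real use
        long written = writeCsv(records, out);
        System.out.print(out);
        System.out.println(written + " records written");
    }
}
```

Swapping the `StringWriter` for a writer backed by a file gives a single pass over the data and a single output file, no matter how many records the iterator yields.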
Consider this simple batch job:
It all starts with a simple query that just gets name and email from all your Salesforce contacts. Assume that it returns millions of records which batch will stream, split and load into the batch job. The actual query using DataSense Query Language looks like this:
Then, we drop a batch:commit and we set it in streaming mode:
Inside the commit scope, we place a DataMapper that transforms these Contacts into a CSV file:
As you can see in the image above, using the power of Mule DataSense, DataMapper already knows that you’re trying to map a List of Contacts (it doesn’t matter if it’s an Iterator instead of a List, for DataMapper it’s all the same). I selected CSV as the output type, then selected “User defined” for the structure of the output CSV, and then clicked on “Generate Default” to have DataMapper automatically create the trivial configuration, which is good enough for this example.
After clicking on “Create Mapping” the transformation is created. I only have to go into its configuration to tell it that I want to enable streaming:
Once in the configuration dialog, all you have to do is check the little streaming box:
Now that DataMapper is set to stream, it will start writing each line of the CSV to an InputStream at the same rate the batch starts giving it records. Because the DataMapper box is followed by an FTP outbound, the CSV will also be written to an FTP location as the InputStream is populated. In this example, you get three levels of streaming working together seamlessly, and each transformed record is already being processed by the next step while you’re still writing to the FTP location!
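The producer and consumer relationship described above can be approximated with the JDK’s piped streams. This is a loose analogy under stated assumptions, not what DataMapper or the FTP transport do internally: the `pipe` method is a hypothetical helper, the producer thread plays the role of the transformation and the reading loop plays the role of the outbound endpoint.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class PipedStreamingSketch {

    // A producer thread writes one line per record into a pipe while the
    // calling thread reads lines from the other end as they become available.
    // Returns the lines in the order the consumer received them.
    static List<String> pipe(Iterator<String> records) {
        try {
            PipedOutputStream sink = new PipedOutputStream();
            PipedInputStream source = new PipedInputStream(sink);

            Thread producer = new Thread(() -> {
                // Closing the sink signals end of stream to the consumer.
                try (PipedOutputStream out = sink) {
                    while (records.hasNext()) {
                        out.write((records.next() + "\n").getBytes(StandardCharsets.UTF_8));
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            producer.start();

            List<String> received = new ArrayList<>();
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(source, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    received.add(line); // a real consumer would forward the bytes instead
                }
            }
            producer.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> lines = pipe(Arrays.asList("Ann,ann@example.com", "Bob,bob@example.com").iterator());
        lines.forEach(System.out::println);
    }
}
```

The pipe’s buffer is bounded, so the producer can only run slightly ahead of the consumer, which is what keeps memory usage flat.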
Another great usage of this feature would be to place a step at the end of your job which only accepts failed records. By setting a streaming commit inside of that step, you could build a report of all failed records, no matter how many there are.
Another interesting use case is around scheduling job executions. Suppose that you have two concurrent executions of the same job. This could happen either because the job is started by a poll element that fired twice before the first job execution finished or because the job was manually executed twice. In either case, you have two instances of the same job competing for the same resources. How should they be scheduled?
Originally there was no way of configuring that. By default, the batch module would try to execute all available instances by using a round robin algorithm to assign the available resources. This is fine for the use case in which jobs are triggered manually and you want them to run concurrently. But there are other use cases in which you need the first job instance to finish before allowing any others to run. An example of this situation is a one-way sync between two systems.
Suppose you have a job that triggers every 5 minutes, polling one system for updated records and then reflecting those changes in another one. Suppose that the job is triggered twice in the span of five minutes, and that both job instances contain a record which was updated twice in that period. If both jobs run concurrently, there’s a chance that the second job can process that record before the first one, leaving the target system in an inconsistent state.
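The problem boils down to “last write wins”. Here is a deliberately simplified, single-threaded sketch of it; the `Update` class, the `applyInOrder` helper and the sample values are hypothetical, and the target system is modeled as a plain map:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OneWaySyncOrderSketch {

    // Hypothetical update: a record id plus the value read from the source system.
    static final class Update {
        final String id;
        final String value;

        Update(String id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Applies the updates to an empty target in the given order; the last
    // write for each id wins.
    static Map<String, String> applyInOrder(List<Update> updates) {
        Map<String, String> target = new LinkedHashMap<>();
        for (Update update : updates) {
            target.put(update.id, update.value);
        }
        return target;
    }

    public static void main(String[] args) {
        Update fromFirstJob = new Update("contact-1", "old@example.com");
        Update fromSecondJob = new Update("contact-1", "new@example.com");

        // First job's record lands first: the target ends with the newest value.
        System.out.println(applyInOrder(Arrays.asList(fromFirstJob, fromSecondJob)).get("contact-1"));
        // prints "new@example.com"

        // Second job's record lands first: the target ends with the stale value.
        System.out.println(applyInOrder(Arrays.asList(fromSecondJob, fromFirstJob)).get("contact-1"));
        // prints "old@example.com"
    }
}
```

When both job instances run concurrently, either ordering can happen, which is why the target may end up holding the stale value.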
To account for these use cases, you can now choose between two scheduling strategies at a job level:
- Ordered Sequential: This is the new default strategy. It means that job instances are executed one at a time, in the order they were created. In simpler words, if a job instance was triggered at 12:00:00 and another one was triggered at 12:00:01, the second instance will not be executed until the first one leaves the executable state.
- Round Robin: This is the behaviour we used to have in the December 2013 release.
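To get a feel for the difference between the two strategies, here is a toy model in plain Java. It assumes a single processing slot and takes turns one record at a time, which is a big simplification of how the batch module really assigns its resources; the `Strategy` enum and `processingOrder` method are hypothetical names used only for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class SchedulingStrategySketch {

    enum Strategy { ORDERED_SEQUENTIAL, ROUND_ROBIN }

    // Each inner list holds the records of one job instance; instances are
    // given in creation order. Returns the order in which a single processing
    // slot would pick up the records under the chosen strategy.
    static List<String> processingOrder(List<List<String>> instances, Strategy strategy) {
        List<Deque<String>> queues = new ArrayList<>();
        for (List<String> instance : instances) {
            queues.add(new ArrayDeque<>(instance));
        }
        List<String> order = new ArrayList<>();
        if (strategy == Strategy.ORDERED_SEQUENTIAL) {
            // Finish each instance completely before touching the next one.
            for (Deque<String> queue : queues) {
                while (!queue.isEmpty()) {
                    order.add(queue.poll());
                }
            }
        } else {
            // Take one record from each instance in turn until all are drained.
            boolean progressed = true;
            while (progressed) {
                progressed = false;
                for (Deque<String> queue : queues) {
                    if (!queue.isEmpty()) {
                        order.add(queue.poll());
                        progressed = true;
                    }
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<List<String>> instances = Arrays.asList(
                Arrays.asList("a1", "a2", "a3"),
                Arrays.asList("b1", "b2"));
        System.out.println(processingOrder(instances, Strategy.ORDERED_SEQUENTIAL));
        // prints [a1, a2, a3, b1, b2]
        System.out.println(processingOrder(instances, Strategy.ROUND_ROBIN));
        // prints [a1, b1, a2, b2, a3]
    }
}
```

In the sequential ordering, no record of the second instance is touched until the first instance is done, which is the guarantee a one-way sync needs.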
To configure the strategy in Studio, simply click on a batch job and a selector will display with all available strategies:
Notice that the scheduling strategy is configured at a job level! This means that it’s only local to instances of the same job. If your application has more than one job, each one has its own scheduling strategy that acts independently. Setting all jobs’ scheduling strategies to ORDERED_SEQUENTIAL doesn’t mean that there will be no two jobs running at the same time at an application level. It means that no two instances of the same job will be running at the same time.
Thanks for reading!