Scatter-Gather in Mule









Here we are using the Scatter-Gather router in Mule 3 with Anypoint Studio 6.2.

Scatter-Gather routes the payload to multiple destinations in parallel. It then collects the responses from all routes and aggregates them into a single response.


Once Scatter-Gather receives a message, it sends a copy to every configured route for concurrent processing. The main thread executing the flow that owns the router waits until all routes complete or time out.
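
As a rough illustration of the wait-or-time-out behaviour described above, the fragment below sets the router's timeout attribute (in milliseconds) on a Scatter-Gather with two routes. The flow names routeA and routeB and the 5000 ms value are assumptions made for this sketch; they are not part of the project in this article.

```xml
<!-- Sketch only: routeA and routeB are hypothetical flows defined elsewhere -->
<scatter-gather timeout="5000" doc:name="Scatter-Gather">
    <!-- Each child element is one route; all routes run concurrently -->
    <flow-ref name="routeA" doc:name="Route A"/>
    <flow-ref name="routeB" doc:name="Route B"/>
</scatter-gather>
```

With a setting like this, a route that has not answered within the timeout should be treated as failed, which is then reported in the same way as any other route failure.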

If there are no failures, Mule aggregates the results from each of the routes into a message collection (MessageCollection class). A failure in one route does not stop Scatter-Gather from sending messages to its other configured routes, so several routes, or even all of them, may fail concurrently.

If one or more routes fail, Scatter-Gather performs the following operations:

1. Sets the exception payload accordingly for each route.
2. Throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID.
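
One possible way to react to these failures is a Catch Exception Strategy placed as the last element of the flow that owns the router. The sketch below assumes that the MEL variable exception exposes the CompositeRoutingException and that its getExceptionForRouteIndex method is available in the Mule 3 runtime in use; the log message and the replacement payload are illustrative only.

```xml
<!-- Sketch only: goes inside the flow, after the scatter-gather and any other processors -->
<catch-exception-strategy when="#[exception.causedBy(org.mule.routing.CompositeRoutingException)]" doc:name="Catch Exception Strategy">
    <!-- Route IDs are sequential and start at 0; the expression is null if that route did not fail -->
    <logger level="ERROR" message="Route 0 failure: #[exception.getExceptionForRouteIndex(0)]" doc:name="Logger"/>
    <!-- Illustrative response returned to the caller instead of the normal payload -->
    <set-payload value="One or more routes failed" doc:name="Set Payload"/>
</catch-exception-strategy>
```

The when condition keeps this strategy from swallowing unrelated exceptions, which would otherwise also end up in the same handler.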

Aggregation Strategies:

We can define our own aggregation strategy for Scatter-Gather to override its default aggregation strategy. A custom strategy can be used, for example, to:

1. Discard message responses.
2. Merge message properties that originated in different routes.
3. Discard failed messages without throwing an exception.
4. Select only one from multiple responses.
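
A custom strategy is wired into the router with a custom-aggregation-strategy child element. In the sketch below, the class name com.example.DiscardFailuresAggregationStrategy and the flow names routeA and routeB are hypothetical; the class is assumed to be a Java class on the application classpath that implements Mule 3's org.mule.routing.AggregationStrategy interface.

```xml
<!-- Sketch only: the class and the referenced flows are hypothetical -->
<scatter-gather doc:name="Scatter-Gather">
    <!-- Replaces the default strategy that collects all route results into a message collection -->
    <custom-aggregation-strategy class="com.example.DiscardFailuresAggregationStrategy"/>
    <flow-ref name="routeA" doc:name="Route A"/>
    <flow-ref name="routeB" doc:name="Route B"/>
</scatter-gather>
```

When the element is omitted, as in the project code of this article, the default aggregation strategy applies.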

The flow below uses Scatter-Gather to insert data into three tables of a database. Three Database connectors are attached to the Scatter-Gather flow control so that the inserts into all three tables run in parallel.


We are using the same MySQL database with the demodata schema, in which I created three tables: info, info2 and info3.


Here we are using the default aggregation strategy, as the XML project code below shows.





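Path: /scattergather

URL: http://localhost:8085/api/scattergather
Method: POST

Input: a JSON object with the fields id, name and age, which the insert queries read through #[json:id], #[json:name] and #[json:age].

Output: the text "Data inserted in tables", set by the Set Payload component after the Scatter-Gather completes.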



XML project code:


<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw" xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd">
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8085" doc:name="HTTP Listener Configuration" basePath="/api"/>
    <db:mysql-config name="MySQL_Configuration" host="localhost" port="3306" user="root" password="*******" database="demodata" doc:name="MySQL Configuration"/>
    <flow name="scatter-gatherFlow">
        <!-- Receives POST requests at http://localhost:8085/api/scattergather -->
        <http:listener config-ref="HTTP_Listener_Configuration" path="/scattergather" doc:name="HTTP"/>
        <!-- Reads the HTTP input stream into a byte array so that every route can read the JSON body -->
        <object-to-byte-array-transformer doc:name="Object to Byte Array"/>
        <!-- No aggregation strategy is configured, so the default one is used -->
        <scatter-gather doc:name="Scatter-Gather">
            <!-- Route 0: insert into table info -->
            <db:insert config-ref="MySQL_Configuration" doc:name="Database">
                <db:parameterized-query><![CDATA[insert into info (ID,NAME,AGE) values (#[json:id],#[json:name],#[json:age]);]]></db:parameterized-query>
            </db:insert>
            <!-- Route 1: insert into table info2 -->
            <db:insert config-ref="MySQL_Configuration" doc:name="Database">
                <db:parameterized-query><![CDATA[insert into info2 (ID,NAME,AGE) values (#[json:id],#[json:name],#[json:age]);]]></db:parameterized-query>
            </db:insert>
            <!-- Route 2: insert into table info3 -->
            <db:insert config-ref="MySQL_Configuration" doc:name="Database">
                <db:parameterized-query><![CDATA[insert into info3 (ID,NAME,AGE) values (#[json:id],#[json:name],#[json:age]);]]></db:parameterized-query>
            </db:insert>
        </scatter-gather>
        <!-- Replaces the aggregated route results with a fixed confirmation message -->
        <set-payload value="Data inserted in tables" doc:name="Set Payload"/>
    </flow>
</mule>




© 2020 goformule.com
