Scatter-Gather in Mule
Here we are using Message Enricher in mule 3 in Anypoint studio 6.2
Scatter Gather routes the payload to multiple destinations parallely. It then collects the responses from all routes and aggregates them back into a single response.
Once a message is received by Scatter-Gather, it sends a message for concurrent processing to all configured routes. The main thread executing the flow that owns the router waits until all routes complete or time out.
If there are no failures, Mule aggregates the results from each of the routes into a message collection (MessageCollection class). Failure in one route does not stop Scatter-Gather from sending messages to its other configured routes, so it is possible that many or all routes may fail concurrently.
If and when some of the route fails,Scatter-Gather performs the below operations:
1. Sets the exception payload accordingly for each route.
2. Throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID.
Aggregation Strategies :
We can define our own aggregation strategies for Scatter-Gather to override its default aggregation strategy.
1. Discard message responses.
2. Merge message properties that originated in different routes.
3. Discard failed messages without throwing an exception.
4. Select only one from multiple responses.
Here below there is scatter gather flow in which data is inserted in three tables of a database.Three DB Connectors are attached with scatter gather flow control to insert data in all three tables parallely.
Same MySQL DB with demodata schema we are using and i made three tables info,info2 & info3.
Here we are using default aggregation strategy as shown below :
path : /scattergather
url : http://localhost:8085/api/scattergather
method : POST
method : POST
Input :
Output :
XML project code :
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw" xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd">
<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8085" doc:name="HTTP Listener Configuration" basePath="/api"/>
<db:mysql-config name="MySQL_Configuration" host="localhost" port="3306" user="root" password="*******" database="demodata" doc:name="MySQL Configuration"/>
<flow name="scatter-gatherFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/scattergather" doc:name="HTTP"/>
<object-to-byte-array-transformer doc:name="Object to Byte Array"/>
<scatter-gather doc:name="Scatter-Gather">
<db:insert config-ref="MySQL_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[insert into info (ID,NAME,AGE) values (#[json:id],#[json:name],#[json:age]);]]></db:parameterized-query>
</db:insert>
<db:insert config-ref="MySQL_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[insert into info2 (ID,NAME,AGE) values (#[json:id],#[json:name],#[json:age]);]]></db:parameterized-query>
</db:insert>
<db:insert config-ref="MySQL_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[insert into info3 (ID,NAME,AGE) values (#[json:id],#[json:name],#[json:age]);]]></db:parameterized-query>
</db:insert>
</scatter-gather>
<set-payload value="Data inserted in tables" doc:name="Set Payload"/>
</flow>
</mule>