In this post I will demonstrate how to use the Camel aggregator.
The example accumulates the message bodies into a comma-separated string.
When the number of aggregated messages reaches 3, it writes the aggregated string to the log.
The simple aggregator:
package org.apache.camel.example.cxf;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class myAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        int theMessageCounter = 0;
        String theMsgOld = "";

        // oldExchange is null for the first message of a group
        if (oldExchange != null) {
            theMessageCounter = oldExchange.getIn().getHeader("MessageCounter", Integer.class);
            theMsgOld = oldExchange.getIn().getBody(String.class);
        }
        theMessageCounter = theMessageCounter + 1;

        // Append the new body to the accumulated comma-separated string
        String thenewMsg = newExchange.getIn().getBody(String.class);
        String theMsgToReturn = theMsgOld;
        if (!"".equals(theMsgOld)) {
            theMsgToReturn += ",";
        }
        theMsgToReturn += thenewMsg;

        // Carry the aggregated body and the counter on the returned exchange
        newExchange.getIn().setBody(theMsgToReturn);
        newExchange.getIn().setHeader("MessageCounter", Integer.toString(theMessageCounter));
        return newExchange;
    }
}
The route builder:
package org.apache.camel.example.cxf;

import java.util.logging.Level;
import java.util.logging.Logger;

import javax.ws.rs.core.Response;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class CamelRoute extends RouteBuilder {

    private String uri = "cxfrs:/myData?resourceClasses=org.apache.camel.example.cxf.RsService&bindingStyle=SimpleConsumer";

    static Logger myLogger = Logger.getLogger(CamelRoute.class.getName());

    @Override
    public void configure() throws Exception {
        // Aggregation route: body() is the correlation expression,
        // the batch completes when the MessageCounter header equals 3
        from("seda:tap")
            .aggregate(body(), new myAggregationStrategy())
            .completionPredicate(header("MessageCounter").in(3))
            .log(LoggingLevel.INFO, "the body ${body}");

        // REST route: wire-taps a copy of each request to the aggregation route
        from(uri)
            .wireTap("seda:tap")
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    Message inMessage = exchange.getIn();
                    // The parameter of the invocation is stored in the body of the in message
                    String theBody = inMessage.getBody(String.class);
                    myLogger.setLevel(Level.INFO);
                    Response r = Response.ok("This is the result\r\n" + theBody).status(200).build();
                    exchange.getOut().setBody(r);
                }
            });
    }
}
An additional route is created to handle the aggregation.
A copy of the original message is sent to this route through a wire tap to the SEDA endpoint.
The aggregator class accumulates the bodies of the original messages into a comma-delimited string and attaches a message header with a counter that counts the number of times the aggregator has been executed.
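To make the accumulation step easier to follow outside of Camel, here is a minimal plain-Java sketch of the same fold. The class name CsvFold and the method append are made up for this illustration and are not part of the example project; the counter variable stands in for the MessageCounter header.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the fold done by the aggregation strategy (no Camel types).
// CsvFold and append are hypothetical names used only for this illustration.
class CsvFold {

    // 'old' is null or empty for the first message, like oldExchange == null.
    static String append(String old, String next) {
        if (old == null || old.isEmpty()) {
            return next;
        }
        return old + "," + next;
    }

    public static void main(String[] args) {
        List<String> bodies = Arrays.asList("Cat", "Dog", "Pig");
        String aggregated = null;
        int counter = 0; // stands in for the MessageCounter header
        for (String body : bodies) {
            aggregated = append(aggregated, body);
            counter++;
            System.out.println(counter + " -> " + aggregated);
        }
    }
}
```

After the third message the accumulated value is Cat,Dog,Pig and the counter is 3.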
The route checks the counter header, and if it equals 3 the output of the aggregator is flushed to the log component.
Note that a new instance of the aggregator state is created after the flush.
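The flush-and-restart behaviour can be sketched in plain Java as well. This is only a simulation under the assumption that all messages fall into one group; BatchingAggregator and its methods are hypothetical names, and Camel's real aggregator keeps its state in an aggregation repository rather than in fields like these.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of "complete at N messages, then start from fresh state".
// All names here are hypothetical; this is not Camel code.
class BatchingAggregator {
    private final int batchSize;
    private final List<String> completed = new ArrayList<>();
    private String current = ""; // aggregated body of the open batch
    private int counter = 0;     // plays the role of the MessageCounter header

    BatchingAggregator(int batchSize) {
        this.batchSize = batchSize;
    }

    void onMessage(String body) {
        current = current.isEmpty() ? body : current + "," + body;
        counter++;
        if (counter == batchSize) {
            // Completion condition met: flush the batch and reset the state
            completed.add(current);
            current = "";
            counter = 0;
        }
    }

    List<String> completedBatches() {
        return completed;
    }

    int pendingCount() {
        return counter;
    }

    public static void main(String[] args) {
        BatchingAggregator agg = new BatchingAggregator(3);
        for (String body : new String[] {"Cat", "Dog", "Pig", "Cow"}) {
            agg.onMessage(body);
        }
        System.out.println(agg.completedBatches() + ", pending: " + agg.pendingCount());
    }
}
```

With four messages and a batch size of 3, the first three are flushed together and the fourth starts a new batch with the counter back at 1.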
Test REST calls:
/camel-example-cxfrs-tomcat/webservices/myData/RsService/SayHello/Cat
/camel-example-cxfrs-tomcat/webservices/myData/RsService/SayHello/Dog
/camel-example-cxfrs-tomcat/webservices/myData/RsService/SayHello/Pig
And the result in the log:
2014-04-25 16:32:37,876 [#0 - seda://tap] INFO route1 - the body Cat,Dog,Pig
Note:
There is a timeout issue: the state of the aggregation component is reset if no new message arrives within a certain time period.
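One way to picture this note is an inactivity timeout on the pending batch. The sketch below is plain Java, not Camel code, and it assumes the partial batch is simply dropped when the gap between two messages exceeds the timeout; the class TimeoutAggregator, the 1000 ms value and the explicit timestamps are all made up for the illustration.

```java
// Simulation of an inactivity timeout that resets a pending aggregation.
// Hypothetical names and values; the drop-on-timeout behaviour is an assumption.
class TimeoutAggregator {
    private final long timeoutMillis;
    private String current = "";   // aggregated body of the open batch
    private long lastArrival = -1; // -1 means no message has been seen yet

    TimeoutAggregator(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Adds a message that arrived at nowMillis and returns the aggregated body.
    String onMessage(String body, long nowMillis) {
        boolean expired = lastArrival >= 0 && nowMillis - lastArrival > timeoutMillis;
        if (expired) {
            current = ""; // the stale partial batch is discarded
        }
        current = current.isEmpty() ? body : current + "," + body;
        lastArrival = nowMillis;
        return current;
    }

    public static void main(String[] args) {
        TimeoutAggregator agg = new TimeoutAggregator(1000);
        System.out.println(agg.onMessage("Cat", 0));
        System.out.println(agg.onMessage("Dog", 500));
        System.out.println(agg.onMessage("Pig", 5000)); // gap larger than the timeout
    }
}
```

In this simulation the third message arrives too late, so it starts a fresh aggregation instead of joining Cat and Dog.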