Using aggregate in the CXFRS camel route example

In the following post I will demonstrates using camel aggregate .
The example accumulates the massages body into comma separated string .
When the no of aggregation events reaches 3 it output the aggregate string into the log.
The simple aggregator:

package org.apache.camel.example.cxf;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
public class myAggregationStrategy  implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        int theMessageCounter = 0 ; 
        String theMsgOld = "";
        if (oldExchange != null) {           
            theMessageCounter = oldExchange.getIn().getHeader("MessageCounter", Integer.class);
            theMsgOld = oldExchange.getIn().getBody(String .class);
        theMessageCounter = theMessageCounter + 1;
        String thenewMsg =  newExchange.getIn().getBody(String .class);
        String theMsgToReturn = theMsgOld;
        if ( !"".equals(theMsgOld))
            theMsgToReturn += ",";
        theMsgToReturn += thenewMsg;
        newExchange.getIn().setBody (theMsgToReturn);
        newExchange.getIn().setHeader ( "MessageCounter" , Integer.toString( theMessageCounter) );
        return newExchange;

The Route builder :

package org.apache.camel.example.cxf;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.core.Response;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.interceptor.DefaultTraceFormatter;
import org.apache.camel.processor.interceptor.Tracer;
    public class CamelRoute extends RouteBuilder {
    private String uri ="cxfrs:/myData?resourceClasses=org.apache.camel.example.cxf.RsService&bindingStyle=SimpleConsumer";
     static Logger myLogger = Logger.getLogger(
    public void configure() throws Exception {   
        from("seda:tap").aggregate(body(), new myAggregationStrategy()).
                 .log(LoggingLevel.INFO , "the body ${body}");
		  .process(new Processor() {
            public void process(Exchange exchange) throws Exception { 
                   Message inMessage = exchange.getIn();                        
                    // The parameter of the invocation is stored in the body of in message
                    String theBody = inMessage.getBody(String.class);
                    theResult += "The Body:" + theBody;
                    myLogger.setLevel (Level.INFO );
                    Response r = Response.ok("This is the result\r\n" +theBody).status(200).build();                                                              

There is an additional route that is created in order to handle the aggregation.
A copy of the original message is send to the route using the Seda .
The aggregator class aggregate the body of the original messages into comma delimited string and attach a message header with a counter variable that counts the no of execution of the aggregator .
The route checks the counter header and if its equal to 3 then the output of the aggregator is flashed away to the log component .
Note the a new instance of the aggregator state is created after the flashing .

Test rest calls:

And the Result in the log :
2014-04-25 16:32:37,876 [#0 - seda://tap] INFO  route1                         - the body Cat,Dog,Pig

The is a timeout issue that resets the aggregation component state if  a message is not reached in a time period .

