Friday, 25 April 2014

Using aggregate in the CXFRS camel route example

In this post I will demonstrate using the Camel aggregate.
The example accumulates the message bodies into a comma-separated string.
When the number of aggregation events reaches 3, it outputs the aggregated string to the log.
The simple aggregator:

package org.apache.camel.example.cxf;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class myAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        int theMessageCounter = 0;
        String theMsgOld = "";

        // oldExchange is null for the first message of a group
        if (oldExchange != null) {
            theMessageCounter = oldExchange.getIn().getHeader("MessageCounter", Integer.class);
            theMsgOld = oldExchange.getIn().getBody(String.class);
        }

        theMessageCounter = theMessageCounter + 1;

        String thenewMsg = newExchange.getIn().getBody(String.class);

        // Append the new body to the accumulated string, comma separated
        String theMsgToReturn = theMsgOld;
        if (!"".equals(theMsgOld)) {
            theMsgToReturn += ",";
        }
        theMsgToReturn += thenewMsg;

        newExchange.getIn().setBody(theMsgToReturn);

        // Store the counter as an Integer, matching how it is read back above
        newExchange.getIn().setHeader("MessageCounter", theMessageCounter);

        return newExchange;
    }
}

The route builder:


package org.apache.camel.example.cxf;

import java.util.logging.Logger;
import javax.ws.rs.core.Response;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class CamelRoute extends RouteBuilder {

    private String uri = "cxfrs:/myData?resourceClasses=org.apache.camel.example.cxf.RsService&bindingStyle=SimpleConsumer";

    static Logger myLogger = Logger.getLogger(CamelRoute.class.getName());

    @Override
    public void configure() throws Exception {

        // Aggregation route: body() is the correlation expression,
        // the group completes when the counter header equals 3
        from("seda:tap")
            .aggregate(body(), new myAggregationStrategy())
            .completionPredicate(header("MessageCounter").in(3))
            .log(LoggingLevel.INFO, "the body ${body}");

        // REST route: send a copy of each message to the aggregation route
        from(uri)
            .wireTap("seda:tap")
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    Message inMessage = exchange.getIn();

                    // The parameter of the invocation is stored in the body of the in message
                    String theBody = inMessage.getBody(String.class);
                    myLogger.info("The Body:" + theBody);

                    Response r = Response.ok("This is the result\r\n" + theBody).status(200).build();

                    exchange.getOut().setBody(r);
                }
            });
    }
}

An additional route is created to handle the aggregation.
A copy of the original message is sent to that route through the SEDA endpoint (seda:tap).
The aggregator class joins the bodies of the original messages into a comma-delimited string and attaches a message header with a counter that counts the number of times the aggregator has run.
The route checks the counter header, and when it equals 3 the output of the aggregator is flushed to the log component.
Note that a new instance of the aggregator state is created after the flush.
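
To make the accumulate-and-flush behaviour easier to follow, here is a minimal plain-Java sketch of the same idea without any Camel classes. It is only a model of the logic described above, not Camel's implementation; the class name BatchAggregatorSketch and its methods are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the aggregator plus the completion check.
// It joins bodies with commas and flushes once the counter reaches batchSize.
public class BatchAggregatorSketch {

    private final int batchSize;
    private String joined = "";   // plays the role of the aggregated body
    private int counter = 0;      // plays the role of the MessageCounter header
    private final List<String> flushed = new ArrayList<>();

    public BatchAggregatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    public void add(String body) {
        // Same joining rule as the aggregation strategy: no leading comma
        joined = joined.isEmpty() ? body : joined + "," + body;
        counter++;

        // Same rule as the completion predicate: flush when the counter hits the limit
        if (counter == batchSize) {
            flushed.add(joined);
            // Start from fresh state, as happens after the flush
            joined = "";
            counter = 0;
        }
    }

    public List<String> getFlushed() {
        return flushed;
    }

    public String getPending() {
        return joined;
    }

    public static void main(String[] args) {
        BatchAggregatorSketch agg = new BatchAggregatorSketch(3);
        for (String body : new String[] {"Cat", "Dog", "Pig", "Cow"}) {
            agg.add(body);
        }
        System.out.println(agg.getFlushed()); // prints [Cat,Dog,Pig]
        System.out.println(agg.getPending()); // prints Cow
    }
}
```

The fourth message ("Cow") starts a new batch, which mirrors the fresh aggregator state after the flush.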

Test REST calls:
/camel-example-cxfrs-tomcat/webservices/myData/RsService/SayHello/Cat
/camel-example-cxfrs-tomcat/webservices/myData/RsService/SayHello/Dog
/camel-example-cxfrs-tomcat/webservices/myData/RsService/SayHello/Pig


And the result in the log:
2014-04-25 16:32:37,876 [#0 - seda://tap] INFO  route1                         - the body Cat,Dog,Pig


Note:
There is a timeout issue: the state of the aggregation component is reset if no message arrives within a certain time period.
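
As a rough illustration of what such a reset means for the accumulated string, here is a small plain-Java model. It is not Camel code and does not claim to show how Camel handles timeouts internally; the class IdleResetSketch, the 1000 ms timeout and the explicit timestamps are all assumptions made up for this example.

```java
// Hypothetical model: accumulated state is dropped when the gap between
// two messages is longer than timeoutMillis.
public class IdleResetSketch {

    private final long timeoutMillis;
    private String joined = "";
    private int counter = 0;
    private long lastArrival = -1; // -1 means no message has arrived yet

    public IdleResetSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // nowMillis is passed in explicitly so the example is deterministic
    public void add(String body, long nowMillis) {
        boolean idleTooLong = lastArrival >= 0 && nowMillis - lastArrival > timeoutMillis;
        if (idleTooLong) {
            // Previous partial batch is lost; start over
            joined = "";
            counter = 0;
        }
        joined = joined.isEmpty() ? body : joined + "," + body;
        counter++;
        lastArrival = nowMillis;
    }

    public String getPending() {
        return joined;
    }

    public int getCounter() {
        return counter;
    }

    public static void main(String[] args) {
        IdleResetSketch agg = new IdleResetSketch(1000);
        agg.add("Cat", 0);
        agg.add("Dog", 500);
        System.out.println(agg.getPending()); // prints Cat,Dog
        agg.add("Pig", 5000);                 // arrives 4500 ms after Dog
        System.out.println(agg.getPending()); // prints Pig
    }
}
```

In this model "Pig" arrives too late, so the counter never reaches 3 for the Cat,Dog batch and nothing would be logged for it.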
