Sunday, June 2, 2013

ActiveMQ simple listener

I wrote a simple listener for the publisher from the previous ActiveMQ post: activemq-topic-sender

The MessageListener callback class receives a notification whenever a message arrives on the topic.

    private class TextMessageListener implements MessageListener {
        // Called by the JMS provider for every message delivered to the subscriber.
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println("Got the Message  TimeStamp: "
                        + message.getJMSTimestamp());
                System.out.println("Got the Message JMS ID : "
                        + message.getJMSMessageID());

                // Only a TextMessage has a text body, so check before casting.
                if (message instanceof TextMessage) {
                    TextMessage theMsg = (TextMessage) message;
                    System.out.println("The message:" + theMsg.getText());
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
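
The listener prints getJMSTimestamp() as a raw number, because JMS reports the send time in milliseconds since the Unix epoch. As a minimal sketch (the class and method names are hypothetical and not part of the listener above), the value could be turned into a readable UTC instant with java.time. The sample value is the timestamp from the output shown later in this post.

```java
import java.time.Instant;

// Hypothetical helper for illustration; not part of the original listener.
public class JmsTimestampFormatter {

    // Converts a JMS timestamp (milliseconds since the Unix epoch) to ISO-8601 UTC text.
    public static String toIsoUtc(long jmsTimestampMillis) {
        return Instant.ofEpochMilli(jmsTimestampMillis).toString();
    }

    public static void main(String[] args) {
        // Timestamp copied from the run output in this post.
        System.out.println(toIsoUtc(1370199196906L)); // prints 2013-06-02T18:53:16.906Z
    }
}
```

Inside onMessage the same call would take message.getJMSTimestamp() as its argument.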

The message consumer class:
Note: the connection must be started in order to receive notifications. Until start() is called, no messages are delivered to the listener.


    import java.util.Properties;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.jms.TopicSubscriber;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    public class MessageConsumer {

        private String topicName = "myTopic.Programming";
        private String initialContextFactory = "org.apache.activemq"
                + ".jndi.ActiveMQInitialContextFactory";
        private String connectionString = "tcp://localhost:61616";

        // The private TextMessageListener class shown above is nested here.

        public void ListenWithTopicLookup() {
            try {
                Properties properties = new Properties();
                properties.put(Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
                // ActiveMQ's JNDI context takes the broker URL from java.naming.provider.url
                properties.put(Context.PROVIDER_URL, connectionString);
                // Register the topic in JNDI as topic.[jndiName] = [physicalName]
                properties.put("topic." + topicName, topicName);

                // Initialize the required connection factory;
                // "ConnectionFactory" is the default name ActiveMQ registers in JNDI
                InitialContext ctx = new InitialContext(properties);
                TopicConnectionFactory topicConnectionFactory =
                        (TopicConnectionFactory) ctx.lookup("ConnectionFactory");
                TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();

                TopicSession topicSession = topicConnection.createTopicSession(
                        false, Session.AUTO_ACKNOWLEDGE);
                Topic topic = (Topic) ctx.lookup(topicName);

                TopicSubscriber theTopicSubscriber = topicSession.createSubscriber(topic);
                theTopicSubscriber.setMessageListener(new TextMessageListener());

                // Delivery to the listener begins only after start()
                topicConnection.start();
            } catch (NamingException ex) {
                Logger.getLogger(MessageConsumer.class.getName()).log(Level.SEVERE, null, ex);
            } catch (JMSException e) {
                throw new RuntimeException("Error while subscribing to the topic", e);
            }
        }
    }

I changed the main class to start two message consumers before publishing:


    public static void main(String[] args) {
        System.out.println("Hello World!");

        // Start both consumers first so they are subscribed before the message is published
        MessageConsumer theMessageConsumer = new MessageConsumer();
        theMessageConsumer.ListenWithTopicLookup();

        MessageConsumer otherMessageConsumer = new MessageConsumer();
        otherMessageConsumer.ListenWithTopicLookup();

        MessageProducer publisher = new MessageProducer();
        publisher.publishWithTopicLookup();

        try {
            // Give the listeners time to receive the message
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            Logger.getLogger(App.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
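
Thread.sleep(1000) only guesses how long delivery will take. One alternative, sketched here in plain Java with no JMS dependency, is to have each listener count down a java.util.concurrent.CountDownLatch and let main wait on it with a timeout. The class name, the threads standing in for the two listeners, and the 5-second timeout are all assumptions for illustration; in the real program the countDown() call would go at the end of onMessage.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: plain threads stand in for the two JMS listeners.
public class LatchWaitSketch {

    // Starts listenerCount simulated listeners and waits until all of them have
    // reported, or until the timeout expires. Returns true if all reported in time.
    public static boolean awaitListeners(int listenerCount, long timeoutSeconds) {
        CountDownLatch received = new CountDownLatch(listenerCount);
        for (int i = 0; i < listenerCount; i++) {
            final int id = i + 1;
            Thread listener = new Thread(() -> {
                // In the real program this would be the body of onMessage(...).
                System.out.println("Listener " + id + " got the message");
                received.countDown();
            });
            listener.start();
        }
        try {
            return received.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the wait did not complete.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean allReceived = awaitListeners(2, 5);
        System.out.println("All listeners notified: " + allReceived);
    }
}
```

The wait ends as soon as both listeners have reported, and the timeout keeps main from hanging if a message never arrives.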

The result:
[exec:exec]
Hello World!
log4j:WARN No appenders could be found for logger (org.apache.activemq.thread.TaskRunnerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Publishing message ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:Zvika-PC-2639-1370199196674-5:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://myTopic.Programming, transactionId = null, expiration = 0, timestamp = 1370199196906, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = This is a test message}
Got the Message  TimeStamp: 1370199196906
Got the Message JMS ID : ID:Zvika-PC-2639-1370199196674-5:1:1:1:1
The message:This is a test message
Got the Message  TimeStamp: 1370199196906
Got the Message JMS ID : ID:Zvika-PC-2639-1370199196674-5:1:1:1:1
The message:This is a test message

Each listener was notified of the message that was sent.
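
This is the publish/subscribe behavior of a JMS topic: every active subscriber gets its own copy of each message, whereas a queue would hand each message to only one consumer. A toy in-memory model of the topic side, with hypothetical class and method names and no ActiveMQ involved, might look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of topic fan-out for illustration; this is not ActiveMQ code.
public class TopicFanOutSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    public void subscribe(Consumer<String> listener) {
        subscribers.add(listener);
    }

    // Delivers the same text to every registered subscriber.
    public void publish(String text) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(text);
        }
    }

    // Publishes one message to subscriberCount subscribers and returns what each received.
    public static List<String> demo(int subscriberCount, String text) {
        TopicFanOutSketch topic = new TopicFanOutSketch();
        List<String> received = new ArrayList<>();
        for (int i = 0; i < subscriberCount; i++) {
            topic.subscribe(received::add);
        }
        topic.publish(text);
        return received;
    }

    public static void main(String[] args) {
        // Two subscribers, one published message: each subscriber records a copy.
        System.out.println(demo(2, "This is a test message"));
    }
}
```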
