ActiveMQ supports the JMS MapMessage message type, which is essentially a wrapper around a HashMap-like collection of name/value pairs.
Example of setting the message values:
package com.mycompany.client;

import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Publishes a single {@link MapMessage} to a topic that is resolved through JNDI.
 *
 * <p>The JNDI environment is built in code: it registers a connection factory under the
 * name {@code QueueConnectionFactory} (pointing at {@code connectionString}) and the topic
 * under its own name.
 */
public class MessageProducer {

    private static final Logger LOGGER = Logger.getLogger(MessageProducer.class.getName());

    private final String topicName = "myTopic.Programming";

    private final String initialContextFactory =
            "org.apache.activemq" + ".jndi.ActiveMQInitialContextFactory";

    private final String connectionString = "tcp://localhost:61616";

    /**
     * Looks up the connection factory and topic, then publishes one map message containing
     * the entries {@code Name} (String) and {@code Age} (double).
     *
     * <p>A {@link NamingException} is logged and swallowed. The connection is always closed,
     * including when publishing fails.
     *
     * @throws RuntimeException wrapping the {@link JMSException} if creating the connection,
     *         session, publisher or message, or publishing the message, fails
     */
    public void publishWithTopicLookup() {
        TopicConnection topicConnection = null;
        try {
            Properties properties = new Properties();
            properties.put("java.naming.factory.initial", initialContextFactory);
            properties.put("connectionfactory.QueueConnectionFactory", connectionString);
            properties.put("topic." + topicName, topicName);

            // Initialize the required connection factory from the in-memory JNDI environment.
            InitialContext ctx = new InitialContext(properties);
            TopicConnectionFactory topicConnectionFactory =
                    (TopicConnectionFactory) ctx.lookup("QueueConnectionFactory");
            topicConnection = topicConnectionFactory.createTopicConnection();

            TopicSession topicSession =
                    topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = (Topic) ctx.lookup(topicName);

            TopicPublisher topicPublisher = topicSession.createPublisher(topic);

            MapMessage message = topicSession.createMapMessage();
            message.setString("Name", "Zvika");
            message.setDouble("Age", 42.6);

            topicPublisher.publish(message);
            System.out.println("Publishing message ");

            topicPublisher.close();
            topicSession.close();
        } catch (NamingException ex) {
            LOGGER.log(Level.SEVERE, null, ex);
        } catch (JMSException e) {
            throw new RuntimeException("Error publishing MapMessage to topic " + topicName, e);
        } finally {
            // Always release the connection, even when a lookup or the publish failed.
            closeQuietly(topicConnection);
        }
    }

    /** Closes the connection if it was created; a failure to close is logged, not thrown. */
    private static void closeQuietly(TopicConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            // Logged rather than rethrown so it cannot mask an exception from the try block.
            LOGGER.log(Level.WARNING, "Failed to close topic connection", e);
        }
    }
}
Example of consuming a MapMessage:
package com.mycompany.client;

import java.util.Enumeration;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Subscribes to a topic that is resolved through JNDI and prints every entry of each
 * {@link MapMessage} it receives.
 */
public class MessageConsumer {

    private static final Logger LOGGER = Logger.getLogger(MessageConsumer.class.getName());

    /** Prints the timestamp, JMS message id and all map entries of each received message. */
    private static class TestMessageListener implements MessageListener {
        public void onMessage(Message message) {
            try {
                System.out.println("Got the Message TimeStamp: " + message.getJMSTimestamp());
                System.out.println("Got the Message JMS ID : " + message.getJMSMessageID());

                if (!(message instanceof MapMessage)) {
                    // Only map messages carry name/value entries; skip anything else
                    // instead of failing the listener with a ClassCastException.
                    System.out.println("Ignoring message that is not a MapMessage");
                    return;
                }
                MapMessage theMsg = (MapMessage) message;

                Enumeration<?> mapNames = theMsg.getMapNames();
                while (mapNames.hasMoreElements()) {
                    String nextKey = mapNames.nextElement().toString();
                    // Map entries live in the message body and are read with getObject();
                    // getObjectProperty() reads header properties and is null for these keys.
                    Object value = theMsg.getObject(nextKey);
                    System.out.println("The key:" + nextKey + " the value:" + value);
                }
            } catch (JMSException e) {
                LOGGER.log(Level.SEVERE, "Failed to read received message", e);
            }
        }
    }

    private final String topicName = "myTopic.Programming";

    private final String initialContextFactory =
            "org.apache.activemq" + ".jndi.ActiveMQInitialContextFactory";

    private final String connectionString = "tcp://localhost:61616";

    /**
     * Looks up the connection factory and topic, registers a {@link TestMessageListener} on
     * a new subscriber and starts the connection.
     *
     * <p>On success the connection is left open so the listener keeps receiving messages.
     * If setup fails after the connection was created, the connection is closed. A
     * {@link NamingException} is logged and swallowed.
     *
     * @throws RuntimeException wrapping the {@link JMSException} if creating the connection,
     *         session or subscriber, or starting the connection, fails
     */
    public void ListenWithTopicLookup() {
        TopicConnection topicConnection = null;
        boolean started = false;
        try {
            Properties properties = new Properties();
            properties.put("java.naming.factory.initial", initialContextFactory);
            properties.put("connectionfactory.QueueConnectionFactory", connectionString);
            properties.put("topic." + topicName, topicName);

            // Initialize the required connection factory from the in-memory JNDI environment.
            InitialContext ctx = new InitialContext(properties);
            TopicConnectionFactory topicConnectionFactory =
                    (TopicConnectionFactory) ctx.lookup("QueueConnectionFactory");
            topicConnection = topicConnectionFactory.createTopicConnection();

            TopicSession topicSession =
                    topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = (Topic) ctx.lookup(topicName);

            TopicSubscriber theTopicSubscriber = topicSession.createSubscriber(topic);
            theTopicSubscriber.setMessageListener(new TestMessageListener());

            topicConnection.start();
            started = true;
        } catch (NamingException ex) {
            LOGGER.log(Level.SEVERE, null, ex);
        } catch (JMSException e) {
            throw new RuntimeException("Error subscribing to topic " + topicName, e);
        } finally {
            // Keep the connection only when the subscription is actually running.
            if (!started) {
                closeQuietly(topicConnection);
            }
        }
    }

    /** Closes the connection if it was created; a failure to close is logged, not thrown. */
    private static void closeQuietly(TopicConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            // Logged rather than rethrown so it cannot mask an exception from the try block.
            LOGGER.log(Level.WARNING, "Failed to close topic connection", e);
        }
    }
}
אין תגובות:
הוסף רשומת תגובה