יום שלישי, 20 במאי 2014

ZeroMQ Pub/Sub Simple Sample

The following is a simple ZeroMQ (0MQ) publish/subscribe sample over TCP:
The main:

        // A single context is created here and handed to both threads.
        ZMQ.Context zmqContext = ZMQ.context(1);

        // Publisher thread: started first.
        ZeroMqPubServer publisherThread = new ZeroMqPubServer(zmqContext);
        publisherThread.start();

        // Subscriber thread: started second, sharing the same context.
        ZeroMqSubClient subscriberThread = new ZeroMqSubClient(zmqContext);
        subscriberThread.start();
The server

import java.util.logging.Level;
import java.util.logging.Logger;
import org.zeromq.ZMQ;
/**
 * Background thread that publishes a numbered {@code "MyTopic <n> "} message
 * every 100 ms on a PUB socket bound to {@code tcp://*:5556}.
 *
 * <p>The thread runs until it is interrupted; the socket is closed on exit.
 */
public class ZeroMqPubServer  extends Thread{

    /** Context supplied by the caller; this class does not terminate it. */
    ZMQ.Context  mContext;

    /**
     * Creates the publisher thread.
     *
     * @param pContext context used to create the PUB socket in {@link #run()}
     */
    public ZeroMqPubServer(ZMQ.Context pContext) {
        this.mContext = pContext;
    }

    /**
     * Binds the PUB socket and sends one message per 100 ms until the thread
     * is interrupted.
     */
    @Override
    public void run()
    {
        ZMQ.Socket publisher = mContext.socket(ZMQ.PUB);
        try {
            publisher.bind("tcp://*:5556");
            int theCounter = 0;
            while (!Thread.currentThread().isInterrupted()) {
                theCounter++;
                //  Send message to all subscribers; the "MyTopic" prefix is
                //  the text subscribers filter on.
                String update = String.format("MyTopic %d ", theCounter);
                publisher.send(update, 0);

                try {
                    Thread.sleep(100);
                } catch (InterruptedException ex) {
                    Logger.getLogger(ZeroMqPubServer.class.getName()).log(Level.SEVERE, null, ex);
                    // sleep() clears the interrupt flag when it throws; set it
                    // again so the loop condition sees it and the thread stops.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            // Release the socket on every exit path (interrupt or exception).
            publisher.close();
        }
    }
}

The client


import java.nio.charset.Charset;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.zeromq.ZMQ;
/**
 * Background thread that connects a SUB socket to {@code tcp://localhost:5556},
 * subscribes to the {@code "MyTopic"} prefix and prints every message received.
 *
 * <p>The thread runs until it is interrupted; the socket is closed on exit.
 */
public class ZeroMqSubClient  extends Thread{

    /** Context supplied by the caller; this class does not terminate it. */
    ZMQ.Context  mContext;

    /**
     * Creates the subscriber thread.
     *
     * @param pContext context used to create the SUB socket in {@link #run()}
     */
    public ZeroMqSubClient(ZMQ.Context pContext) {
        this.mContext = pContext;
    }

    /**
     * Waits three seconds, then connects, subscribes and prints received
     * messages until the thread is interrupted.
     */
    @Override
    public void run()
    {
        try {
            // NOTE(review): presumably gives the publisher time to start;
            // messages published during this delay are not received.
            Thread.sleep(3000);
        } catch (InterruptedException ex) {
            Logger.getLogger(ZeroMqSubClient.class.getName()).log(Level.SEVERE, null, ex);
            // Keep the interrupt visible to whoever inspects this thread.
            Thread.currentThread().interrupt();
            return;
        }

        ZMQ.Socket subscriber = mContext.socket(ZMQ.SUB);
        try {
            subscriber.connect("tcp://localhost:5556");

            //  Only messages whose content starts with this prefix are delivered.
            String filter = "MyTopic";
            subscriber.subscribe(filter.getBytes());

            while (!Thread.currentThread().isInterrupted()) {
                //  Blocks until the next message arrives.
                String theStr = subscriber.recvStr(Charset.defaultCharset());

                System.out.println("Recieved:" + theStr);
            }
        } finally {
            // Release the socket on every exit path, including exceptions
            // thrown by connect/subscribe/recvStr.
            subscriber.close();
        }
    }
}

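The result (the subscriber waits 3 seconds before connecting while the publisher sends a message every 100 ms, so roughly the first 30 messages are published before the subscriber exists and are never received):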
debug:
Start
Recieved:MyTopic 31
Recieved:MyTopic 32
Recieved:MyTopic 33
Recieved:MyTopic 34
Recieved:MyTopic 35
Recieved:MyTopic 36
Recieved:MyTopic 37
Recieved:MyTopic 38
Recieved:MyTopic 39
Recieved:MyTopic 40
Recieved:MyTopic 41
Recieved:MyTopic 42
Recieved:MyTopic 43
Recieved:MyTopic 44
Recieved:MyTopic 45
Recieved:MyTopic 46
Recieved:MyTopic 47
Recieved:MyTopic 48
Recieved:MyTopic 49
Recieved:MyTopic 50
Recieved:MyTopic 51
Recieved:MyTopic 52
Recieved:MyTopic 53
Recieved:MyTopic 54
Recieved:MyTopic 55

אין תגובות:

הוסף רשומת תגובה