The following is a simple example of using the Disruptor to hand work off to another thread. The Maven pom.xml for the project:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>myLearn</groupId>
  <artifactId>Learn-disruptor</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>disruptor-example</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>com.googlecode.disruptor</groupId>
      <artifactId>disruptor</artifactId>
      <version>2.10.4</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
Declare the command that we want the Disruptor to execute:
The ICommand
package myLearn;

public interface ICommand {
    public void DoWork();
    public String getValue();
    public void setValue(String pValue);
}
The concrete command
package myLearn;

public class DoThisAnThat implements ICommand {
    private String mValue;

    public String getValue() {
        return mValue;
    }

    public void setValue(String pValue) {
        this.mValue = pValue;
    }

    public void DoWork() {
        System.out.println(String.format("Call command for %s", mValue));
    }
}
The command factory
package myLearn;

import com.lmax.disruptor.EventFactory;

public class DoThisAnThatFactory implements EventFactory<ICommand> {
    // Called by the Disruptor to fill each ring buffer slot up front
    public ICommand newInstance() {
        return new DoThisAnThat();
    }
}
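The factory exists because the ring buffer allocates all of its event objects once, at construction time, and then reuses them as the sequence wraps around. The following is a minimal sketch of that idea using only the JDK (Java 8 or later). `PreallocatedRing` is a hypothetical stand-in written for illustration, not the Disruptor's own `RingBuffer`, and `Supplier` plays the role of `EventFactory`.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a preallocated ring; not the Disruptor's RingBuffer. */
final class PreallocatedRing<T> {
    private final Object[] slots;
    private final int mask;

    PreallocatedRing(Supplier<T> factory, int size) {
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new Object[size];
        mask = size - 1;
        // The factory runs exactly once per slot, before any event is published.
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get();
        }
    }

    /** Maps an ever-increasing sequence number onto a slot index. */
    int indexOf(long sequence) {
        return (int) (sequence & mask);
    }

    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) slots[indexOf(sequence)];
    }

    public static void main(String[] args) {
        PreallocatedRing<StringBuilder> ring =
                new PreallocatedRing<>(StringBuilder::new, 4);
        for (long seq = 0; seq < 10; seq++) {
            System.out.println("sequence " + seq + " -> slot " + ring.indexOf(seq));
        }
        // Sequences 0, 4 and 8 all resolve to the same preallocated object.
        System.out.println(ring.get(0) == ring.get(4));
    }
}
```

With a ring of 4 slots and 10 published events, every slot is reused at least once, which is why the command object must be overwritten with `setValue` on each claim rather than created fresh.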
The program itself
package myLearn;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class PlayWithdisruptor {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();

        // Preallocate the RingBuffer with 4 ICommand instances
        Disruptor<ICommand> disruptor = new Disruptor<ICommand>(new DoThisAnThatFactory(), 4, exec);

        final EventHandler<ICommand> handler = new EventHandler<ICommand>() {
            // The event will eventually be recycled by the Disruptor after the ring wraps
            public void onEvent(final ICommand pCommand, final long sequence, final boolean endOfBatch) throws Exception {
                System.out.println(String.format("the sequence %s theendOfBatch %s ", sequence, Boolean.toString(endOfBatch)));
                pCommand.DoWork();
            }
        };
        disruptor.handleEventsWith(handler);

        RingBuffer<ICommand> ringBuffer = disruptor.start();

        for (long i = 10; i < 20; i++) {
            // Two-phase commit: claim one of the 4 slots, fill it, then publish it
            long seq = ringBuffer.next();
            ICommand theCommand = ringBuffer.get(seq);
            String theName = String.format("Hello Zvika (%d)", i);
            theCommand.setValue(theName);
            ringBuffer.publish(seq);
        }

        System.out.println("After submiting the work");
        disruptor.shutdown();
        exec.shutdown();
    }
}
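The claim and publish steps in the loop can be pictured with a much simpler model. The sketch below is an assumption-laden illustration using only the JDK, with one producer and one consumer and busy-waiting on two counters. It is not how the Disruptor is implemented internally, and the names `ClaimPublishSketch`, `published` and `consumed` are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified model of claim / write / publish; not the Disruptor's implementation. */
final class ClaimPublishSketch {

    static List<String> run(int events) {
        final int size = 4;                              // ring size, as in the example
        final String[] slots = new String[size];
        final AtomicLong published = new AtomicLong(-1); // last sequence visible to the consumer
        final AtomicLong consumed = new AtomicLong(-1);  // last sequence the consumer has finished
        final List<String> seen = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            for (long next = 0; next < events; next++) {
                while (published.get() < next) {
                    Thread.yield();                      // nothing new published yet
                }
                seen.add(slots[(int) (next & (size - 1))]);
                consumed.set(next);                      // the slot may now be reused
            }
        });
        consumer.start();

        for (long seq = 0; seq < events; seq++) {
            // Phase 1: claim. Wait until the previous occupant of this slot was consumed.
            while (seq - consumed.get() > size) {
                Thread.yield();
            }
            slots[(int) (seq & (size - 1))] = String.format("Hello Zvika (%d)", seq + 10);
            // Phase 2: publish. Only now may the consumer read the slot.
            published.set(seq);
        }

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        run(10).forEach(System.out::println);
    }
}
```

Separating the claim from the publish means the consumer never sees a half-written slot, and the wait in phase 1 keeps the producer from overwriting a slot that has not been handled yet.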
The Result
the sequence 0 theendOfBatch true
Call command for Hello Zvika (10)
the sequence 1 theendOfBatch false
Call command for Hello Zvika (11)
the sequence 2 theendOfBatch false
Call command for Hello Zvika (12)
the sequence 3 theendOfBatch true
Call command for Hello Zvika (13)
the sequence 4 theendOfBatch true
Call command for Hello Zvika (14)
the sequence 5 theendOfBatch true
Call command for Hello Zvika (15)
the sequence 6 theendOfBatch false
Call command for Hello Zvika (16)
the sequence 7 theendOfBatch false
Call command for Hello Zvika (17)
the sequence 8 theendOfBatch true
Call command for Hello Zvika (18)
After submiting the work
the sequence 9 theendOfBatch true
Call command for Hello Zvika (19)
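The producer loop and the handler run on different threads, so the exact interleaving can differ from run to run. That is why "After submiting the work" can appear before the last event is handled. The endOfBatch flag follows from the same timing: the handler thread processes every sequence published so far and marks only the last one of that batch as true. The sketch below replays that loop with plain Java. The cursor snapshots are hypothetical values chosen to reproduce the listing above, not something measured from the run.

```java
import java.util.ArrayList;
import java.util.List;

/** Replays a batching consumer loop; an illustration, not Disruptor code. */
final class EndOfBatchSketch {

    /**
     * @param observedCursors highest published sequence the consumer saw
     *                        each time it checked (hypothetical values)
     */
    static List<String> replay(long[] observedCursors) {
        List<String> lines = new ArrayList<>();
        long next = 0;
        for (long available : observedCursors) {
            // Drain everything published so far in one batch.
            while (next <= available) {
                boolean endOfBatch = (next == available);
                lines.add(String.format("the sequence %s theendOfBatch %s", next, endOfBatch));
                next++;
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        long[] observed = {0, 3, 4, 5, 8, 9};
        replay(observed).forEach(System.out::println);
    }
}
```

With these snapshots the batches are {0}, {1, 2, 3}, {4}, {5}, {6, 7, 8} and {9}, which gives the same true/false pattern as the result shown.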