יום חמישי, 17 באפריל 2014

Splitting the reducer output across different Cassandra CFs

In one of my projects, I needed to split the reducer's output across 3 different Cassandra column families (CFs), according to the score range calculated in the reducer.
The following code is an example of how we did it:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.*;
public class ScoresBenchMark extends Configured implements Tool {
   
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new ScoresBenchMark(), args);
        System.exit(0);
    }
  
    public int run(String[] args) throws Exception {
  ConfigHelper.setRangeBatchSize(getConf(), 99);
    
        final Job job = new Job(getConf(), "ScoresBenchMark");
        final Configuration conf = job.getConfiguration();
   
        job.setJarByClass(ScoresBenchMark.class);
        
  job.setMapperClass(Map.class);
  
  //For testing set only one reducer 
        job.setNumReduceTasks(1);
     
        //Handle cassandra input output tables 
  ConfigHelper.setInputRpcPort(conf, "9160");
        
  ConfigHelper.setInputInitialAddress(conf, "127.0.0.1"); //for test using the local host cassandra 
  
        ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
        
  ConfigHelper.setInputColumnFamily(conf, "TestDataKS", "RowDataCF");
        
  //get all records
        SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.bytes(""), ByteBufferUtil.bytes(""), false, Integer.MAX_VALUE));
        
  ConfigHelper.setInputSlicePredicate(conf, predicate);
    
        ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
        
  ConfigHelper.setOutputRpcPort(conf, "9160");
        
  ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
        
  ConfigHelper.setOutputKeyspace(conf, "TestDataKS");
  //Set the 3 tests output CF by the calculation result 
  MultipleOutputs.addNamedOutput(job, "TestScoresCFLow", ColumnFamilyOutputFormat.class, ByteBuffer.class, List.class);
        
  MultipleOutputs.addNamedOutput(job, "TestScoresCFMid", ColumnFamilyOutputFormat.class, ByteBuffer.class, List.class);
  
  MultipleOutputs.addNamedOutput(job, "TestScoresCFHi", ColumnFamilyOutputFormat.class, ByteBuffer.class, List.class);
  
        //set up input as cassandra cf        
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        
  //Handle the output as cassandra cf 
        
        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);
    
        job.waitForCompletion(true);
        return 0;
    }
  
    public static class Map extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable> {  
    
        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException { 
      
   //Loop over the coloumn data and map key -> value 
            for (IColumn nextCol : columns.values()){
             context.write(new Text(ByteBufferUtil.string(nextCol.name())),
         new LongWritable(ByteBufferUtil.toLong(nextCol.value())));
   }
        }   
    }
  
    public static class Reduce extends Reducer<Text, LongWritable, ByteBuffer, List<Mutation>> {
 
        private MultipleOutputs _output;
        public void setup(Context context) {
            _output = new MultipleOutputs(context);
        }
        public void cleanup(Context context) throws IOException, InterruptedException {
            _output.close();
        }
    
        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
   for ( ICalculator nextCalcolator in mCalculators )
   {
    float theScore = nextCalcolator.Calc (values) ;
    String theCalculatorName = nextCalcolator.Class.ToString ();
    Column c = new Column();
            
    c.setName(ByteBufferUtil.bytes(theCalculatorName));
    c.setValue(ByteBufferUtil.bytes(theScore);
    c.setTimestamp(System.currentTimeMillis());
    Mutation m = new Mutation();
    m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
    m.column_or_supercolumn.setColumn(c);
    
    if ( theScore < 30 )
    {
     TestScoresCFMid_output.write("TestScoresCFLow", ByteBufferUtil.bytes(key.toString()), Collections.singletonList(m));
    } 
    else if ( theScore > 74 )
    {
     TestScoresCFMid_output.write("TestScoresCFHi", ByteBufferUtil.bytes(key.toString()), Collections.singletonList(m));
    }
    else 
    {
     TestScoresCFMid_output.write("TestScoresCFMid", ByteBufferUtil.bytes(key.toString()), Collections.singletonList(m));
    }
    
   }
        }
    }
}

References:
https://gist.githubusercontent.com/rstrickland/3763728/raw/7bb0b6d9211bc99e58ca222161e8b31db799a2d5/mo_example.java
http://modular.math.washington.edu/home/wstein/www/home/was/tmp/apache-cassandra-1.1.3/javadoc/org/apache/cassandra/thrift/Mutation.html
http://ng911dev1.cs.columbia.edu/docs/cassandra/org/apache/cassandra/thrift/SlicePredicate.html
http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html
http://svn.apache.org/repos/asf/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java

אין תגובות:

הוסף רשומת תגובה