In one of my projects, I needed to split the reducer's output across three different Cassandra column families (CFs), according to the score range calculated in the reducer.
The following code is an example of how we did it:
import java.io.IOException;import java.nio.ByteBuffer;import java.util.*;import org.apache.cassandra.db.IColumn;import org.apache.cassandra.thrift.*;import org.apache.cassandra.hadoop.*;import org.apache.cassandra.utils.ByteBufferUtil;import org.apache.hadoop.conf.*;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.*;import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;import org.apache.hadoop.util.*;public class ScoresBenchMark extends Configured implements Tool {public static void main(String[] args) throws Exception {ToolRunner.run(new Configuration(), new ScoresBenchMark(), args);System.exit(0);}public int run(String[] args) throws Exception {ConfigHelper.setRangeBatchSize(getConf(), 99);final Job job = new Job(getConf(), "ScoresBenchMark");final Configuration conf = job.getConfiguration();job.setJarByClass(ScoresBenchMark.class);job.setMapperClass(Map.class);//For testing set only one reducerjob.setNumReduceTasks(1);//Handle cassandra input output tablesConfigHelper.setInputRpcPort(conf, "9160");ConfigHelper.setInputInitialAddress(conf, "127.0.0.1"); //for test using the local host cassandraConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");ConfigHelper.setInputColumnFamily(conf, "TestDataKS", "RowDataCF");//get all recordsSlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.bytes(""), ByteBufferUtil.bytes(""), false, Integer.MAX_VALUE));ConfigHelper.setInputSlicePredicate(conf, predicate);ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");ConfigHelper.setOutputRpcPort(conf, "9160");ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");ConfigHelper.setOutputKeyspace(conf, "TestDataKS");//Set the 3 tests output CF by the calculation resultMultipleOutputs.addNamedOutput(job, "TestScoresCFLow", ColumnFamilyOutputFormat.class, ByteBuffer.class, List.class);MultipleOutputs.addNamedOutput(job, "TestScoresCFMid", 
ColumnFamilyOutputFormat.class, ByteBuffer.class, List.class);MultipleOutputs.addNamedOutput(job, "TestScoresCFHi", ColumnFamilyOutputFormat.class, ByteBuffer.class, List.class);//set up input as cassandra cfjob.setInputFormatClass(ColumnFamilyInputFormat.class);//Handle the output as cassandra cfjob.setReducerClass(Reduce.class);job.setOutputFormatClass(ColumnFamilyOutputFormat.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);job.setOutputKeyClass(ByteBuffer.class);job.setOutputValueClass(List.class);job.waitForCompletion(true);return 0;}public static class Map extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable> {public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException {//Loop over the coloumn data and map key -> valuefor (IColumn nextCol : columns.values()){context.write(new Text(ByteBufferUtil.string(nextCol.name())),new LongWritable(ByteBufferUtil.toLong(nextCol.value())));}}}public static class Reduce extends Reducer<Text, LongWritable, ByteBuffer, List<Mutation>> {private MultipleOutputs _output;public void setup(Context context) {_output = new MultipleOutputs(context);}public void cleanup(Context context) throws IOException, InterruptedException {_output.close();}public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {for ( ICalculator nextCalcolator in mCalculators ){float theScore = nextCalcolator.Calc (values) ;String theCalculatorName = nextCalcolator.Class.ToString ();Column c = new Column();c.setName(ByteBufferUtil.bytes(theCalculatorName));c.setValue(ByteBufferUtil.bytes(theScore);c.setTimestamp(System.currentTimeMillis());Mutation m = new Mutation();m.setColumn_or_supercolumn(new ColumnOrSuperColumn());m.column_or_supercolumn.setColumn(c);if ( theScore < 30 ){TestScoresCFMid_output.write("TestScoresCFLow", ByteBufferUtil.bytes(key.toString()), 
Collections.singletonList(m));}else if ( theScore > 74 ){TestScoresCFMid_output.write("TestScoresCFHi", ByteBufferUtil.bytes(key.toString()), Collections.singletonList(m));}else{TestScoresCFMid_output.write("TestScoresCFMid", ByteBufferUtil.bytes(key.toString()), Collections.singletonList(m));}}}}}
References:
https://gist.githubusercontent.com/rstrickland/3763728/raw/7bb0b6d9211bc99e58ca222161e8b31db799a2d5/mo_example.java
http://modular.math.washington.edu/home/wstein/www/home/was/tmp/apache-cassandra-1.1.3/javadoc/org/apache/cassandra/thrift/Mutation.html
http://ng911dev1.cs.columbia.edu/docs/cassandra/org/apache/cassandra/thrift/SlicePredicate.html
http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html
http://svn.apache.org/repos/asf/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
אין תגובות:
הוסף רשומת תגובה