Monday 2 February 2015

Extract an HBase table through MapReduce and write the table data into an HDFS file (sample program)

Create the HBase table


create 'test1', 'cf1'
put 'test1', '20130101#1', 'cf1:sales', '100'
put 'test1', '20130101#2', 'cf1:sales', '110'
put 'test1', '20130102#1', 'cf1:sales', '200'
put 'test1', '20130102#2', 'cf1:sales', '210'
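To sanity-check that the rows landed, you can run scan 'test1' in the HBase shell, or use a small client program. Below is a minimal sketch in Java (a hypothetical helper, separate from the three project files that follow) that scans test1 and prints cf1:sales for every row. It assumes the same pre-1.0 style client API used by the MapReduce code below; on newer HBase versions Connection/Table replace HTable.

package TableDataHdfsFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper class (not part of the original post) that prints every
// row key and its cf1:sales value, just to confirm the puts above worked.
public class testScanCheck {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        HTable table = new HTable(config, "test1");   // deprecated in HBase 1.x in favour of Connection/Table
        try {
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                String rowKey = Bytes.toString(result.getRow());
                String sales  = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales")));
                System.out.println(rowKey + " -> " + sales);
            }
            scanner.close();
        } finally {
            table.close();
        }
    }
}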

Create a Java project with the following three files and include all required HBase JARs on the classpath:

testDriver.java:

package TableDataHdfsFile;

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class testDriver
{
      static String FilePath = "/home/admin/Test/mysummary";

      public static void main(String[] args) throws Exception
      {
          // Clean up any old copy of the output directory on the local file system.
          File file = new File(FilePath);
          if (file.exists())
          {
              FileUtils.cleanDirectory(file); // clean out directory (optional, but good to know)
              FileUtils.forceDelete(file);    // delete directory
          }

          Configuration config = HBaseConfiguration.create();

          // Clean up any old copy of the output directory on HDFS, otherwise
          // FileOutputFormat will fail because the path already exists.
          FileSystem fs = FileSystem.get(config);
          if (fs.exists(new Path(FilePath)))
          {
              fs.delete(new Path(FilePath), true); // true = recursive delete
          }

          Job job = Job.getInstance(config);
          job.setJobName("ExampleSummaryToFile");
          job.setJarByClass(testDriver.class);     // class that contains mapper and reducer

          Scan scan = new Scan();
          scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
          scan.setCacheBlocks(false);  // don't set to true for MR jobs
          // set other scan attrs

          TableMapReduceUtil.initTableMapperJob(
                  "test1",        // input table
                  scan,               // Scan instance to control CF and attribute selection
                  testMapper.class,     // mapper class
                  Text.class,         // mapper output key
                  IntWritable.class,  // mapper output value
                  job);
          System.out.println(job.getJobName());
          job.setReducerClass(testReducer.class);    // reducer class
          job.setNumReduceTasks(1);    // at least one, adjust as required

          FileOutputFormat.setOutputPath(job, new Path(FilePath));  // adjust directories as required

          boolean b = job.waitForCompletion(true);
          if (!b) {
                  throw new IOException("error with job!");
          }
      }
}
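The driver writes its result to FilePath on HDFS; with a single reducer the result file is named part-r-00000. A rough sketch for reading it back from Java (again a hypothetical helper, not one of the three project files) is shown below; with the sample rows above, each distinct sales value occurs exactly once, so every output line should carry a count of 1.

package TableDataHdfsFile;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hypothetical helper (not part of the original post) that prints the job output
// stored on HDFS. With one reducer, the result file is named part-r-00000.
public class testOutputReader {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(config);
        Path resultFile = new Path("/home/admin/Test/mysummary/part-r-00000");

        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(resultFile)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                // TextOutputFormat writes "<sales value><TAB><count>", e.g. "100	1"
                System.out.println(line);
            }
        } finally {
            reader.close();
        }
    }
}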



testMapper.java:

package TableDataHdfsFile;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;


public class testMapper extends TableMapper<Text, IntWritable> {

    // Column family and qualifier holding the sales figure in table 'test1'
    public static final byte[] CF = Bytes.toBytes("cf1");
    public static final byte[] QUALIFIER = Bytes.toBytes("sales");

    private final IntWritable ONE = new IntWritable(1);
    private Text text = new Text();

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        // Emit (sales value, 1); we can only emit Writables
        String val = Bytes.toString(value.getValue(CF, QUALIFIER));
        text.set(val);
        context.write(text, ONE);
    }
}

testReducer.java:

package TableDataHdfsFile;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;



public class testReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the 1s emitted by the mapper: the number of rows per distinct sales value
            int i = 0;
            for (IntWritable val : values) {
                    i += val.get();
            }
            context.write(key, new IntWritable(i));
    }
}

// Create a JAR and run the basic program given above; the program can also be parameterized, for example as sketched below.
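One way to parameterize the driver is to take the table name and output path from the command line. The argument handling below is an assumption for illustration, not part of the original program; it reuses the same testMapper and testReducer classes.

package TableDataHdfsFile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical parameterized driver: usage "testParamDriver <table name> <output path>"
public class testParamDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: testParamDriver <table name> <output path>");
            System.exit(1);
        }
        String tableName = args[0];
        String outputPath = args[1];

        Configuration config = HBaseConfiguration.create();

        // Remove an old output directory on HDFS so FileOutputFormat does not fail
        FileSystem fs = FileSystem.get(config);
        if (fs.exists(new Path(outputPath))) {
            fs.delete(new Path(outputPath), true);
        }

        Job job = Job.getInstance(config);
        job.setJobName("ExampleSummaryToFile");
        job.setJarByClass(testParamDriver.class);

        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs

        TableMapReduceUtil.initTableMapperJob(tableName, scan, testMapper.class,
                Text.class, IntWritable.class, job);
        job.setReducerClass(testReducer.class);
        job.setNumReduceTasks(1);

        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        if (!job.waitForCompletion(true)) {
            throw new IOException("error with job!");
        }
    }
}

With this variant the same JAR can be pointed at any table whose rows carry a cf1:sales column, without recompiling.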

// Done