Create HBase Table
create 'test1', 'cf1'
put 'test1', '20130101#1', 'cf1:sales', '100'
put 'test1', '20130101#2', 'cf1:sales', '110'
put 'test1', '20130102#1', 'cf1:sales', '200'
put 'test1', '20130102#2', 'cf1:sales', '210'
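To verify the inserted rows, you can scan the table from the HBase shell:
scan 'test1'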
Create a Java project with 3 files and include all required HBase JARs:-
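To make the HBase JARs available when running on a cluster, one common approach (a sketch, assuming a standard HBase installation) is the hbase classpath helper:
export HADOOP_CLASSPATH=$(hbase classpath)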
testDriver.java :
package TableDataHdfsFile;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class testDriver
{
static String FilePath = "/home/admin/Test/"; // used both as a local directory and as the HDFS output path
public static void main(String[] args) throws Exception
{
Configuration config = HBaseConfiguration.create(); // create the config before it is used below
File file = new File(FilePath);
if (file.exists())
{
FileUtils.cleanDirectory(file);
FileUtils.forceDelete(file); // delete local directory
}
FileSystem fs = FileSystem.get(config);
if (fs.exists(new Path(FilePath)))
{
fs.delete(new Path(FilePath), true); // delete HDFS path, true for recursive
}
Job job = Job.getInstance(config);
job.setJobName("
job.setJarByClass(testDriver.
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
"test1", // input table
scan, // Scan instance to control CF and attribute selection
testMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
System.out.println(job.getJobName());
job.setReducerClass(testReducer.class);
job.setNumReduceTasks(1); // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path(FilePath)); // reducer output goes to FilePath on HDFS
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
}
}
testMapper.java :-
package TableDataHdfsFile;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class testMapper extends TableMapper<Text, IntWritable> {
public static final byte[] CF = Bytes.toBytes("cf1"); // column family created above
private final IntWritable ONE = new IntWritable(1);
private Text text = new Text();
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
// byte[] bSales = value.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
String val = new String(value.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales")));
text.set(val); // we can only emit Writables...
context.write(text, ONE);
}
}
testReducer.java :-
package TableDataHdfsFile;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class testReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int i = 0;
for (IntWritable val : values) {
i += val.get();
}
context.write(key, new IntWritable(i));
}
}
// Create a JAR and run the basic program given above; a run sketch follows. The program can also be parametrized, as sketched below.
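A minimal run sketch (the JAR name test.jar is illustrative; the output path is the FilePath set in the driver):
hadoop jar test.jar TableDataHdfsFile.testDriver
hdfs dfs -cat /home/admin/Test/part-r-00000
Given the four rows inserted above, each distinct sales value appears exactly once, so the output should look like:
100	1
110	1
200	1
210	1
As a sketch of the parametrized variant (the argument handling is illustrative, not part of the original program), main() could take the table name and output path from the command line:
public static void main(String[] args) throws Exception
{
    if (args.length < 2) {
        System.err.println("Usage: testDriver <table> <output path>");
        System.exit(2);
    }
    String tableName = args[0]; // e.g. "test1"
    String outputPath = args[1]; // e.g. "/home/admin/Test/"
    Configuration config = HBaseConfiguration.create();
    Job job = Job.getInstance(config);
    job.setJobName("TableDataHdfsFile");
    job.setJarByClass(testDriver.class);
    Scan scan = new Scan();
    scan.setCaching(500);
    scan.setCacheBlocks(false);
    TableMapReduceUtil.initTableMapperJob(tableName, scan, testMapper.class, Text.class, IntWritable.class, job);
    job.setReducerClass(testReducer.class);
    job.setNumReduceTasks(1);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}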
// Done