Saturday, 25 November 2017

Finding the MISSING and INVALID records in a given data set using Hadoop counters


import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;




public class CountCounters
{
 
  public static class CountMapper extends Mapper 
  {
  static enum SalesCounters { MISSING,INVALID };
   public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
   {

   String fields[] = value.toString().split(",");
   String country = fields[2];
   String sales = fields[3];

    if(country.length()==0)
    {
     context.getCounter(SalesCounters.MISSING).increment(1);
    }
    else if(!(Character.isDigit(sales.charAt(0))))
    {
       context.getCounter(SalesCounters.INVALID).increment(1);
    }
    else
    {
    context.write(new Text(country),new Text(sales));
    }
   }
  }
   public static class CountReduce extends Reducer 
  {
     
       public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException
        {
         for(Text details:values)
         {
          context.write(NullWritable.get(),details);
          }
       }
    }
  
  
  public static void main(String args[]) throws Exception
  {
     Path input = new Path(args[0]);
     Path output = new Path(args[1]);

     Configuration conf=new Configuration();
  Job job =new Job(conf);
  job.setJobName("CountCounters");
  job.setJar("DataCounter4.jar");
  job.setJarByClass(CountCounters.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setMapperClass(CountMapper.class);
  job.setReducerClass(CountReduce.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setNumReduceTasks(1);
  FileInputFormat.setInputPaths(job,input);
  FileOutputFormat.setOutputPath(job,output);
  
  System.exit(job.waitForCompletion(true)?0:1);

  Counters counter = job.getCounters();
System.out.printf("MISSIng %d , invalid %d",counter.findCounter(CountMapper.SalesCounters.MISSING).getValue(),counter.findCounter(CountMapper.SalesCounters.INVALID).getValue());
 
  }
}

Output :

hadoop jar DataCounter4.jar CountCounters /Sales1/SalesData.txt /New11

hadoop fs -cat /New11/part-r-00000
17/09/06 14:20:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
3Cr
2Cr


0 comments:

Post a Comment