Saturday, 25 November 2017

JOIN TWO TABLES AND DISPLAY OUTPUT

There are two types of joins in MapReduce:
Joining during the Map phase (map-side join).
Joining during the Reduce phase (reduce-side join).
The program below implements a reduce-side join; a minimal map-side join sketch is shown right after this list.
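
For reference, a minimal map-side join mapper could look like the sketch below. It is only an illustration and is not part of the reduce-side example that follows: the configuration key "small.table.path" and the assumption that both tables keep the join key in the first comma-separated column are made up for this sketch.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    private final Map<String, String> smallTable = new HashMap<String, String>();
    private final Text keyEmit = new Text();
    private final Text valEmit = new Text();

    public void setup(Context context) throws IOException, InterruptedException {
        // Load the whole small table into memory once per map task.
        // "small.table.path" is an assumed configuration key, set by the driver.
        String path = context.getConfiguration().get("small.table.path");
        FileSystem fs = FileSystem.get(context.getConfiguration());
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(path))));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] parts = line.split(",");
            smallTable.put(parts[0], parts[1]);
        }
        reader.close();
    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Join each record of the large table against the in-memory small table.
        String[] parts = value.toString().split(",");
        String matched = smallTable.get(parts[0]);
        if (matched != null) {
            keyEmit.set(parts[0]);
            valEmit.set(matched + " " + parts[1]);
            context.write(keyEmit, valEmit);
        }
    }
}

Because the join happens entirely in the mapper, a map-side join needs no reducer; the rest of this example uses the reduce-side approach.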


Reduce-side join MapReduce program
We need to create 4 files:
1 DriverJoin.java
2 MapperJoin.java
3 MapperJoin2.java
4 ReducerJoin.java



1 DriverJoin.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class DriverJoin extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        

        if (args.length != 3) {
            System.err
                    .println("Usage: DriverJoin <input path 1> <input path 2> <output path>");
            System.exit(-1);
        }
        int res = ToolRunner.run(new Configuration(), new DriverJoin(), args);
        System.exit(res);

    }

    
    public int run(String[] args) throws Exception {
        
        String source1 = args[0];
        String source2 = args[1];
        String dest = args[2];
        Configuration conf = new Configuration();
        conf.set("mapreduce.output.textoutputformat.separator", "\t"); 
        FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "Multiple Jobs");

        job.setJarByClass(DriverJoin.class);
 job.setJar("DeptJoin.jar");
        Path p1 = new Path(source1);
        Path p2 = new Path(source2);
        Path out = new Path(dest);
        MultipleInputs.addInputPath(job, p1, TextInputFormat.class,
                MapperJoin.class);
        MultipleInputs.addInputPath(job, p2, TextInputFormat.class,
                MapperJoin2.class);
        job.setReducerClass(ReducerJoin.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        /*
         * delete if exist
         */
        if (fs.exists(out))
            fs.delete(out, true);

        TextOutputFormat.setOutputPath(job, out);
        boolean success = job.waitForCompletion(true);

        return success ? 0 : 1;
    }

}
2 MapperJoin.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class MapperJoin extends Mapper<LongWritable, Text, Text, Text> {
    Text keyEmit = new Text();
    Text valEmit = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is expected to be comma-separated, with the join key in the first field.
        String line = value.toString();
        String parts[] = line.split(",");
        keyEmit.set(parts[0]);
        valEmit.set(parts[1]);
        context.write(keyEmit, valEmit);
    }
}
3 MapperJoin2.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class MapperJoin2 extends Mapper<LongWritable, Text, Text, Text> {
    Text keyEmit = new Text();
    Text valEmit = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String parts[] = line.split(",");
        keyEmit.set(parts[0]);
        valEmit.set(parts[1]);
        context.write(keyEmit, valEmit);
    }
}
4 ReducerJoin.java

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class ReducerJoin extends Reducer<Text, Text, Text, Text> {

    Text valEmit = new Text();
    String merge = "";

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String character = "";
        String number = "";
        for (Text value : values) {
            // decide which input the value came from: numeric values are from one table, names from the other
            String val = value.toString();
            char myChar = val.charAt(0);

            if (Character.isDigit(myChar)) {
                number = val;
            } else {
                character = val;
            }
        }
        merge = character + " " + number;
        valEmit.set(merge);
        context.write(key, valEmit);
    }

}
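
Assuming the classes are packaged as DeptJoin.jar (the jar name set in the driver), a run would look something like the line below; the HDFS input and output paths are placeholders.

hadoop jar DeptJoin.jar DriverJoin /join/input1 /join/input2 /join/output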

Male - Female ANALYSIS in Election Example MapReduce


import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;



  
public class Election
{
 
  public static class EleMapper extends Mapper<LongWritable, Text, Text, Text>
  {
   public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
   {
    // Assumed CSV layout: field index 2 holds the name and field index 3 holds the gender.
    String fields[] = value.toString().split(",");
    String gender = fields[3];

    context.write(new Text(fields[2]), new Text(gender));
   }
  }
   public static class EleReduce extends Reducer<Text, Text, Text, Text>
  {
      // mc and fc are instance fields, so the counts accumulate across every key
      // this reducer processes (which is why the sample output shows running totals).
      int mc = 0;
      int fc = 0;

      public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
      {
       for (Text value : values)
       {
        String gender = value.toString();

        if (gender.equals("female"))
        {
         fc++;
        }
        else
        {
         mc++;
        }
       }

       String str = "Male Count = " + mc + "   Female Count = " + fc;
       context.write(key, new Text(str));
      }
  }
  
  
  public static void main(String args[]) throws Exception
  {
     Path input = new Path(args[0]);
        Path output = new Path(args[1]);

     Configuration conf=new Configuration();
  Job job =new Job(conf);
  job.setJobName("Election");
  job.setJar("Election.jar");
  job.setJarByClass(Election.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setMapperClass(EleMapper.class);
  job.setReducerClass(EleReduce.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setNumReduceTasks(1);
  FileInputFormat.setInputPaths(job,input);
  FileOutputFormat.setOutputPath(job,output);
  
  System.exit(job.waitForCompletion(true)?0:1);

  }
}

Output :

administrator@dev:/usr/local/hadoop/bin$ hadoop fs -cat /dev/part-r-00000
17/09/08 12:04:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
krishna          Male Count = 1   Female Count = 0
raghavi           Male Count = 2   Female Count = 1
nitu     Male Count = 3   Female Count = 1
jay       Male Count = 4   Female Count = 1
ajay     Male Count = 5   Female Count = 1
shah    Male Count = 5   Female Count = 2
shah    Male Count = 5   Female Count = 3
sharma           Male Count = 5   Female Count = 4


Search for a Word and Get Its Position


import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;



  
public class ws
{
 
  public static class wsMapper extends Mapper<LongWritable, Text, Text, Text>
  {
   String keyword;
   String found = "";

   public void setup(Context context) throws IOException, InterruptedException
   {
    // The keyword to search for is passed through the job configuration (see main()).
    Configuration conf = context.getConfiguration();
    keyword = conf.get("keyword");
   }

   public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
   {
    Integer WordPos = 0;
    if (value.toString().contains(keyword))
    {
     // Text.find() returns the byte offset of the keyword within the line.
     WordPos = value.find(keyword);
     found = "is Found";
    }
    else
    {
     found = "is not found";
    }
    context.write(new Text("Pos =" + WordPos.toString()), new Text(keyword + " " + found));
   }
  }
   public static class wsReduce extends Reducer<Text, Text, Text, Text>
  {
       public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
         // Identity reduce: pass every (key, value) pair straight through to the output.
         for (Text value : values)
         {
          context.write(key, value);
         }
        }
  }
  
  
  public static void main(String args[]) throws Exception
  {
     Path input = new Path(args[0]);
        Path output = new Path(args[1]);

      Configuration conf=new Configuration();
  Job job =new Job(conf);
  job.setJobName("ws");
  job.setJar("ws.jar");
  job.setJarByClass(ws.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setMapperClass(wsMapper.class);
  job.setReducerClass(wsReduce.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setNumReduceTasks(1);
  job.getConfiguration().set("keyword",args[2]);
  FileInputFormat.setInputPaths(job,input);
  FileOutputFormat.setOutputPath(job,output);
  
  System.exit(job.waitForCompletion(true)?0:1);

  }
}

Output :
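
The run command is not recorded in the original post. Assuming the jar name set in the code (ws.jar) and the keyword MIR and output directory /mir seen below, it would look something like this (the input path is a placeholder):

hadoop jar ws.jar ws /mirinput /mir MIR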


administrator@dev:/usr/local/hadoop/bin$ hadoop fs -cat /mir/part-r-00000
17/09/08 14:43:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Pos =0           MIR is not found
Pos =0           MIR is not found
Pos =0           MIR is not found
Pos =0           MIR is not found
Pos =0           MIR is not found
Pos =0           MIR is not found
Pos =7           MIR is Found

To find the MISSING and INVALID Data from the Given Data Set


import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;




public class CountCounters
{
 
  public static class CountMapper extends Mapper<LongWritable, Text, Text, Text>
  {
  static enum SalesCounters { MISSING,INVALID };
   public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
   {

   // In the input CSV, field index 2 is the country and field index 3 is the sales figure.
   String fields[] = value.toString().split(",");
   String country = fields[2];
   String sales = fields[3];

    if(country.length()==0)
    {
     context.getCounter(SalesCounters.MISSING).increment(1);
    }
    else if(!(Character.isDigit(sales.charAt(0))))
    {
       context.getCounter(SalesCounters.INVALID).increment(1);
    }
    else
    {
    context.write(new Text(country),new Text(sales));
    }
   }
  }
   public static class CountReduce extends Reducer<Text, Text, NullWritable, Text>
  {
       public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
         // Valid records are passed through with a null key, so the output holds only the sales values.
         for (Text details : values)
         {
          context.write(NullWritable.get(), details);
         }
        }
    }
  
  
  public static void main(String args[]) throws Exception
  {
     Path input = new Path(args[0]);
     Path output = new Path(args[1]);

     Configuration conf=new Configuration();
  Job job =new Job(conf);
  job.setJobName("CountCounters");
  job.setJar("DataCounter4.jar");
  job.setJarByClass(CountCounters.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setMapperClass(CountMapper.class);
  job.setReducerClass(CountReduce.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setNumReduceTasks(1);
  FileInputFormat.setInputPaths(job,input);
  FileOutputFormat.setOutputPath(job,output);
  
  boolean success = job.waitForCompletion(true);

  // Read the custom counters after the job finishes; this must happen before exiting.
  Counters counter = job.getCounters();
  System.out.printf("MISSING %d , INVALID %d%n",
      counter.findCounter(CountMapper.SalesCounters.MISSING).getValue(),
      counter.findCounter(CountMapper.SalesCounters.INVALID).getValue());

  System.exit(success ? 0 : 1);
  }
}

Output :

hadoop jar DataCounter4.jar CountCounters /Sales1/SalesData.txt /New11

hadoop fs -cat /New11/part-r-00000
17/09/06 14:20:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
3Cr
2Cr


Sunday, 19 November 2017

Search for a Specific Keyword from a File: MapReduce Program in Hadoop

WordSearch.java



import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.conf.Configuration;


public class WordSearch
{
 public static void main (String args[]) throws Exception
 {
  Path input = new Path(args[0]);
  Path output = new Path(args[1]);
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJobName("Search");
  job.setJar("search.jar");
  job.setJarByClass(WordSearch.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setMapperClass(WordSearchMapper.class);
  job.setReducerClass(WordSearchReducer.class);
  
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  job.setNumReduceTasks(1);
  job.getConfiguration().set("keyword",args[2]);

  FileInputFormat.setInputPaths(job,input);
  FileOutputFormat.setOutputPath(job,output);

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}




WordSearchMapper.java




import java.io.IOException;
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WordSearchMapper extends Mapper<LongWritable, Text, Text, Text>
{
 static String keyword;
 static int pos=0;
 public void setup(Context context) throws IOException,InterruptedException
 {
  Configuration conf = context.getConfiguration();
  keyword = conf.get("keyword");   
 } 
public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException
{
 InputSplit i = context.getInputSplit();
 FileSplit f = (FileSplit) i;
 String fileName = f.getPath().getName();
 Integer WordPos;
 pos++;
 if(value.toString().contains(keyword))
 {
  WordPos = value.find(keyword);
  context.write(value,new Text(fileName + "," + new IntWritable(pos).toString() + "," + WordPos.toString()));
 }
}  
}  



WordSearchReducer.java




import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordSearchReducer extends Reducer<Text, Text, Text, Text>
{
 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
 {
  // Identity reduce: emit every matched line with its (file, line, position) details.
  for (Text value : values)
  {
   context.write(key, value);
  }
 }

}




Output :


hadoop jar search.jar WordSearch /SampleDir/student.txt /NewOpt NAME

hadoop fs -ls /NewOpt
17/08/26 15:26:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 administrator supergroup          0 2017-08-26 15:25 /NewOpt/_SUCCESS
-rw-r--r--   1 administrator supergroup         56 2017-08-26 15:25 /NewOpt/part-r-00000

administrator@ravi:/usr/local/hadoop/bin$ hadoop fs -cat /NewOpt/part-r-00000
17/08/26 15:26:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

NAME   student.txt,7,0
NAME : dev patel             student.txt,1,0


Saturday, 18 November 2017

Program to Count the Words in a File: Hadoop MapReduce

The main method shows how to run this class, with its mapper and reducer, within the Hadoop framework via a Hadoop class called ToolRunner.
The run method demonstrates how you set up parameters for the job. This is a typical minimum set of job parameters. You may find that you need others, such as requesting how many map tasks and reduce tasks to use. See the Hadoop documentation for the Job class for more information.
The inner class called Map extends the Mapper class defined in the Hadoop API. Within the < and > brackets are listed the data types for the input key and value and the produced key and value. You override a function called map that defines the work of the mapper.
The inner class called Reduce extends the Reducer class defined in the Hadoop API. Within the < and > brackets are listed the data types for the input key and value and the emitted key and value. You override a function called reduce that defines the work of the reducer.

Wc.java


import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.*;

public class wc extends Configured implements Tool {

  public static void main(String args[]) throws Exception {
    int res = ToolRunner.run(new wc(), args);
    System.exit(res);
  }

  public int run(String[] args) throws Exception {
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    Configuration conf = getConf();
    Job job = new Job(conf, this.getClass().toString());

    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    job.setJobName("wc");
    job.setJarByClass(wc.class);
    job.setJar("wc.jar"); 
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(Map.class);
    job.setCombinerClass(Reduce.class);
    job.setReducerClass(Reduce.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
 {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    
    public void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException
    {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) 
 {
        word.set(tokenizer.nextToken());
        context.write(word, one);
       }
    }
  }

  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
     {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }

      context.write(key, new IntWritable(sum));
    }
  }

}
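
The run command is not shown in the original post. Assuming the jar name set in run() (wc.jar) and the output directory listed below, it would look something like this (the input path is a placeholder):

hadoop jar wc.jar wc /input /Output1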



Output :

hadoop fs -ls /Output1
17/08/19 16:21:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 administrator supergroup          0 2017-08-19 16:19 /Output1/_SUCCESS
-rw-r--r--   1 administrator supergroup         70 2017-08-19 16:19 /Output1/part-r-00000
administrator@ravi:/usr/local/hadoop/bin$ hadoop fs -cat /Output1/part-r-00000
17/08/19 16:22:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
:               3
comp     1
it             1
NAME   2
NO         1
ROLL      1
SEM-3   1
sharma 1
STD        1
Shweta 1

Saturday, 11 November 2017

Program for Maximum/Minimum Temperature in a Weather Dataset in Hadoop MapReduce

The program below finds the maximum temperature per year; the minimum follows the same pattern (a sketch of that reducer appears after MaxTempReducer.java).

Download or create a sample weather dataset.

MapReduce is based on sets of key-value pairs, so first we have to decide on the types of the key/value pairs for the input.

Map Phase: The input for the Map phase is the weather dataset. The input key/value types are LongWritable and Text, and the output key/value types are Text and IntWritable. Each map task extracts the temperature data from the records of a given year. The output of the Map phase is a set of key-value pairs whose keys are the years and whose values are the temperatures recorded for each year.

Reduce Phase: The Reduce phase obtains all the values linked with a particular key; that is, all the temperature values belonging to a particular year are fed to the same reducer. Each reducer then finds the highest recorded temperature for its year. The output key/value types of the Map phase are the same as the input key/value types of the Reduce phase (Text and IntWritable). The output key/value types of the Reduce phase are also Text and IntWritable.

We have to create three Java files:
MaxTemperature.java,
MaxTempMapper.java,
MaxTempReducer.java

MaxTemperature.java


import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.conf.Configuration;


public class MaxTemperature
{
 public static void main (String args[]) throws Exception
 {
  if (args.length!=2)
  {
  System.err.println("Use : MaxTemperature -inputpath- -outputpath-");
  System.exit(-1);
  }
  Configuration conf = new Configuration();
  Job job = new Job(conf, "MaxTemperature");
  job.setJarByClass(MaxTemperature.class);
  job.setJar("MaxTemperature.jar");
  job.setJobName("Max Temperature");
  
  FileInputFormat.addInputPath(job, new Path (args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
  job.setMapperClass(MaxTempMapper.class);
  job.setReducerClass(MaxTempReducer.class);


  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}


MaxTempMapper.java


import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{

  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
  {
   // Assumes a sample record layout where the year occupies the first four
   // characters and the signed air temperature starts at character 47.
   String line = value.toString();
   String year = line.substring(0, 4);
   int airtemp;
   if (line.charAt(47) == '+')
    airtemp = Integer.parseInt(line.substring(48, 50));
   else
    airtemp = Integer.parseInt(line.substring(47, 49));
    
   context.write(new Text(year),new  IntWritable(airtemp));


   
 
  }

}


MaxTempReducer.java


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//import org.apache.hadoop.mapreduce.lib.input.*;

public class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
 public void reduce(Text key, Iterable<IntWritable> values, Context context)
  throws IOException, InterruptedException
 {
  int maxValue = Integer.MIN_VALUE;
  for ( IntWritable value : values)
  {
   maxValue = Math.max (maxValue, value.get());
  }
  context.write (key, new IntWritable(maxValue));
 }
}
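
The title also mentions the minimum temperature, which the original code does not compute. A reducer for it would follow the same pattern as MaxTempReducer; the class below is only a sketch (MinTempReducer is a name chosen here, not part of the original post).

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MinTempReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
 public void reduce(Text key, Iterable<IntWritable> values, Context context)
  throws IOException, InterruptedException
 {
  // Start from the largest possible value and keep the smallest temperature seen for the year.
  int minValue = Integer.MAX_VALUE;
  for (IntWritable value : values)
  {
   minValue = Math.min(minValue, value.get());
  }
  context.write(key, new IntWritable(minValue));
 }
}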



When you run the program, the map and reduce phases will each reach 100% and the job will complete successfully.
Output  :

hadoop jar MaxTemp.jar MaxTemp /Sample.txt /Out3

 hadoop fs -cat /Out3/part-r-00000
16/08/05 14:24:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2010   0
2011   6
2012   5
2013   6
2014   11
2015   23
2016   24