Saturday, 25 November 2017

JOIN TWO TABLES AND DISPLAY OUTPUT

There are two types of join in mapreduce
Joining during the Map phase.
Joining during the Reduce phase.


Join map reduce program
We need to create 4 files
1 DriverJoin.java
2 MapperJoin.java
3 MapperJoin2.java
4 ReducerJoin.java



1 DriverJoin.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class DriverJoin extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        

        if (args.length != 3) {
            System.err
                    .println("Usage :      ");
            System.exit(0);
        }
        int res = ToolRunner.run(new Configuration(), new DriverJoin(), args);
        System.exit(res);

    }

    
    public int run(String[] args) throws Exception {
        
        String source1 = args[0];
        String source2 = args[1];
        String dest = args[2];
        Configuration conf = new Configuration();
        conf.set("mapreduce.output.textoutputformat.separator", "\t"); 
        FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "Multiple Jobs");

        job.setJarByClass(DriverJoin.class);
 job.setJar("DeptJoin.jar");
        Path p1 = new Path(source1);
        Path p2 = new Path(source2);
        Path out = new Path(dest);
        MultipleInputs.addInputPath(job, p1, TextInputFormat.class,
                MapperJoin.class);
        MultipleInputs.addInputPath(job, p2, TextInputFormat.class,
                MapperJoin2.class);
        job.setReducerClass(ReducerJoin.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        /*
         * delete if exist
         */
        if (fs.exists(out))
            fs.delete(out, true);

        TextOutputFormat.setOutputPath(job, out);
        boolean success = job.waitForCompletion(true);

        return success ? 0 : 1;
    }

}
2 MapperJoin.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class MapperJoin extends Mapper {
    Text keyEmit = new Text();
    Text valEmit = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String parts[] = line.split(",");
        keyEmit.set(parts[0]);
        valEmit.set(parts[1]);
        context.write(keyEmit, valEmit);
    }
}
3 MapperJoin2.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class MapperJoin2 extends Mapper {
    Text keyEmit = new Text();
    Text valEmit = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String parts[] = line.split(",");
        keyEmit.set(parts[0]);
        valEmit.set(parts[1]);
        context.write(keyEmit, valEmit);
    }
}
4 ReducerJoin.java

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class ReducerJoin extends Reducer {

    Text valEmit = new Text();
    String merge = "";

    public void reduce(Text key, Iterable values, Context context)
            throws IOException, InterruptedException {
        String character = "";
        String number = "";
        for (Text value : values) {
            // ordering output
            String val = value.toString();
            char myChar = val.charAt(0);

            if (Character.isDigit(myChar)) {
                number = val;
            } else {
                character = val;
            }
        }
        merge = character + " " + number;
        valEmit.set(merge);
        context.write(key, valEmit);
    }

}

0 comments:

Post a Comment