Reduce side join in Hadoop: Data analysis from different types of data sources
Posted By : Rohan Jain | 24-Nov-2014
Why do we need a data join?
Consider an example: we have two different data sources. The first data source has records of the form Person UID, Name, and total friends.
1,PersonA,300
2,PersonB,200
3,PersonC,250
And our second data source has records of the form Person UID, current city, hometown, age, presentYear.
1,Gurgaon,Agra,22,2013
2,Delhi,Agra,21,2013
2,Noida,Agra,21,2014
Now we need to combine the data from these two different data sources, one holding social info (name and total friends) and the other basic info (current city, hometown, age), and present the result in combined form.
So, to combine data sources that are related in some way, we join them on a common field, say the Person UID, and generate output like:
1 PersonA,300,Gurgaon,Agra,22,2013
To achieve this, Hadoop provides a package called datajoin that works as a generic framework for data joining.
What is a reduce side join
It is named so because the join itself is performed on the reduce side; the map phase only tags each record with its data source and emits it under the join key.
New terminology
-> data source, tag, and group key
Data source -> the input file or files.
Tag -> The MapReduce paradigm calls for processing each record one at a time in a stateless manner. If we want some state information to persist, we have to tag the record with such state. In our example, each record is tagged as either socialinfo or basicinfo.
Group key -> the join key (as in a relational database); in our case it is the Person UID.
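In our running example there are two data sources: the socialinfo file, whose records look like 1,PersonA,300 (Person UID, Name, total friends), and the basicinfo file, whose records look like 1,Gurgaon,Agra,22,2013 (Person UID, current city, hometown, age, presentYear).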
How it Works
For example, consider the record
2,PersonB,200
This record is from the socialinfo file. For it, map() will output a key/value pair where the key is "2", the Person UID that will be used to join it with records from the basicinfo file. The value output by map() is the entire record wrapped in the tag "socialinfo".
Now consider a record from the other data source:
2,Delhi,Agra,21,2013
This record is from the basicinfo file. For it, map() will output a key/value pair where the key is "2", the Person UID that will be used to join it with records from the socialinfo file. The value output by map() is the entire record wrapped in the tag "basicinfo".
The reduce() function will then unwrap each package to get the original record and determine the record's data source from its tag. In our example, group key "1" receives two values, one tagged "socialinfo" and one tagged "basicinfo", while group key "2" receives three values, one tagged "socialinfo" and two tagged "basicinfo".
reduce() then performs a full cross-product on these values: it creates all combinations of the values, with the constraint that no combination contains more than one value with the same tag.
Map Output
In our case, join key "2" has three values with different tags: one tagged socialinfo and two tagged basicinfo.
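For our sample data, the tagged map output grouped by join key looks like this:
1 -> [socialinfo: 1,PersonA,300], [basicinfo: 1,Gurgaon,Agra,22,2013]
2 -> [socialinfo: 2,PersonB,200], [basicinfo: 2,Delhi,Agra,21,2013], [basicinfo: 2,Noida,Agra,21,2014]
3 -> [socialinfo: 3,PersonC,250]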
The cross-product creates two combinations for key "2":
2 PersonB,200,Noida,Agra,21,2014
2 PersonB,200,Delhi,Agra,21,2013
and only one combination for join key "1":
1 PersonA,300,Gurgaon,Agra,22,2013
Note: when all the values reduce() sees for a key carry distinct tags, the cross-product is a single combination containing the original set of values (as for key "1" above).
Each combination from the cross-product is then fed into a function called combine().
The combine() function decides whether the whole operation is an inner join, an outer join, or another type of join.
If it is an inner join, then our record with key 3, which appears only in the socialinfo source, will be dropped:
3,PersonC,250
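As an aside (not part of the original project), the inner-join behaviour comes from combine() returning null whenever a combination does not contain values from both sources. A hypothetical variation of the combine() method from the Reduce class below, sketching a left outer join that keeps unmatched socialinfo records, could look like this; it assumes the socialinfo file's name contains the string "socialinfo", since the code tags each record with its input file name:
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
    if (tags.length < 2) {
        // Hypothetical left outer join: keep a record that appears only in the socialinfo source
        if (tags.length == 1 && tags[0].toString().contains("socialinfo")) {
            String line = ((Text) ((TaggedWritable) values[0]).getData()).toString();
            TaggedWritable single = new TaggedWritable(new Text(line.split(",", 2)[1]));
            single.setTag((Text) tags[0]);
            return single;
        }
        // Inner-join behaviour: drop records that appear in only one source
        return null;
    }
    // Matched records: concatenate the values, dropping the join key from each,
    // exactly as in the full combine() implementation further below
    String joinedStr = "";
    for (int i = 0; i < values.length; i++) {
        if (i > 0) joinedStr += ",";
        TaggedWritable tw = (TaggedWritable) values[i];
        String line = ((Text) tw.getData()).toString();
        joinedStr += line.split(",", 2)[1];
    }
    TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
    retv.setTag((Text) tags[0]);
    return retv;
}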
Implementation of the exercise (you can clone the project from https://github.com/roanjain/reduce-side-join-hadoop).
You can find the datajoin package jar under /share/hadoop/tools/lib in your Hadoop installation.
package com.reducesidejoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.reducesidejoin.ReduceSideJoin.Map.TaggedWritable;
/**
* @author Rohan
* This reduce side join job uses the old Hadoop API, as there are numerous incompatibility issues with the new API.
*/
public class ReduceSideJoin extends Configured implements Tool {

    public static class Map extends DataJoinMapperBase {

        public static class TaggedWritable extends TaggedMapOutput {

            private Writable data;

            /**
             * The no-argument constructor must be defined explicitly because we have also
             * declared a constructor with arguments; otherwise the framework's reflective
             * instantiation fails with a NoSuchMethodException.
             */
            public TaggedWritable() {
                this.tag = new Text();
            }

            public TaggedWritable(Writable data) {
                this.tag = new Text(" ");
                this.data = data;
            }

            public Writable getData() {
                return data;
            }

            public void setData(Writable data) {
                this.data = data;
            }

            public void write(DataOutput out) throws IOException {
                this.tag.write(out);
                out.writeUTF(this.data.getClass().getName());
                this.data.write(out);
            }

            public void readFields(DataInput in) throws IOException {
                this.tag.readFields(in);
                String dataClz = in.readUTF();
                if (this.data == null || !this.data.getClass().getName().equals(dataClz)) {
                    try {
                        this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    }
                }
                this.data.readFields(in);
            }
        }
        /**
         * Generates the tag for each data source, using the input file name.
         */
        public Text generateInputTag(String inputFile) {
            String dataSource = inputFile;
            return new Text(dataSource);
        }

        /**
         * Generates the group/join key for each record: the first field, i.e. the Person UID.
         */
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        /**
         * Since our values are of the custom writable type TaggedWritable, wrap the value
         * in it and attach the input tag of the current data source.
         */
        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }

        /**
         * Outputs a key/value pair where the key is the join key and the value is the
         * record wrapped in a TaggedWritable carrying the tag of its data source, so that
         * all records from the same data source file get the same tag. Note that the
         * old-API map() already implemented by DataJoinMapperBase performs this same
         * tagging logic, and is what actually runs for the job configured in run() below.
         * @param key
         * @param value
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @SuppressWarnings({ "rawtypes", "unchecked" })
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            TaggedMapOutput aRecord = generateTaggedMapOutput(value);
            Text groupKey = generateGroupKey(aRecord);
            context.write(groupKey, aRecord);
        }
    }
    /**
     * The reducer only needs to implement the combine() method, which receives the list
     * of tags and the corresponding tag-wrapped values for one cross-product combination.
     */
    public static class Reduce extends DataJoinReducerBase {

        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ReduceSideJoin(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, ReduceSideJoin.class);
        job.setJarByClass(ReduceSideJoin.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        JobClient.runJob(job);
        return 0;
    }
}
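To try it out (a sketch; the job jar name, the datajoin jar version, and the HDFS paths below are placeholders), build a jar containing the class above and ship the datajoin library with the generic -libjars option that ToolRunner supports:
hadoop jar reduce-side-join.jar com.reducesidejoin.ReduceSideJoin -libjars $HADOOP_HOME/share/hadoop/tools/lib/hadoop-datajoin-<version>.jar /join/input /join/output
With the two sample files placed in the input directory, the output directory should contain the joined records shown earlier in this post:
1 PersonA,300,Gurgaon,Agra,22,2013
2 PersonB,200,Delhi,Agra,21,2013
2 PersonB,200,Noida,Agra,21,2014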
Thanks
About Author
Rohan Jain
Rohan is a bright and experienced web app developer with expertise in Groovy and Grails development.