Requirement
Suppose you have a data file that contains duplicate records, i.e. some lines of the file occur more than once. You want to find the records that are present multiple times in the file. So the requirement is to find the duplicate records using MapReduce.
Components Involved
- HDFS: To store the input data that will be passed to the MapReduce job, and to store the MapReduce output.
- MapReduce: To process the data/file available in HDFS and write the output back to HDFS.
Sample Data
I have taken a small sample of records and copied and pasted a few of them within the same file to create duplicate records.
id,first_name,last_name,gender,designation,city,country
1,Thayne,Mocher,Male,Administrative Officer,Port Colborne,Canada
2,Shelly,Bulfoot,Female,Analyst Programmer,Bacong,Philippines
3,Hercule,Chorlton,Male,Operator,Al Mazzunah,Tunisia
4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine
8,Lilas,Harrowing,Female,Assistant Media Planner,Guayata,Colombia
5,Madelena,Savin,Female,Help Desk Technician,Jinjiang,China
6,Adeline,Imesson,Female,Legal Assistant,Fort Beaufort,South Africa
14,Benjamen,Dodd,Male,Assistant Professor,Beberon,Philippines
7,Celie,Richards,Male,Chemical Engineer,Dubiecko,Poland
8,Lilas,Harrowing,Female,Assistant Media Planner,Guayata,Colombia
4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine
9,Freida,Leivers,Female,Legal Assistant,Bangus Kulon,Indonesia
10,Celie,Dolligon,Female,Data Coordiator,Paraty,Brazil
11,Berkley,Orteaux,Male,Assistant Professor,Zinder,Niger
12,Gilburt,Minot,Male,Marketing Assistant,Hanyuan,China
13,Blaine,Treverton,Male,Research Associate,Yuankeng,China
14,Benjamen,Dodd,Male,Assistant Professor,Beberon,Philippines
15,Nikos,Worpole,Male,Human Resources Assistant II,Esmeralda,Cuba
16,Hercule,Richards,Male,Chemical Engineer,Dubiecko,Poland
Download the sample file:
duplicateRecordSampleData
Solution
Step 1: Input data preparation
In the first step, we will set up the input data for the MapReduce job. The data file should be available at an HDFS path.
hadoop fs -put /home/NN/HadoopRepo/MapReduce/resources/duplicateRecord /user/bdp/mapreduce
Here, in my case, the file is available locally at /home/NN/HadoopRepo/MapReduce/resources/duplicateRecord. I copied it to the HDFS location /user/bdp/mapreduce.
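You can verify that the file has been copied by listing the HDFS directory, for example:

hadoop fs -ls /user/bdp/mapreduce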
Step 2: Create Maven Project
In order to write the MapReduce program, create a Maven project. We will add the dependencies for the required packages.
Follow the steps below to create the Maven project:
- Open Eclipse
- Create a Maven project
Step 3: Resolve Dependency
Add the dependencies below to the pom.xml file and resolve them from the command line:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.1</version>
</dependency>
<!-- Hadoop Mapreduce Client Core -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>${java.version}</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
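Any standard Maven build will download and resolve these dependencies; for example, from the project root you can run:

mvn clean compile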
Step 4: Write Mapper
Find the mapper class code below:
package com.bdp.mapreduce.duplicaterecord.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DuplicateRecordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable recordValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Skip the header line (byte offset 0 and contains the column names)
        if (key.get() == 0 && value.toString().contains("first_name")) {
            return;
        } else {
            // Write the whole line of the file as the key with a count of 1
            context.write(value, recordValue);
        }
    }
}
Step 5: Write Reducer
Now, we have to write a reducer class that takes the mapper output as its input. We will filter out the duplicate records in this class.
package com.bdp.mapreduce.duplicaterecord.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.google.common.collect.Iterables;

/*
 * The reducer receives (key, values) pairs from the mapper:
 * the key is of type Text (the full line) and the values are of type IntWritable.
 * It emits only those records that are duplicated.
 */
public class DuplicateRecordReducer extends Reducer<Text, IntWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // Here I want only the record (the key), not the count
        if (Iterables.size(values) > 1) {
            context.write(key, NullWritable.get());
        }
    }
}
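Note that Iterables.size(values) comes from Guava and simply walks the values iterator to count the entries. If you would rather not depend on Guava, a minimal sketch of the same reduce() method that sums the counts itself (as a drop-in replacement inside DuplicateRecordReducer) could look like this:

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper; a total greater than 1 means the line is duplicated
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        if (count > 1) {
            context.write(key, NullWritable.get());
        }
    }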
Step 6: Write Driver
In this step, we will write a driver class in order to execute the mapper and reducer. Find the code below:
package com.bdp.mapreduce.duplicaterecord.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.bdp.mapreduce.duplicaterecord.mapper.DuplicateRecordMapper;
import com.bdp.mapreduce.duplicaterecord.reducer.DuplicateRecordReducer;

public class DuplicateRecordDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int jobStatus = ToolRunner.run(new DuplicateRecordDriver(), args);
        System.out.println("job Status==" + jobStatus);
    }

    public int run(String[] args) throws Exception {
        @SuppressWarnings("deprecation")
        Job job = new Job(getConf(), "Duplicate Record");
        job.setJarByClass(getClass());
        job.setMapperClass(DuplicateRecordMapper.class);
        job.setReducerClass(DuplicateRecordReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
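The new Job(getConf(), ...) constructor used above is deprecated in Hadoop 2.x. If you want to avoid the @SuppressWarnings annotation, an equivalent, non-deprecated way to create the job inside run() is the Job.getInstance factory method:

    // Non-deprecated equivalent of "new Job(getConf(), ...)" in Hadoop 2.x
    Job job = Job.getInstance(getConf(), "Duplicate Record");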
Step 7: Package Preparation
In this step, we will create a package (.jar) of the project. Follow the steps below:
- Open CMD
- Navigate to the Maven project directory
- Use the command: mvn package
You can also download the package which I have built for this requirement:
MapReduceForDuplicateRecord-0.0.1-SNAPSHOT
Step 8: Execution
Once all the setup is done, the next step is to execute the MapReduce job. Find the execution command below:
hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateRecord-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.duplicaterecord.driver.DuplicateRecordDriver /user/bdp/mapreduce/duplicateRecord /user/bdp/mapreduce/out/duplicateRecord
Here, /user/bdp/mapreduce/duplicateRecord is my HDFS input path and /user/bdp/mapreduce/out/duplicateRecord is the MapReduce output path.
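Keep in mind that the job will fail if the output directory already exists, so if you re-run it, remove the old output first:

hadoop fs -rm -r /user/bdp/mapreduce/out/duplicateRecord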
Step 9: Validation
At last, we need to validate the output of the MapReduce job. The output is written to the path we specified at execution time; in my case, that was /user/bdp/mapreduce/out/duplicateRecord.
Use the command to check the directory:
hadoop fs -ls /user/bdp/mapreduce/out/duplicateRecord
Here, the job has created a part file that contains the output. Let’s read this part file:
hadoop fs -cat /user/bdp/mapreduce/out/duplicateRecord/part-r-00000
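Given the sample data above, where the records with ids 4, 8, and 14 each appear twice, the part file should contain something like:

14,Benjamen,Dodd,Male,Assistant Professor,Beberon,Philippines
4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine
8,Lilas,Harrowing,Female,Assistant Media Planner,Guayata,Colombia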
We got all three duplicate records that were present in our input file.
Wrapping Up
In this post, we have seen how to find duplicate records in data files. Here I have taken only a few records as input data; you can test this job on a larger number of records.