How to find duplicate records using MapReduce

Requirement

Suppose you have data files that contain duplicate records, i.e. the same line occurs more than once in a file. You want to extract the records that are present in the file multiple times. So the requirement is to find the duplicate records using MapReduce.
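
Before bringing in Hadoop, it can help to picture the requirement on a single machine. The following is a minimal sketch in plain Java, not part of the MapReduce project; the class name LocalDuplicateFinder and the method findDuplicates are hypothetical names chosen for illustration. It counts how often each line occurs and keeps the lines seen more than once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the MapReduce project.
class LocalDuplicateFinder {

    // Returns each line that occurs more than once, in order of first appearance.
    static List<String> findDuplicates(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            counts.merge(line, 1, Integer::sum); // count occurrences per line
        }
        List<String> duplicates = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                duplicates.add(entry.getKey());
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine",
                "5,Madelena,Savin,Female,Help Desk Technician,Jinjiang,China",
                "4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine");
        // Prints the one line that appears twice
        findDuplicates(lines).forEach(System.out::println);
    }
}
```

This approach only works while all distinct lines fit in the memory of one machine. The MapReduce job in the Solution section spreads the same counting idea across a cluster.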

Components Involved

  • HDFS: Stores the input data that is passed to the MapReduce job, and also stores the job's output.
  • MapReduce: Processes the data files available in HDFS and writes the output back to HDFS.

Sample Data

I have taken a sample of records and copied a few of them back into the same file to create duplicates. The records with id 4, 8 and 14 each appear twice:

id,first_name,last_name,gender,designation,city,country
1,Thayne,Mocher,Male,Administrative Officer,Port Colborne,Canada
2,Shelly,Bulfoot,Female,Analyst Programmer,Bacong,Philippines
3,Hercule,Chorlton,Male,Operator,Al Mazzunah,Tunisia
4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine
8,Lilas,Harrowing,Female,Assistant Media Planner,Guayata,Colombia
5,Madelena,Savin,Female,Help Desk Technician,Jinjiang,China
6,Adeline,Imesson,Female,Legal Assistant,Fort Beaufort,South Africa
14,Benjamen,Dodd,Male,Assistant Professor,Beberon,Philippines
7,Celie,Richards,Male,Chemical Engineer,Dubiecko,Poland
8,Lilas,Harrowing,Female,Assistant Media Planner,Guayata,Colombia
4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine
9,Freida,Leivers,Female,Legal Assistant,Bangus Kulon,Indonesia
10,Celie,Dolligon,Female,Data Coordiator,Paraty,Brazil
11,Berkley,Orteaux,Male,Assistant Professor,Zinder,Niger
12,Gilburt,Minot,Male,Marketing Assistant,Hanyuan,China
13,Blaine,Treverton,Male,Research Associate,Yuankeng,China
14,Benjamen,Dodd,Male,Assistant Professor,Beberon,Philippines
15,Nikos,Worpole,Male,Human Resources Assistant II,Esmeralda,Cuba
16,Hercule,Richards,Male,Chemical Engineer,Dubiecko,Poland

Download the sample file:
duplicateRecordSampleData

Solution

Step 1: Input data preparation

In the first step, we will set up the data for the MapReduce job. The data file should be available at an HDFS path.

hadoop fs -put /home/NN/HadoopRepo/MapReduce/resources/duplicateRecord /user/bdp/mapreduce

Here, in my case, the file is available locally at /home/NN/HadoopRepo/MapReduce/resources/duplicateRecord. I copied it to the HDFS location /user/bdp/mapreduce.

Step 2: Create Maven Project

In order to write the MapReduce program, create a Maven project. We will add the dependencies for the required packages.
Follow the steps below to create the Maven project:

  • Open Eclipse
  • Create a maven project

Step 3: Resolve Dependency

Add the dependencies below to the pom.xml file and resolve them from the command line:

<dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-common</artifactId>
     <version>2.7.1</version>
</dependency>
<!-- Hadoop Mapreduce Client Core -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>${java.version}</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>

Step 4: Write Mapper

Find the mapper class code below. It writes each line of the file as a key with the value 1 and skips the header row:

package com.bdp.mapreduce.duplicaterecord.mapper;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DuplicateRecordMapper
                  extends Mapper<LongWritable, Text, Text, IntWritable> {
      // Constant value emitted with every record; the reducer only counts the entries
      private static final IntWritable recordValue = new IntWritable(1);

      @Override
      protected void map(LongWritable key, Text value, Context context)
                  throws IOException, InterruptedException {
            // key is the byte offset of the line, so offset 0 is the first line of the file
            if (key.get() == 0 && value.toString().contains("first_name")) {
                  return; // skip the header row
            }
            context.write(value, recordValue); // write the whole line of the file as the key
      }
}
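
To see what the mapper emits without a cluster, the sketch below imitates the map phase in plain Java. It is an illustration under assumptions, not Hadoop code: MapPhaseSketch and mapLines are hypothetical names, the byte offset is approximated as line length plus one newline character (ASCII input assumed), and a simple list of entries stands in for the context.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the map phase; uses no Hadoop classes.
class MapPhaseSketch {

    // Emits (line, 1) for every line except a header found at offset 0.
    static List<Map.Entry<String, Integer>> mapLines(List<String> lines) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        long offset = 0; // approximates the byte offset Hadoop passes as the key
        for (String line : lines) {
            boolean isHeader = offset == 0 && line.contains("first_name");
            if (!isHeader) {
                emitted.add(new SimpleEntry<>(line, 1));
            }
            offset += line.length() + 1; // assumption: ASCII text, one newline byte
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "id,first_name,last_name,gender,designation,city,country",
                "8,Lilas,Harrowing,Female,Assistant Media Planner,Guayata,Colombia",
                "8,Lilas,Harrowing,Female,Assistant Media Planner,Guayata,Colombia");
        // The header is dropped; each remaining line is emitted with the value 1
        for (Map.Entry<String, Integer> pair : mapLines(lines)) {
            System.out.println(pair.getKey() + " -> " + pair.getValue());
        }
    }
}
```

Note that the mapper does no counting itself. Duplicates only become visible once the framework groups identical keys for the reducer.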

Step 5: Write Reducer

Now we have to write a reducer class that takes its input from the mapper class. All occurrences of the same line arrive under one key, so this class keeps only the keys that have more than one value.

package com.bdp.mapreduce.duplicaterecord.reducer;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.google.common.collect.Iterables;

/*
 * The reducer receives the mapper output grouped by key.
 * Key is of type Text (one line of the input file).
 * Values are of type IntWritable (one entry per occurrence of that line).
 * The reducer emits only the records that are duplicated.
 */
public class DuplicateRecordReducer
                  extends Reducer<Text, IntWritable, Text, NullWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                  throws IOException, InterruptedException {
            // More than one value means the line occurred more than once
            if (Iterables.size(values) > 1) {
                  context.write(key, NullWritable.get()); // emit only the record, no value
            }
      }
}
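
The grouping that happens between the mapper and the reducer can be imitated locally as well. The sketch below is a rough plain-Java model, not the framework's implementation: ReducePhaseSketch, shuffle and reduce are hypothetical names, and a TreeMap stands in for the sort-and-group step (its String ordering matches the byte ordering of Text keys only for ASCII data).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of shuffle + reduce; uses no Hadoop classes.
class ReducePhaseSketch {

    // Shuffle: collect the 1s emitted by the mapper under their key, sorted by key.
    static Map<String, List<Integer>> shuffle(List<String> mapOutputKeys) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String key : mapOutputKeys) {
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(1);
        }
        return grouped;
    }

    // Reduce: keep a key only when more than one value arrived for it.
    static List<String> reduce(Map<String, List<Integer>> grouped) {
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            if (entry.getValue().size() > 1) {
                output.add(entry.getKey());
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> mapOutputKeys = Arrays.asList(
                "4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine",
                "7,Celie,Richards,Male,Chemical Engineer,Dubiecko,Poland",
                "4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine");
        // Only the key that received two values survives the reduce step
        reduce(shuffle(mapOutputKeys)).forEach(System.out::println);
    }
}
```

Counting the values, as the reducer above does, relies on every value being a single occurrence. If a combiner that pre-aggregates counts were ever added to the job, the reducer would need to sum the values instead.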

Step 6: Write Driver

In this step, we will write a driver class in order to execute the mapper and reducer. Find the code below:

package com.bdp.mapreduce.duplicaterecord.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.bdp.mapreduce.duplicaterecord.mapper.DuplicateRecordMapper;
import com.bdp.mapreduce.duplicaterecord.reducer.DuplicateRecordReducer;

public class DuplicateRecordDriver
                  extends Configured implements Tool {
      public static void main(String[] args) throws Exception {
            int jobStatus = ToolRunner.run(new DuplicateRecordDriver(), args);
            System.out.println("job Status==" + jobStatus);
            System.exit(jobStatus); // pass success or failure on to the shell
      }

      @Override
      public int run(String[] args) throws Exception {
            // Job.getInstance replaces the deprecated Job constructor
            Job job = Job.getInstance(getConf(), "Duplicate Record");
            job.setJarByClass(getClass());

            job.setMapperClass(DuplicateRecordMapper.class);
            job.setReducerClass(DuplicateRecordReducer.class);

            // Types emitted by the mapper
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // Types emitted by the reducer
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            // args[0] is the HDFS input path, args[1] is the HDFS output path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            return job.waitForCompletion(true) ? 0 : 1;
      }
}

Step 7: Package Preparation

In this step, we will create a package (.jar) of the project. Follow the steps below:

  • Open a command prompt
  • Go to the Maven project directory
  • Run the command: mvn package


You can also download the package which I have built for this requirement:
MapReduceForDuplicateRecord-0.0.1-SNAPSHOT

Step 8: Execution

Once all the setup is done, the next step is to execute the MapReduce job. Find the execution command below:

hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateRecord-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.duplicaterecord.driver.DuplicateRecordDriver /user/bdp/mapreduce/duplicateRecord /user/bdp/mapreduce/out/duplicateRecord

Here, /user/bdp/mapreduce/duplicateRecord is my HDFS input path and /user/bdp/mapreduce/out/duplicateRecord is the MapReduce output path.

Step 9: Validation

Finally, we need to validate the output of our MapReduce job. We will get the output at the path which we gave at the time of execution. In my case the output path was /user/bdp/mapreduce/out/duplicateRecord.

Use the command to check the directory:

hadoop fs -ls /user/bdp/mapreduce/out/duplicateRecord

Here, it has created a part file which contains the output. Let’s read this part file:

hadoop fs -cat /user/bdp/mapreduce/out/duplicateRecord/part-r-00000


We got all 3 duplicate records that were present in our input file: the records with id 4, 8 and 14.
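
For larger inputs, checking the part file by eye becomes impractical. The sketch below is one possible way to cross-check the result on local copies of the files (for example fetched with hadoop fs -get). It is an assumption-laden illustration: OutputCheckSketch and expectedDuplicates are hypothetical names, the first input line is assumed to be the header, the job is assumed to have run with a single reducer so that there is one part file, and the data is assumed to be ASCII so that Java's String ordering matches the key order in the part file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical local cross-check; not part of the MapReduce project.
class OutputCheckSketch {

    // Expected job output: every non-header line that occurs more than once, sorted.
    static List<String> expectedDuplicates(List<String> inputLines) {
        return inputLines.stream()
                .skip(1) // assumption: the first line is the header row
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
                .entrySet().stream()
                .filter(e -> e.getValue() > 1)
                .map(e -> e.getKey())
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: OutputCheckSketch <local input file> <local part file>");
            return;
        }
        // args[0]: local copy of the input file, args[1]: local copy of part-r-00000
        List<String> expected = expectedDuplicates(Files.readAllLines(Paths.get(args[0])));
        List<String> actual = Files.readAllLines(Paths.get(args[1]));
        System.out.println(expected.equals(actual) ? "output matches" : "output differs");
    }
}
```

Because keys are compared as text rather than as numbers, the record with id 14 sorts before the records with id 4 and 8.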

Wrapping Up

In this post, we have seen how to find the duplicate records present in data files. Here I have taken only a few records as input data. You can test this job on more records.
