How to find duplicate value using Map Reduce

Requirement

Suppose you receive data files containing users' basic information such as first name, last name, designation, city, etc. These fields are separated by the ',' delimiter. Now, the requirement is to find all the duplicate values in any given field. So, the task here is how to find duplicate values using MapReduce.

 

Components Involved

  • HDFS: Used to store the input data file that is passed to the MapReduce job as input, and also to store the job's output.
  • Map Reduce: Used to process the data file available in HDFS and write the output back to HDFS.

Sample Data

Below is the sample data for the exercise:

sampledataDuplicate
 
id,first_name,last_name,gender,designation,city,country
1,Thayne,Mocher,Male,Administrative Officer,Port Colborne,Canada
2,Shelly,Bulfoot,Female,Analyst Programmer,Bacong,Philippines
3,Hercule,Chorlton,Male,Operator,Al Mazzunah,Tunisia
4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine
5,Madelena,Savin,Female,Help Desk Technician,Jinjiang,China
6,Adeline,Imesson,Female,Legal Assistant,Fort Beaufort,South Africa
7,Celie,Richards,Male,Chemical Engineer,Dubiecko,Poland
8,Lilas,Harrowing,Female,Assistant Media Planner,Guayatá,Colombia
9,Freida,Leivers,Female,Legal Assistant,Bangus Kulon,Indonesia
10,Celie,Dolligon,Female,Data Coordiator,Paraty,Brazil
11,Berkley,Orteaux,Male,Assistant Professor,Zinder,Niger
12,Gilburt,Minot,Male,Marketing Assistant,Hanyuan,China
13,Blaine,Treverton,Male,Research Associate,Yuankeng,China
14,Benjamen,Dodd,Male,Assistant Professor,Beberon,Philippines
15,Nikos,Worpole,Male,Human Resources Assistant II,Esmeralda,Cuba
16,Hercule,Richards,Male,Chemical Engineer,Dubiecko,Poland

In the sample data, all fields are comma (,) separated. We have to find only the values that repeat within a column. Let's take the first_name field: every value that occurs more than once in this field should appear in the output. If you look at the sample data, the first names Celie and Hercule each appear twice.
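To make the field positions concrete, here is a minimal, standalone Java sketch of how one record breaks down when split on the comma delimiter. The class name FieldIndexDemo is just a throwaway name for illustration; the index-to-column mapping comes from the header row above, not from the project code.

public class FieldIndexDemo {
      public static void main(String[] args) {
            // One record from the sample data, split on the ',' delimiter
            String line = "1,Thayne,Mocher,Male,Administrative Officer,Port Colborne,Canada";
            String[] fields = line.split(",");

            // Index 0 = id, 1 = first_name, 2 = last_name, 3 = gender,
            // 4 = designation, 5 = city, 6 = country
            System.out.println("first_name = " + fields[1]); // prints: Thayne
      }
}

So, for the first_name field, the mapper written later in this post will work with index 1 of the split record.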

Solution

In the solution, we will write a MapReduce program in the Java programming language. The job will take the data file as input and write its output to an HDFS path. The solution has multiple steps; let's go through them sequentially.

Step 1: Input Data Preparation

The first step is to prepare the input data for the MapReduce job. Let's put the sample data into a file and keep it at a local path. In my case, the file name is "sampledataForDuplicate" and the local path is "/home/NN/HadoopRepo/MapReduce/resources/duplicateValue".

We need to move this data file to an HDFS location, which will be the input path for the MapReduce job.

copyToHdfs
 
hadoop fs -put /home/NN/HadoopRepo/MapReduce/resources/duplicateValue /user/bdp/mapreduce

It will copy the data file from the local path to the HDFS location.

Step 2: Create Maven Project

In order to write the MapReduce program, create a Maven project. We will add dependencies for the required packages.

Follow the below steps in order to create the maven project:

  • Open Eclipse
  • Create a maven project

Step 3: Resolve Dependency

Add the following dependencies to the pom.xml file and resolve them using cmd:

pom
 
  1. <dependency>
  2.       <groupId>org.apache.hadoop</groupId>
  3.       <artifactId>hadoop-common</artifactId>
  4.       <version>2.7.1</version>
  5. </dependency>
  6. <!-- Hadoop Mapreduce Client Core -->
  7. <dependency>
  8.       <groupId>org.apache.hadoop</groupId>
  9.       <artifactId>hadoop-mapreduce-client-core</artifactId>
  10.       <version>2.7.1</version>
  11. </dependency>
  12. <dependency>
  13.        <groupId>jdk.tools</groupId>
  14.        <artifactId>jdk.tools</artifactId>
  15.        <version>${java.version}</version>
  16.        <scope>system</scope>
  17.        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
  18. </dependency>

Step 4: Write Mapper

Once you are done with all the above steps, write a mapper class that takes the input file. It reads each line, extracts the first_name field, and emits it as a key together with a count of 1. The mapper is written in Java.

DuplicateValueMapper
 
package com.bdp.mapreduce.duplicate.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DuplicateValueMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static final IntWritable one = new IntWritable(1);

      @Override
      protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
            // Skipping the header of the input
            if (key.get() == 0 && value.toString().contains("first_name")) {
                  return;
            } else {
                  String values[] = value.toString().split(",");
                  context.write(new Text(values[1]), one); // Writing the first_name value as the key
            }
      }
}

Here, in the mapper, I am taking first_name as the key because I want to find the duplicate records in the first_name field. This can be changed for a different field; one way to make the field configurable is sketched below.
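If you would rather not hard-code the column index, one option is to read it from the job configuration. The following is a minimal sketch under that assumption, not the original code: the class name ConfigurableDuplicateValueMapper and the property name duplicate.field.index are made up for illustration.

package com.bdp.mapreduce.duplicate.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical variant of the mapper: the column to check is read from the
// job configuration (property "duplicate.field.index", default 1 = first_name)
// instead of being hard-coded.
public class ConfigurableDuplicateValueMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static final IntWritable one = new IntWritable(1);

      @Override
      protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            // Skipping the header of the input
            if (key.get() == 0 && value.toString().contains("first_name")) {
                  return;
            }
            int fieldIndex = context.getConfiguration().getInt("duplicate.field.index", 1);
            String[] values = value.toString().split(",");
            if (fieldIndex < values.length) {
                  context.write(new Text(values[fieldIndex]), one);
            }
      }
}

With this variant, the driver would set the index before submitting the job, for example job.getConfiguration().setInt("duplicate.field.index", 2) to check last_name instead of first_name.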

Step 5: Write Reducer

In this step, we take the mapper output as input and process it. The actual duplicate-detection logic is written here.

Find the code below:

DuplicateValueReducer
 
package com.bdp.mapreduce.duplicate.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.google.common.collect.Iterables;

/*
 * This reducer will get mapper data as input and return only the keys that are duplicate values.
 */
public class DuplicateValueReducer
      extends Reducer<Text, IntWritable, Text, NullWritable> {

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
            // Check if the key has duplicate values
            if (Iterables.size(values) > 1) {
                  context.write(key, NullWritable.get());
            }
      }
}

In the reducer, we write out only those keys that have more than one value, i.e. the names that occur more than once.
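A side note: Iterables.size() walks the entire iterator just to count it. If you want to avoid the Guava dependency, the same check can be done with a plain loop that stops as soon as a second value is seen. This is a minimal alternative sketch, not the original code; the class name DuplicateValueReducerNoGuava is made up for illustration.

package com.bdp.mapreduce.duplicate.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical Guava-free variant of the reducer: it counts the values manually
// and emits the key as soon as a second occurrence is found.
public class DuplicateValueReducerNoGuava
      extends Reducer<Text, IntWritable, Text, NullWritable> {

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable ignored : values) {
                  if (++count > 1) {
                        // The key has at least two occurrences, so it is a duplicate.
                        context.write(key, NullWritable.get());
                        break;
                  }
            }
      }
}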

Step 6: Write Driver

In order to execute the mapper and reducer, let's create a driver class which calls both of them. Find the driver class code below:

DuplicateValueDriver
 
package com.bdp.mapreduce.duplicate.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.bdp.mapreduce.duplicate.mapper.DuplicateValueMapper;
import com.bdp.mapreduce.duplicate.reducer.DuplicateValueReducer;

public class DuplicateValueDriver extends Configured implements Tool {

      public int run(String[] arg0) throws Exception {
            @SuppressWarnings("deprecation")
            Job job = new Job(getConf(), "Duplicate value");
            job.setJarByClass(getClass());

            job.setMapperClass(DuplicateValueMapper.class);
            job.setReducerClass(DuplicateValueReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(arg0[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

            return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
            int jobStatus = ToolRunner.run(new DuplicateValueDriver(), args);
            System.out.println(jobStatus);
      }
}

Step 7: Package Preparation

In this step, we will create a package (.jar) of the project. Follow the below steps:

  • Open CMD
  • Navigate to the maven project directory
  • Use command:

mvn package

It will create a .jar file under the target directory. Copy this jar and keep it at a local path. Here, in my case, the jar file is available at "/home/NN/HadoopRepo/MapReduce".

Step 8: Execution

All the setup is done. Let's execute the job and validate the output. In order to execute the MapReduce job, use the below command:

Format:

hadoop jar <path of jar> <driver class with package name> <input data path of HDFS> <output path at HDFS>

Ex:

hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.duplicate.driver.DuplicateValueDriver /user/bdp/mapreduce/duplicateValue /user/bdp/mapreduce/duplicateValue/output

console
 
  1. [root@NN hadoop-2.6.0]# hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.duplicate.driver.DuplicateValueDriver /user/bdp/mapreduce/duplicateValue /user/bdp/mapreduce/duplicateValue/output
  2. 17/07/20 23:50:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  3. 17/07/20 23:50:23 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
  4. 17/07/20 23:50:23 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
  5. 17/07/20 23:50:24 INFO input.FileInputFormat: Total input paths to process : 1
  6. 17/07/20 23:50:25 INFO mapreduce.JobSubmitter: number of splits:1
  7. 17/07/20 23:50:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local551429012_0001
  8. 17/07/20 23:50:28 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
  9. 17/07/20 23:50:28 INFO mapreduce.Job: Running job: job_local551429012_0001
  10. 17/07/20 23:50:28 INFO mapred.LocalJobRunner: OutputCommitter set in config null
  11. 17/07/20 23:50:28 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
  12. 17/07/20 23:50:28 INFO mapred.LocalJobRunner: Waiting for map tasks
  13. 17/07/20 23:50:28 INFO mapred.LocalJobRunner: Starting task: attempt_local551429012_0001_m_000000_0
  14. 17/07/20 23:50:29 INFO mapreduce.Job: Job job_local551429012_0001 running in uber mode : false
  15. 17/07/20 23:50:29 INFO mapreduce.Job:  map 0% reduce 0%
  16. 17/07/20 23:50:29 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
  17. 17/07/20 23:50:29 INFO mapred.MapTask: Processing split: hdfs://NN:9000/user/bdp/mapreduce/duplicateValue/sampledataForDuplicate:0+1026
  18. 17/07/20 23:50:29 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
  19. 17/07/20 23:50:29 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
  20. 17/07/20 23:50:29 INFO mapred.MapTask: soft limit at 83886080
  21. 17/07/20 23:50:29 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
  22. 17/07/20 23:50:29 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
  23. 17/07/20 23:50:29 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
  24. 17/07/20 23:50:30 INFO mapred.LocalJobRunner:
  25. 17/07/20 23:50:30 INFO mapred.MapTask: Starting flush of map output
  26. 17/07/20 23:50:30 INFO mapred.MapTask: Spilling map output
  27. 17/07/20 23:50:30 INFO mapred.MapTask: bufstart = 0; bufend = 185; bufvoid = 104857600
  28. 17/07/20 23:50:30 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214336(104857344); length = 61/6553600
  29. 17/07/20 23:50:30 INFO mapred.MapTask: Finished spill 0
  30. 17/07/20 23:50:30 INFO mapred.Task: Task:attempt_local551429012_0001_m_000000_0 is done. And is in the process of committing
  31. 17/07/20 23:50:30 INFO mapred.LocalJobRunner: map
  32. 17/07/20 23:50:30 INFO mapred.Task: Task 'attempt_local551429012_0001_m_000000_0' done.
  33. 17/07/20 23:50:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local551429012_0001_m_000000_0
  34. 17/07/20 23:50:30 INFO mapred.LocalJobRunner: map task executor complete.
  35. 17/07/20 23:50:30 INFO mapred.LocalJobRunner: Waiting for reduce tasks
  36. 17/07/20 23:50:30 INFO mapred.LocalJobRunner: Starting task: attempt_local551429012_0001_r_000000_0
  37. 17/07/20 23:50:30 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
  38. 17/07/20 23:50:30 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@25ca7b00
  39. 17/07/20 23:50:30 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=363285696, maxSingleShuffleLimit=90821424, mergeThreshold=239768576, ioSortFactor=10, memToMemMergeOutputsThreshold=10
  40. 17/07/20 23:50:30 INFO reduce.EventFetcher: attempt_local551429012_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
  41. 17/07/20 23:50:30 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local551429012_0001_m_000000_0 decomp: 219 len: 223 to MEMORY
  42. 17/07/20 23:50:30 INFO reduce.InMemoryMapOutput: Read 219 bytes from map-output for attempt_local551429012_0001_m_000000_0
  43. 17/07/20 23:50:30 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 219, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->219
  44. 17/07/20 23:50:30 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
  45. 17/07/20 23:50:30 INFO mapred.LocalJobRunner: 1 / 1 copied.
  46. 17/07/20 23:50:30 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
  47. 17/07/20 23:50:30 INFO mapred.Merger: Merging 1 sorted segments
  48. 17/07/20 23:50:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 209 bytes
  49. 17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merged 1 segments, 219 bytes to disk to satisfy reduce memory limit
  50. 17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merging 1 files, 223 bytes from disk
  51. 17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
  52. 17/07/20 23:50:30 INFO mapred.Merger: Merging 1 sorted segments
  53. 17/07/20 23:50:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 209 bytes
  54. 17/07/20 23:50:30 INFO mapred.LocalJobRunner: 1 / 1 copied.
  55. 17/07/20 23:50:30 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
  56. 17/07/20 23:50:31 INFO mapreduce.Job:  map 100% reduce 0%
  57. 17/07/20 23:50:31 INFO mapred.Task: Task:attempt_local551429012_0001_r_000000_0 is done. And is in the process of committing
  58. 17/07/20 23:50:31 INFO mapred.LocalJobRunner: 1 / 1 copied.
  59. 17/07/20 23:50:31 INFO mapred.Task: Task attempt_local551429012_0001_r_000000_0 is allowed to commit now
  60. 17/07/20 23:50:31 INFO output.FileOutputCommitter: Saved output of task 'attempt_local551429012_0001_r_000000_0' to hdfs://NN:9000/user/bdp/mapreduce/duplicateValue/output/_temporary/0/task_local551429012_0001_r_000000
  61. 17/07/20 23:50:31 INFO mapred.LocalJobRunner: reduce > reduce
  62. 17/07/20 23:50:31 INFO mapred.Task: Task 'attempt_local551429012_0001_r_000000_0' done.
  63. 17/07/20 23:50:31 INFO mapred.LocalJobRunner: Finishing task: attempt_local551429012_0001_r_000000_0
  64. 17/07/20 23:50:31 INFO mapred.LocalJobRunner: reduce task executor complete.
  65. 17/07/20 23:50:32 INFO mapreduce.Job:  map 100% reduce 100%
  66. 17/07/20 23:50:32 INFO mapreduce.Job: Job job_local551429012_0001 completed successfully
  67. 17/07/20 23:50:32 INFO mapreduce.Job: Counters: 38
  68.         File System Counters
  69.                 FILE: Number of bytes read=13702
  70.                 FILE: Number of bytes written=510885
  71.                 FILE: Number of read operations=0
  72.                 FILE: Number of large read operations=0
  73.                 FILE: Number of write operations=0
  74.                 HDFS: Number of bytes read=2052
  75.                 HDFS: Number of bytes written=8
  76.                 HDFS: Number of read operations=15
  77.                 HDFS: Number of large read operations=0
  78.                 HDFS: Number of write operations=4
  79.         Map-Reduce Framework
  80.                 Map input records=17
  81.                 Map output records=16
  82.                 Map output bytes=184
  83.                 Map output materialized bytes=222
  84.                 Input split bytes=136
  85.                 Combine input records=0
  86.                 Combine output records=0
  87.                 Reduce input groups=14
  88.                 Reduce shuffle bytes=222
  89.                 Reduce input records=16
  90.                 Reduce output records=2
  91.                 Spilled Records=32
  92.                 Shuffled Maps =1
  93.                 Failed Shuffles=0
  94.                 Merged Map outputs=1
  95.                 GC time elapsed (ms)=373
  96.                 CPU time spent (ms)=0
  97.                 Physical memory (bytes) snapshot=0
  98.                 Virtual memory (bytes) snapshot=0
  99.                 Total committed heap usage (bytes)=241573888
  100.         Shuffle Errors
  101.                 BAD_ID=0
  102.                 CONNECTION=0
  103.                 IO_ERROR=0
  104.                 WRONG_LENGTH=0
  105.                 WRONG_MAP=0
  106.                 WRONG_REDUCE=0
  107.         File Input Format Counters
  108.                 Bytes Read=1026
  109.         File Output Format Counters
  110.                 Bytes Written=14

Step 9: Validate Output

Check the output at the HDFS path.

hdfs
 
  1. [root@NN MapReduce]# hadoop fs -ls /user/bdp/mapreduce/duplicateValue/output
  2. 17/07/21 00:38:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  3. Found 2 items
  4. -rw-r--r--   1 root supergroup          0 2017-07-21 00:37 /user/bdp/mapreduce/duplicateValue/output/_SUCCESS
  5. -rw-r--r--   1 root supergroup         14 2017-07-21 00:37 /user/bdp/mapreduce/duplicateValue/output/part-r-00000

Check the output

output
 
  1. [root@NN MapReduce]# hadoop fs -cat /user/bdp/mapreduce/duplicateValue/output/part-r-00000
  2. 17/07/21 00:38:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  3. Celie
  4. Hercule

Wrapping Up

Here, I have written a MapReduce job to find the duplicate values of the first_name field in the data file. You can modify the code in case you want to check for duplicates in a different field; only the mapper code needs to change in order to find duplicates in another field such as last_name, designation, etc., as shown in the sketch below.
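As a concrete illustration, assuming the same comma-separated layout as the sample data (where index 2 is last_name), the only line in the mapper that changes is the one that writes the key:

// Original: duplicate check on first_name (index 1)
context.write(new Text(values[1]), one);

// Changed: duplicate check on last_name (index 2)
context.write(new Text(values[2]), one);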
