How to find duplicate value using Map Reduce

How to find duplicate value using Map Reduce

Requirement

Suppose you get data files which are having user’s basic information like first name, last name, designation, city etc. These basic details are separated by ‘,’ delimiter. Now, the requirement has come to find out all the duplicate value of any field of information. So, here the requirement is how to find duplicate value using Map Reduce.

 

Components Involved

  • HDFS: Here it is used to store the input data which will get pass to the Map Reduce as an input and also store the map-reduce output.
  • Map Reduce: Here it is used to process the data/file available in HDFS and store the output to the HDFS.

Sample Data

Below are the sample data for the exercise:

sampledataDuplicate
 
  1. id,first_name,last_name,gender,designation,city,country
  2. 1,Thayne,Mocher,Male,Administrative Officer,Port Colborne,Canada
  3. 2,Shelly,Bulfoot,Female,Analyst Programmer,Bacong,Philippines
  4. 3,Hercule,Chorlton,Male,Operator,Al Mazzunah,Tunisia
  5. 4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine
  6. 5,Madelena,Savin,Female,Help Desk Technician,Jinjiang,China
  7. 6,Adeline,Imesson,Female,Legal Assistant,Fort Beaufort,South Africa
  8. 7,Celie,Richards,Male,Chemical Engineer,Dubiecko,Poland
  9. 8,Lilas,Harrowing,Female,Assistant Media Planner,Guayatá,Colombia
  10. 9,Freida,Leivers,Female,Legal Assistant,Bangus Kulon,Indonesia
  11. 10,Celie,Dolligon,Female,Data Coordiator,Paraty,Brazil
  12. 11,Berkley,Orteaux,Male,Assistant Professor,Zinder,Niger
  13. 12,Gilburt,Minot,Male,Marketing Assistant,Hanyuan,China
  14. 13,Blaine,Treverton,Male,Research Associate,Yuankeng,China
  15. 14,Benjamen,Dodd,Male,Assistant Professor,Beberon,Philippines
  16. 15,Nikos,Worpole,Male,Human Resources Assistant II,Esmeralda,Cuba
  17. 16,Hercule,Richards,Male,Chemical Engineer,Dubiecko,Poland

In the sample data, all the fields are the comma(,) separated. We have to find out only repeated value in a column. Let’s take the first_name field. So the value repeated in this field will be output. If you see the sample data, all the repeatable values are highlighted in the first name field.

Solution

In the solution, we will write a map reduce program. This will be written in Java programming language. The MapReduce will take an input which is the data file and write the output at the HDFS path. The solution will have multiple steps. Let’s go through all the steps sequentially.

Step 1: Input Data Preparation

The first step is to prepare the input data for the map reduce. This will be input for the map reduce. Let’s keep the sample data into a file and keep it at the local path. In my case, file name is sampledataForDuplicate” and local path is /home/NN/HadoopRepo/MapReduce/resources/duplicateValue”.

We need to move this data file to an HDFS location which will be input path for the map reduce.

copyToHdfs
 
hadoop fs -put /home/NN/HadoopRepo/MapReduce/resources/duplicateValue /user/bdp/mapreduce

It will copy the data file from local to HDFS location.

Step 2: Create Maven Project

In order to write map reduce program, create a maven project. We will add a dependency for the required package.

Follow the below steps in order to create the maven project:

  • Open Eclipse
  • Create a maven project

Step 3: Resolve Dependency

Add dependency in pom.XML file and resolve dependency using cmd:

pom
 
<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.1</version>
</dependency>
<!-- Hadoop Mapreduce Client Core -->
<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.7.1</version>
</dependency>
<dependency>
          <groupId>jdk.tools</groupId>
          <artifactId>jdk.tools</artifactId>
          <version>${java.version}</version>
          <scope>system</scope>
          <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>

Step 4: Write Mapper

Once you are done with all above steps, write a mapper class which will take an input file. It will read the file and store each word of the file with key-value pair. Here using a java program to write the mapper.

DuplicateValueMapper
 
package com.bdp.mapreduce.duplicate.mapper;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DuplicateValueMapper 
      extends Mapper<LongWritable, Text, Text, IntWritable>{
      private static final IntWritable one = new IntWritable(1);
      
      @Override
      protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            //Skipping the header of the input
            if (key.get() == 0 && value.toString().contains("first_name")) {
                  return;
            } 
            else {
                  String values[] = value.toString().split(",");
                  context.write(new Text(values[1]), one); //Writing first_name value as a key
            }
      }
}

Here, in the mapper, I am taking first_name as a key because I want to find out the duplicate record in the first_name field. We can change it as per the different field.

Step 5: Write Reducer

In this step, we will take the mapper data as an input and process it. The actual logic has been written here.

Find the code below:

DuplicateValueReducer
 
package com.bdp.mapreduce.duplicate.reducer;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.google.common.collect.Iterables;
/*
 * This reducer will get mapper data as input and return only key that is duplicate value.
 * 
 */
public class DuplicateValueReducer 
      extends Reducer<Text, IntWritable, Text, NullWritable>{
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            //Check if the key has duplicate value
            if (Iterables.size(values) > 1) {
                  context.write(key, NullWritable.get());
            }
      }
}

In the reducer, we are writing only those value which is more than 1.

Step 6: Write Driver

In order to execute the mapper and reducer, let’s create a driver class which will call mapper and reducer. Find below the driver class code:

DuplicateValueDriver
 
package com.bdp.mapreduce.duplicate.driver;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.bdp.mapreduce.duplicate.mapper.DuplicateValueMapper;
import com.bdp.mapreduce.duplicate.reducer.DuplicateValueReducer;
public class DuplicateValueDriver 
      extends Configured implements Tool{
      public int run(String[] arg0) throws Exception {
            // TODO Auto-generated method stub
            @SuppressWarnings("deprecation")
            Job job = new Job(getConf(), "Duplicate value");
            job.setJarByClass(getClass());
            
            job.setMapperClass(DuplicateValueMapper.class);
            job.setReducerClass(DuplicateValueReducer.class);
            
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            FileInputFormat.addInputPath(job, new Path(arg0[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
            
            return job.waitForCompletion(true) ? 0 : 1;            
      }
      
      public static void main(String[] args) throws Exception {
            int jobStatus = ToolRunner.run(new DuplicateValueDriver(), args);
            System.out.println(jobStatus);
      }
}

Step 7: Package Preparation

In this step, we will create a package(.jar) of the project. Follow the below steps:

  • Open CMD
  • Head to the maven project
  • Use command:

mvn package

It will create a .jar file under target directory. Copy this created jar and keep it at the local path. Here in my case, the jar file is available at “/home/NN/HadoopRepo/MapReduce“.

Step 8: Execution

All the setup has been done. Let’s execute the job and validate output. In order to execute the map reduce, use below command:

Format:

hadoop jar <path of jar> <driver class with package name> <input data path of HDFS> <output path at HDFS>

Ex:

hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.duplicate.driver.DuplicateValueDriver /user/bdp/mapreduce/duplicateValue /user/bdp/mapreduce/duplicateValue/output

console
 
[root@NN hadoop-2.6.0]# hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.duplicate.driver.DuplicateValueDriver /user/bdp/mapreduce/duplicateValue /user/bdp/mapreduce/duplicateValue/output
17/07/20 23:50:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/20 23:50:23 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
17/07/20 23:50:23 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
17/07/20 23:50:24 INFO input.FileInputFormat: Total input paths to process : 1
17/07/20 23:50:25 INFO mapreduce.JobSubmitter: number of splits:1
17/07/20 23:50:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local551429012_0001
17/07/20 23:50:28 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
17/07/20 23:50:28 INFO mapreduce.Job: Running job: job_local551429012_0001
17/07/20 23:50:28 INFO mapred.LocalJobRunner: OutputCommitter set in config null
17/07/20 23:50:28 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
17/07/20 23:50:28 INFO mapred.LocalJobRunner: Waiting for map tasks
17/07/20 23:50:28 INFO mapred.LocalJobRunner: Starting task: attempt_local551429012_0001_m_000000_0
17/07/20 23:50:29 INFO mapreduce.Job: Job job_local551429012_0001 running in uber mode : false
17/07/20 23:50:29 INFO mapreduce.Job:  map 0% reduce 0%
17/07/20 23:50:29 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
17/07/20 23:50:29 INFO mapred.MapTask: Processing split: hdfs://NN:9000/user/bdp/mapreduce/duplicateValue/sampledataForDuplicate:0+1026
17/07/20 23:50:29 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
17/07/20 23:50:29 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
17/07/20 23:50:29 INFO mapred.MapTask: soft limit at 83886080
17/07/20 23:50:29 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
17/07/20 23:50:29 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
17/07/20 23:50:29 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
17/07/20 23:50:30 INFO mapred.LocalJobRunner:
17/07/20 23:50:30 INFO mapred.MapTask: Starting flush of map output
17/07/20 23:50:30 INFO mapred.MapTask: Spilling map output
17/07/20 23:50:30 INFO mapred.MapTask: bufstart = 0; bufend = 185; bufvoid = 104857600
17/07/20 23:50:30 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214336(104857344); length = 61/6553600
17/07/20 23:50:30 INFO mapred.MapTask: Finished spill 0
17/07/20 23:50:30 INFO mapred.Task: Task:attempt_local551429012_0001_m_000000_0 is done. And is in the process of committing
17/07/20 23:50:30 INFO mapred.LocalJobRunner: map
17/07/20 23:50:30 INFO mapred.Task: Task 'attempt_local551429012_0001_m_000000_0' done.
17/07/20 23:50:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local551429012_0001_m_000000_0
17/07/20 23:50:30 INFO mapred.LocalJobRunner: map task executor complete.
17/07/20 23:50:30 INFO mapred.LocalJobRunner: Waiting for reduce tasks
17/07/20 23:50:30 INFO mapred.LocalJobRunner: Starting task: attempt_local551429012_0001_r_000000_0
17/07/20 23:50:30 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
17/07/20 23:50:30 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@25ca7b00
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=363285696, maxSingleShuffleLimit=90821424, mergeThreshold=239768576, ioSortFactor=10, memToMemMergeOutputsThreshold=10
17/07/20 23:50:30 INFO reduce.EventFetcher: attempt_local551429012_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
17/07/20 23:50:30 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local551429012_0001_m_000000_0 decomp: 219 len: 223 to MEMORY
17/07/20 23:50:30 INFO reduce.InMemoryMapOutput: Read 219 bytes from map-output for attempt_local551429012_0001_m_000000_0
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 219, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->219
17/07/20 23:50:30 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
17/07/20 23:50:30 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
17/07/20 23:50:30 INFO mapred.Merger: Merging 1 sorted segments
17/07/20 23:50:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 209 bytes
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merged 1 segments, 219 bytes to disk to satisfy reduce memory limit
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merging 1 files, 223 bytes from disk
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
17/07/20 23:50:30 INFO mapred.Merger: Merging 1 sorted segments
17/07/20 23:50:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 209 bytes
17/07/20 23:50:30 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/20 23:50:30 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
17/07/20 23:50:31 INFO mapreduce.Job:  map 100% reduce 0%
17/07/20 23:50:31 INFO mapred.Task: Task:attempt_local551429012_0001_r_000000_0 is done. And is in the process of committing
17/07/20 23:50:31 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/20 23:50:31 INFO mapred.Task: Task attempt_local551429012_0001_r_000000_0 is allowed to commit now
17/07/20 23:50:31 INFO output.FileOutputCommitter: Saved output of task 'attempt_local551429012_0001_r_000000_0' to hdfs://NN:9000/user/bdp/mapreduce/duplicateValue/output/_temporary/0/task_local551429012_0001_r_000000
17/07/20 23:50:31 INFO mapred.LocalJobRunner: reduce > reduce
17/07/20 23:50:31 INFO mapred.Task: Task 'attempt_local551429012_0001_r_000000_0' done.
17/07/20 23:50:31 INFO mapred.LocalJobRunner: Finishing task: attempt_local551429012_0001_r_000000_0
17/07/20 23:50:31 INFO mapred.LocalJobRunner: reduce task executor complete.
17/07/20 23:50:32 INFO mapreduce.Job:  map 100% reduce 100%
17/07/20 23:50:32 INFO mapreduce.Job: Job job_local551429012_0001 completed successfully
17/07/20 23:50:32 INFO mapreduce.Job: Counters: 38
        File System Counters
                FILE: Number of bytes read=13702
                FILE: Number of bytes written=510885
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=2052
                HDFS: Number of bytes written=8
                HDFS: Number of read operations=15
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=4
        Map-Reduce Framework
                Map input records=17
                Map output records=16   
           Map output bytes=184
           Map output materialized bytes=222
                Input split bytes=136
                Combine input records=0
                Combine output records=0
                Reduce input groups=14
                Reduce shuffle bytes=222
                Reduce input records=16
                Reduce output records=2
                Spilled Records=32
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=373
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
                Total committed heap usage (bytes)=241573888
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=1026
        File Output Format Counters
                Bytes Written=14

Step 9: Validate Output

Check the output at HDFS path.

hdfs
 
[root@NN MapReduce]# hadoop fs -ls /user/bdp/mapreduce/duplicateValue/output
17/07/21 00:38:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 root supergroup          0 2017-07-21 00:37 /user/bdp/mapreduce/duplicateValue/output/_SUCCESS
-rw-r--r--   1 root supergroup         14 2017-07-21 00:37 /user/bdp/mapreduce/duplicateValue/output/part-r-00000

Check the output

output
 
[root@NN MapReduce]# hadoop fs -cat /user/bdp/mapreduce/duplicateValue/output/part-r-00000                                17/07/21 00:38:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Celie
Hercule

Wrapping Up

Here, I have written map reduce to find the duplicate value of the first name from the data file. You can modify the code in case you want to check duplicate in a different field. The only mapper code will get the change in order to write map reduce to find the duplicate for another field like last_name, designation etc.

99
0

Join in hive with example

Requirement You have two table named as A and B. and you want to perform all types of join in ...
Read More

Join in pyspark with example

Requirement You have two table named as A and B. and you want to perform all types of join in ...
Read More

Join in spark using scala with example

Requirement You have two table named as A and B. and you want to perform all types of join in ...
Read More

Java UDF to convert String to date in PIG

About Code Many times it happens like you have received data from many systems and each system operates on a ...
Read More
/ java udf, Pig, pig, pig udf, string to date, udf

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.