Requirement
Suppose you receive data files containing users' basic information such as first name, last name, designation, city, etc. These fields are separated by the ',' delimiter. Now the requirement is to find all the duplicate values in any given field. So the task here is: how to find duplicate values using MapReduce.
Components Involved
- HDFS: Stores the input data that is passed to the MapReduce job and also stores the job's output.
- MapReduce: Processes the data/file available in HDFS and writes the output back to HDFS.
Sample Data
Below is the sample data for this exercise:
id,first_name,last_name,gender,designation,city,country
1,Thayne,Mocher,Male,Administrative Officer,Port Colborne,Canada
2,Shelly,Bulfoot,Female,Analyst Programmer,Bacong,Philippines
3,Hercule,Chorlton,Male,Operator,Al Mazzunah,Tunisia
4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine
5,Madelena,Savin,Female,Help Desk Technician,Jinjiang,China
6,Adeline,Imesson,Female,Legal Assistant,Fort Beaufort,South Africa
7,Celie,Richards,Male,Chemical Engineer,Dubiecko,Poland
8,Lilas,Harrowing,Female,Assistant Media Planner,Guayatá,Colombia
9,Freida,Leivers,Female,Legal Assistant,Bangus Kulon,Indonesia
10,Celie,Dolligon,Female,Data Coordiator,Paraty,Brazil
11,Berkley,Orteaux,Male,Assistant Professor,Zinder,Niger
12,Gilburt,Minot,Male,Marketing Assistant,Hanyuan,China
13,Blaine,Treverton,Male,Research Associate,Yuankeng,China
14,Benjamen,Dodd,Male,Assistant Professor,Beberon,Philippines
15,Nikos,Worpole,Male,Human Resources Assistant II,Esmeralda,Cuba
16,Hercule,Richards,Male,Chemical Engineer,Dubiecko,Poland
In the sample data, all the fields are comma (,) separated. We have to find only the repeated values of a column. Let's take the first_name field: any value that occurs more than once in this field should appear in the output. If you look at the sample data, the repeated values in the first_name field are Celie (ids 7 and 10) and Hercule (ids 3 and 16).
Solution
In the solution, we will write a MapReduce program in the Java programming language. The MapReduce job will take the data file as input and write its output to an HDFS path. The solution has multiple steps; let's go through them sequentially.
Step 1: Input Data Preparation
The first step is to prepare the input data for the MapReduce job. Save the sample data in a file at a local path. In my case, the file name is "sampledataForDuplicate" and the local path is "/home/NN/HadoopRepo/MapReduce/resources/duplicateValue".
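If you are starting from scratch, one way to create this file (a minimal sketch; the path and file name are simply the ones used in this walkthrough) is:

mkdir -p /home/NN/HadoopRepo/MapReduce/resources/duplicateValue
# create the file, paste the sample records from above, then press Ctrl+D
cat > /home/NN/HadoopRepo/MapReduce/resources/duplicateValue/sampledataForDuplicate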
We need to move this data file to an HDFS location, which will be the input path for the MapReduce job.
hadoop fs -put /home/NN/HadoopRepo/MapReduce/resources/duplicateValue /user/bdp/mapreduce
This copies the data file from the local file system to the HDFS location.
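You can verify the copy with a simple listing (the exact output will depend on your cluster):

hadoop fs -ls /user/bdp/mapreduce/duplicateValue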
Step 2: Create Maven Project
In order to write a MapReduce program, create a Maven project. We will add dependencies for the required packages.
Follow the below steps in order to create the maven project:
- Open Eclipse
- Create a maven project
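If you prefer the command line over Eclipse, a quickstart archetype works as well (a sketch; the groupId and artifactId are illustrative and should match your own project):

mvn archetype:generate -DgroupId=com.bdp.mapreduce \
    -DartifactId=MapReduceForDuplicateValue \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DinteractiveMode=false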
Step 3: Resolve Dependency
Add the dependencies to the pom.xml file and resolve them from the command line:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.1</version>
</dependency>
<!-- Hadoop Mapreduce Client Core -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>${java.version}</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
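With the dependencies declared, resolving them is a standard Maven invocation from the project root, for example:

mvn clean compile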
Step 4: Write Mapper
Once you are done with all the above steps, write a mapper class that takes the input file, reads it record by record, and emits a key-value pair for each record, using the first_name field as the key and a count of 1 as the value. The mapper is written in Java.
package com.bdp.mapreduce.duplicate.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DuplicateValueMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Skip the header line of the input
        if (key.get() == 0 && value.toString().contains("first_name")) {
            return;
        }
        String[] values = value.toString().split(",");
        // Write the first_name value as the key
        context.write(new Text(values[1]), one);
    }
}
Here, in the mapper, I am taking first_name as the key because I want to find the duplicate records in the first_name field. We can change it to a different field.
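For illustration, one way to avoid hardcoding the column is to read the index from the job configuration. This is a sketch, not part of the original program; the property name duplicate.field.index is made up for this example:

// Hypothetical variant of the map body (not in the original program):
// read the column index from the configuration instead of hardcoding values[1].
// "duplicate.field.index" is an illustrative property name; default 1 = first_name.
int fieldIndex = context.getConfiguration().getInt("duplicate.field.index", 1);
String[] values = value.toString().split(",");
context.write(new Text(values[fieldIndex]), one);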
Step 5: Write Reducer
In this step, we take the mapper output as input and process it. The actual duplicate-detection logic lives here.
Find the code below:
package com.bdp.mapreduce.duplicate.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.google.common.collect.Iterables;

/*
 * This reducer receives the mapper output and emits only those keys
 * that occur more than once, i.e. the duplicate values.
 */
public class DuplicateValueReducer
        extends Reducer<Text, IntWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Emit the key only if it has more than one value, i.e. it is duplicated
        if (Iterables.size(values) > 1) {
            context.write(key, NullWritable.get());
        }
    }
}
In the reducer, we emit only those keys that occur more than once.
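Note that Iterables.size(...) has to walk the entire iterable. A plain loop with an early exit gives the same result without the Guava dependency; a minimal sketch of an alternative reduce body:

// Count occurrences and stop as soon as we know the key is duplicated
int count = 0;
for (IntWritable ignored : values) {
    if (++count > 1) {
        context.write(key, NullWritable.get());
        return;
    }
}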
Step 6: Write Driver
In order to execute the mapper and reducer, let's create a driver class that wires them together. Find the driver class code below:
package com.bdp.mapreduce.duplicate.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.bdp.mapreduce.duplicate.mapper.DuplicateValueMapper;
import com.bdp.mapreduce.duplicate.reducer.DuplicateValueReducer;

public class DuplicateValueDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        @SuppressWarnings("deprecation")
        Job job = new Job(getConf(), "Duplicate value");
        job.setJarByClass(getClass());

        job.setMapperClass(DuplicateValueMapper.class);
        job.setReducerClass(DuplicateValueReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int jobStatus = ToolRunner.run(new DuplicateValueDriver(), args);
        // Propagate the job status as the process exit code
        System.exit(jobStatus);
    }
}
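Because the driver goes through ToolRunner, generic options are parsed automatically. If you adopt the configurable field index sketched in Step 4, you could pass it at submit time without recompiling (illustrative; assumes that variant of the mapper):

hadoop jar <path of jar> com.bdp.mapreduce.duplicate.driver.DuplicateValueDriver \
    -D duplicate.field.index=2 <input data path of HDFS> <output path at HDFS>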
Step 7: Package Preparation
In this step, we will create a package (.jar) of the project. Follow the steps below:
- Open CMD
- Navigate to the Maven project directory
- Use command:
mvn package
It will create a .jar file under the target directory. Copy this jar and keep it at a local path. Here in my case, the jar file is available at "/home/NN/HadoopRepo/MapReduce".
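Before shipping the jar, you can sanity-check that the compiled classes made it in (the jar name below is the one used in this walkthrough):

jar tf /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateValue-0.0.1-SNAPSHOT.jar | grep Duplicate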
Step 8: Execution
All the setup is done. Let's execute the job and validate the output. In order to execute the MapReduce job, use the command below:
Format:
hadoop jar <path of jar> <driver class with package name> <input data path of HDFS> <output path at HDFS>
Example:
hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.duplicate.driver.DuplicateValueDriver /user/bdp/mapreduce/duplicateValue /user/bdp/mapreduce/duplicateValue/output
[root@NN hadoop-2.6.0]# hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.duplicate.driver.DuplicateValueDriver /user/bdp/mapreduce/duplicateValue /user/bdp/mapreduce/duplicateValue/output
17/07/20 23:50:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/20 23:50:23 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
17/07/20 23:50:23 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
17/07/20 23:50:24 INFO input.FileInputFormat: Total input paths to process : 1
17/07/20 23:50:25 INFO mapreduce.JobSubmitter: number of splits:1
17/07/20 23:50:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local551429012_0001
17/07/20 23:50:28 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
17/07/20 23:50:28 INFO mapreduce.Job: Running job: job_local551429012_0001
17/07/20 23:50:28 INFO mapred.LocalJobRunner: OutputCommitter set in config null
17/07/20 23:50:28 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
17/07/20 23:50:28 INFO mapred.LocalJobRunner: Waiting for map tasks
17/07/20 23:50:28 INFO mapred.LocalJobRunner: Starting task: attempt_local551429012_0001_m_000000_0
17/07/20 23:50:29 INFO mapreduce.Job: Job job_local551429012_0001 running in uber mode : false
17/07/20 23:50:29 INFO mapreduce.Job:  map 0% reduce 0%
17/07/20 23:50:29 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
17/07/20 23:50:29 INFO mapred.MapTask: Processing split: hdfs://NN:9000/user/bdp/mapreduce/duplicateValue/sampledataForDuplicate:0+1026
17/07/20 23:50:29 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
17/07/20 23:50:29 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
17/07/20 23:50:29 INFO mapred.MapTask: soft limit at 83886080
17/07/20 23:50:29 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
17/07/20 23:50:29 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
17/07/20 23:50:29 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
17/07/20 23:50:30 INFO mapred.LocalJobRunner:
17/07/20 23:50:30 INFO mapred.MapTask: Starting flush of map output
17/07/20 23:50:30 INFO mapred.MapTask: Spilling map output
17/07/20 23:50:30 INFO mapred.MapTask: bufstart = 0; bufend = 185; bufvoid = 104857600
17/07/20 23:50:30 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214336(104857344); length = 61/6553600
17/07/20 23:50:30 INFO mapred.MapTask: Finished spill 0
17/07/20 23:50:30 INFO mapred.Task: Task:attempt_local551429012_0001_m_000000_0 is done. And is in the process of committing
17/07/20 23:50:30 INFO mapred.LocalJobRunner: map
17/07/20 23:50:30 INFO mapred.Task: Task 'attempt_local551429012_0001_m_000000_0' done.
17/07/20 23:50:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local551429012_0001_m_000000_0
17/07/20 23:50:30 INFO mapred.LocalJobRunner: map task executor complete.
17/07/20 23:50:30 INFO mapred.LocalJobRunner: Waiting for reduce tasks
17/07/20 23:50:30 INFO mapred.LocalJobRunner: Starting task: attempt_local551429012_0001_r_000000_0
17/07/20 23:50:30 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
17/07/20 23:50:30 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@25ca7b00
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=363285696, maxSingleShuffleLimit=90821424, mergeThreshold=239768576, ioSortFactor=10, memToMemMergeOutputsThreshold=10
17/07/20 23:50:30 INFO reduce.EventFetcher: attempt_local551429012_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
17/07/20 23:50:30 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local551429012_0001_m_000000_0 decomp: 219 len: 223 to MEMORY
17/07/20 23:50:30 INFO reduce.InMemoryMapOutput: Read 219 bytes from map-output for attempt_local551429012_0001_m_000000_0
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 219, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->219
17/07/20 23:50:30 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
17/07/20 23:50:30 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
17/07/20 23:50:30 INFO mapred.Merger: Merging 1 sorted segments
17/07/20 23:50:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 209 bytes
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merged 1 segments, 219 bytes to disk to satisfy reduce memory limit
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merging 1 files, 223 bytes from disk
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
17/07/20 23:50:30 INFO mapred.Merger: Merging 1 sorted segments
17/07/20 23:50:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 209 bytes
17/07/20 23:50:30 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/20 23:50:30 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
17/07/20 23:50:31 INFO mapreduce.Job:  map 100% reduce 0%
17/07/20 23:50:31 INFO mapred.Task: Task:attempt_local551429012_0001_r_000000_0 is done. And is in the process of committing
17/07/20 23:50:31 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/20 23:50:31 INFO mapred.Task: Task attempt_local551429012_0001_r_000000_0 is allowed to commit now
17/07/20 23:50:31 INFO output.FileOutputCommitter: Saved output of task 'attempt_local551429012_0001_r_000000_0' to hdfs://NN:9000/user/bdp/mapreduce/duplicateValue/output/_temporary/0/task_local551429012_0001_r_000000
17/07/20 23:50:31 INFO mapred.LocalJobRunner: reduce > reduce
17/07/20 23:50:31 INFO mapred.Task: Task 'attempt_local551429012_0001_r_000000_0' done.
17/07/20 23:50:31 INFO mapred.LocalJobRunner: Finishing task: attempt_local551429012_0001_r_000000_0
17/07/20 23:50:31 INFO mapred.LocalJobRunner: reduce task executor complete.
17/07/20 23:50:32 INFO mapreduce.Job:  map 100% reduce 100%
17/07/20 23:50:32 INFO mapreduce.Job: Job job_local551429012_0001 completed successfully
17/07/20 23:50:32 INFO mapreduce.Job: Counters: 38
    File System Counters
        FILE: Number of bytes read=13702
        FILE: Number of bytes written=510885
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=2052
        HDFS: Number of bytes written=8
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=17
        Map output records=16
        Map output bytes=184
        Map output materialized bytes=222
        Input split bytes=136
        Combine input records=0
        Combine output records=0
        Reduce input groups=14
        Reduce shuffle bytes=222
        Reduce input records=16
        Reduce output records=2
        Spilled Records=32
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=373
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=241573888
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=1026
    File Output Format Counters
        Bytes Written=14
Step 9: Validate Output
Check the output at the HDFS path:
[root@NN MapReduce]# hadoop fs -ls /user/bdp/mapreduce/duplicateValue/output
17/07/21 00:38:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 root supergroup          0 2017-07-21 00:37 /user/bdp/mapreduce/duplicateValue/output/_SUCCESS
-rw-r--r--   1 root supergroup         14 2017-07-21 00:37 /user/bdp/mapreduce/duplicateValue/output/part-r-00000
Check the content of the output file:
[root@NN MapReduce]# hadoop fs -cat /user/bdp/mapreduce/duplicateValue/output/part-r-00000
17/07/21 00:38:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Celie
Hercule
Wrapping Up
Here, I have written a MapReduce application to find the duplicate values in the first_name field of the data file. If you want to check for duplicates in a different field, such as last_name or designation, you only need to change the mapper code, as shown below.
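For example, to find duplicates in the last_name field instead, the only change is the column index used in the mapper (last_name is index 2 in this data layout); with the configurable variant sketched in Step 4, no recompile is even needed:

// last_name is the third column (index 2) in the sample data
context.write(new Text(values[2]), one);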