Requirement
Suppose you receive data files containing users' basic information such as first name, last name, designation, city, etc. These fields are separated by the ',' delimiter. Now, the requirement is to find all the duplicate values of any given field. So, the question here is: how do we find duplicate values using MapReduce?
Components Involved
- HDFS: To store the input data that will be passed to the MapReduce job, and also to store the MapReduce output.
- MapReduce: To process the data file available in HDFS and write the output back to HDFS.
Sample Data
Below are the sample data for this exercise:
id,first_name,last_name,gender,designation,city,country
1,Thayne,Mocher,Male,Administrative Officer,Port Colborne,Canada
2,Shelly,Bulfoot,Female,Analyst Programmer,Bacong,Philippines
3,Hercule,Chorlton,Male,Operator,Al Mazzunah,Tunisia
4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine
5,Madelena,Savin,Female,Help Desk Technician,Jinjiang,China
6,Adeline,Imesson,Female,Legal Assistant,Fort Beaufort,South Africa
7,Celie,Richards,Male,Chemical Engineer,Dubiecko,Poland
8,Lilas,Harrowing,Female,Assistant Media Planner,Guayatá,Colombia
9,Freida,Leivers,Female,Legal Assistant,Bangus Kulon,Indonesia
10,Celie,Dolligon,Female,Data Coordiator,Paraty,Brazil
11,Berkley,Orteaux,Male,Assistant Professor,Zinder,Niger
12,Gilburt,Minot,Male,Marketing Assistant,Hanyuan,China
13,Blaine,Treverton,Male,Research Associate,Yuankeng,China
14,Benjamen,Dodd,Male,Assistant Professor,Beberon,Philippines
15,Nikos,Worpole,Male,Human Resources Assistant II,Esmeralda,Cuba
16,Hercule,Richards,Male,Chemical Engineer,Dubiecko,Poland
In the sample data, all the fields are comma (,) separated. We have to find only the repeated values of a column. Let's take the first_name field: any value that occurs more than once in this field will be the output. If you look at the sample data, Celie (ids 7 and 10) and Hercule (ids 3 and 16) are repeated in the first_name field.
Solution
In the solution, we will write a MapReduce program in Java. It will take the data file as input and write its output to an HDFS path. The solution has multiple steps; let's go through them sequentially.
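Before distributing the work with MapReduce, the core logic can be sketched in plain Java: count occurrences of a chosen field and keep the values seen more than once. This is only an illustration of the idea, not part of the project; the class and method names here are mine.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DuplicateFieldCheck {
    // Returns the values of the given column index that occur more than once.
    public static List<String> duplicates(List<String> rows, int column) {
        Map<String, Integer> counts = new HashMap<>();
        for (String row : rows) {
            String value = row.split(",")[column];
            counts.merge(value, 1, Integer::sum); // count each field value
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) result.add(e.getKey()); // keep only repeats
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> rows = List.of(
            "7,Celie,Richards,Male,Chemical Engineer,Dubiecko,Poland",
            "10,Celie,Dolligon,Female,Data Coordiator,Paraty,Brazil",
            "11,Berkley,Orteaux,Male,Assistant Professor,Zinder,Niger");
        System.out.println(duplicates(rows, 1)); // prints [Celie]
    }
}
```

The MapReduce version below does the same thing, with the counting split between a mapper (emit the field) and a reducer (keep keys with more than one value).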
Step 1: Input Data Preparation
The first step is to prepare the input data for the MapReduce job. Let's put the sample data into a file at a local path. In my case, the file name is "sampledataForDuplicate" and the local path is "/home/NN/HadoopRepo/MapReduce/resources/duplicateValue".
We need to move this data file to an HDFS location, which will be the input path for the MapReduce job.
hadoop fs -put /home/NN/HadoopRepo/MapReduce/resources/duplicateValue /user/bdp/mapreduce
This copies the data file from the local path to the HDFS location.
Step 2: Create Maven Project
In order to write a MapReduce program, create a Maven project; we will add dependencies for the required packages.
Follow the below steps to create the Maven project:
- Open Eclipse
- Create a maven project
Step 3: Resolve Dependency
Add the dependencies in the pom.xml file and resolve them from the command line:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
</dependency>
<!-- Hadoop Mapreduce Client Core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>${java.version}</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
Step 4: Write Mapper
Once you are done with all the above steps, write a mapper class that takes the input file, reads each record, and emits the chosen field of the record as a key with the value 1. Here is the mapper in Java:
package com.bdp.mapreduce.duplicate.mapper;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DuplicateValueMapper
extends Mapper<LongWritable, Text, Text, IntWritable>{
private static final IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
//Skipping the header of the input
if (key.get() == 0 && value.toString().contains("first_name")) {
return;
}
else {
String values[] = value.toString().split(",");
context.write(new Text(values[1]), one); //Writing first_name value as a key
}
}
}
Here, in the mapper, I am taking first_name as the key because I want to find duplicate records in the first_name field. We can change it to a different field by changing the array index used after the split.
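To switch the job to a different field, only the index in `values[1]` needs to change. The helper below is a hypothetical illustration (not part of the project) of which index maps to which column in the sample schema:

```java
public class FieldIndex {
    // Schema: 0=id, 1=first_name, 2=last_name, 3=gender,
    //         4=designation, 5=city, 6=country
    public static String field(String csvLine, int index) {
        return csvLine.split(",")[index];
    }

    public static void main(String[] args) {
        String line = "3,Hercule,Chorlton,Male,Operator,Al Mazzunah,Tunisia";
        System.out.println(field(line, 1)); // first_name -> Hercule
        System.out.println(field(line, 4)); // designation -> Operator
    }
}
```

So, for example, `values[2]` in the mapper would key the job on last_name instead.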
Step 5: Write Reducer
In this step, we take the mapper output as input and process it. The actual duplicate-detection logic lives here.
Find the code below:
package com.bdp.mapreduce.duplicate.reducer;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.google.common.collect.Iterables;
/*
* This reducer will get mapper data as input and return only key that is duplicate value.
*
*/
public class DuplicateValueReducer
extends Reducer<Text, IntWritable, Text, NullWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
//Check if the key appeared more than once
if (Iterables.size(values) > 1) {
context.write(key, NullWritable.get());
}
}
}
In the reducer, we write only those keys whose group contains more than one value, i.e. field values that occurred more than once.
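The framework's shuffle phase groups identical mapper keys before the reducer runs. As a plain-Java simulation of that grouping plus the reducer's test (class and method names are mine, for illustration only):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ReducePhaseSketch {
    // Simulates shuffle + reduce: group identical mapper keys, then keep a
    // key only when its group holds more than one value, mirroring the
    // reducer's Iterables.size(values) > 1 check.
    public static List<String> reduceDuplicates(List<String> mapperKeys) {
        Map<String, Integer> grouped = new TreeMap<>(); // shuffle sorts keys
        for (String k : mapperKeys) grouped.merge(k, 1, Integer::sum);
        return grouped.entrySet().stream()
                .filter(e -> e.getValue() > 1)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(reduceDuplicates(
                List.of("Celie", "Hercule", "Berkley", "Celie", "Hercule")));
        // prints [Celie, Hercule]
    }
}
```

One caveat about the real reducer: `Iterables.size(values)` consumes the values iterator, which is fine here because we only emit the key afterwards.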
Step 6: Write Driver
In order to execute the mapper and reducer, let’s create a driver class which will call mapper and reducer. Find below the driver class code:
package com.bdp.mapreduce.duplicate.driver;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.bdp.mapreduce.duplicate.mapper.DuplicateValueMapper;
import com.bdp.mapreduce.duplicate.reducer.DuplicateValueReducer;
public class DuplicateValueDriver
extends Configured implements Tool{
public int run(String[] arg0) throws Exception {
@SuppressWarnings("deprecation")
Job job = new Job(getConf(), "Duplicate value");
job.setJarByClass(getClass());
job.setMapperClass(DuplicateValueMapper.class);
job.setReducerClass(DuplicateValueReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int jobStatus = ToolRunner.run(new DuplicateValueDriver(), args);
System.out.println(jobStatus);
}
}
Step 7: Package Preparation
In this step, we will create a package (.jar) of the project. Follow the below steps:
- Open CMD
- Navigate to the Maven project directory
- Use command:
mvn package
It will create a .jar file under the target directory. Copy this jar and keep it at a local path. In my case, the jar file is available at "/home/NN/HadoopRepo/MapReduce".
Step 8: Execution
All the setup is done. Let's execute the job and validate the output. To run the MapReduce job, use the command below:
Format:
hadoop jar <path of jar> <driver class with package name> <input data path of HDFS> <output path at HDFS>
eg:
hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.duplicate.driver.DuplicateValueDriver /user/bdp/mapreduce/duplicateValue /user/bdp/mapreduce/duplicateValue/output
[root@NN hadoop-2.6.0]# hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDuplicateValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.duplicate.driver.DuplicateValueDriver /user/bdp/mapreduce/duplicateValue /user/bdp/mapreduce/duplicateValue/output
17/07/20 23:50:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/20 23:50:23 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
17/07/20 23:50:23 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
17/07/20 23:50:24 INFO input.FileInputFormat: Total input paths to process : 1
17/07/20 23:50:25 INFO mapreduce.JobSubmitter: number of splits:1
17/07/20 23:50:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local551429012_0001
17/07/20 23:50:28 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
17/07/20 23:50:28 INFO mapreduce.Job: Running job: job_local551429012_0001
17/07/20 23:50:28 INFO mapred.LocalJobRunner: OutputCommitter set in config null
17/07/20 23:50:28 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
17/07/20 23:50:28 INFO mapred.LocalJobRunner: Waiting for map tasks
17/07/20 23:50:28 INFO mapred.LocalJobRunner: Starting task: attempt_local551429012_0001_m_000000_0
17/07/20 23:50:29 INFO mapreduce.Job: Job job_local551429012_0001 running in uber mode : false
17/07/20 23:50:29 INFO mapreduce.Job: map 0% reduce 0%
17/07/20 23:50:29 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
17/07/20 23:50:29 INFO mapred.MapTask: Processing split: hdfs://NN:9000/user/bdp/mapreduce/duplicateValue/sampledataForDuplicate:0+1026
17/07/20 23:50:29 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
17/07/20 23:50:29 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
17/07/20 23:50:29 INFO mapred.MapTask: soft limit at 83886080
17/07/20 23:50:29 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
17/07/20 23:50:29 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
17/07/20 23:50:29 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
17/07/20 23:50:30 INFO mapred.LocalJobRunner:
17/07/20 23:50:30 INFO mapred.MapTask: Starting flush of map output
17/07/20 23:50:30 INFO mapred.MapTask: Spilling map output
17/07/20 23:50:30 INFO mapred.MapTask: bufstart = 0; bufend = 185; bufvoid = 104857600
17/07/20 23:50:30 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214336(104857344); length = 61/6553600
17/07/20 23:50:30 INFO mapred.MapTask: Finished spill 0
17/07/20 23:50:30 INFO mapred.Task: Task:attempt_local551429012_0001_m_000000_0 is done. And is in the process of committing
17/07/20 23:50:30 INFO mapred.LocalJobRunner: map
17/07/20 23:50:30 INFO mapred.Task: Task 'attempt_local551429012_0001_m_000000_0' done.
17/07/20 23:50:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local551429012_0001_m_000000_0
17/07/20 23:50:30 INFO mapred.LocalJobRunner: map task executor complete.
17/07/20 23:50:30 INFO mapred.LocalJobRunner: Waiting for reduce tasks
17/07/20 23:50:30 INFO mapred.LocalJobRunner: Starting task: attempt_local551429012_0001_r_000000_0
17/07/20 23:50:30 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
17/07/20 23:50:30 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@25ca7b00
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=363285696, maxSingleShuffleLimit=90821424, mergeThreshold=239768576, ioSortFactor=10, memToMemMergeOutputsThreshold=10
17/07/20 23:50:30 INFO reduce.EventFetcher: attempt_local551429012_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
17/07/20 23:50:30 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local551429012_0001_m_000000_0 decomp: 219 len: 223 to MEMORY
17/07/20 23:50:30 INFO reduce.InMemoryMapOutput: Read 219 bytes from map-output for attempt_local551429012_0001_m_000000_0
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 219, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->219
17/07/20 23:50:30 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
17/07/20 23:50:30 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
17/07/20 23:50:30 INFO mapred.Merger: Merging 1 sorted segments
17/07/20 23:50:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 209 bytes
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merged 1 segments, 219 bytes to disk to satisfy reduce memory limit
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merging 1 files, 223 bytes from disk
17/07/20 23:50:30 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
17/07/20 23:50:30 INFO mapred.Merger: Merging 1 sorted segments
17/07/20 23:50:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 209 bytes
17/07/20 23:50:30 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/20 23:50:30 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
17/07/20 23:50:31 INFO mapreduce.Job: map 100% reduce 0%
17/07/20 23:50:31 INFO mapred.Task: Task:attempt_local551429012_0001_r_000000_0 is done. And is in the process of committing
17/07/20 23:50:31 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/20 23:50:31 INFO mapred.Task: Task attempt_local551429012_0001_r_000000_0 is allowed to commit now
17/07/20 23:50:31 INFO output.FileOutputCommitter: Saved output of task 'attempt_local551429012_0001_r_000000_0' to hdfs://NN:9000/user/bdp/mapreduce/duplicateValue/output/_temporary/0/task_local551429012_0001_r_000000
17/07/20 23:50:31 INFO mapred.LocalJobRunner: reduce > reduce
17/07/20 23:50:31 INFO mapred.Task: Task 'attempt_local551429012_0001_r_000000_0' done.
17/07/20 23:50:31 INFO mapred.LocalJobRunner: Finishing task: attempt_local551429012_0001_r_000000_0
17/07/20 23:50:31 INFO mapred.LocalJobRunner: reduce task executor complete.
17/07/20 23:50:32 INFO mapreduce.Job: map 100% reduce 100%
17/07/20 23:50:32 INFO mapreduce.Job: Job job_local551429012_0001 completed successfully
17/07/20 23:50:32 INFO mapreduce.Job: Counters: 38
File System Counters
FILE: Number of bytes read=13702
FILE: Number of bytes written=510885
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2052
HDFS: Number of bytes written=8
HDFS: Number of read operations=15
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Map-Reduce Framework
Map input records=17
Map output records=16
Map output bytes=184
Map output materialized bytes=222
Input split bytes=136
Combine input records=0
Combine output records=0
Reduce input groups=14
Reduce shuffle bytes=222
Reduce input records=16
Reduce output records=2
Spilled Records=32
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=373
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=241573888
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1026
File Output Format Counters
Bytes Written=14
Step 9: Validate Output
Check the output at HDFS path.
[root@NN MapReduce]# hadoop fs -ls /user/bdp/mapreduce/duplicateValue/output
17/07/21 00:38:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 root supergroup          0 2017-07-21 00:37 /user/bdp/mapreduce/duplicateValue/output/_SUCCESS
-rw-r--r--   1 root supergroup         14 2017-07-21 00:37 /user/bdp/mapreduce/duplicateValue/output/part-r-00000
Check the output:
[root@NN MapReduce]# hadoop fs -cat /user/bdp/mapreduce/duplicateValue/output/part-r-00000
17/07/21 00:38:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Celie
Hercule
Wrapping Up
Here, I have written a MapReduce application to find duplicate values in the first_name field of a data file. If you want to check for duplicates in a different field such as last_name or designation, you just need to change the field index in the mapper code.