Requirement
Suppose you have a file in which many words are repeated, and the requirement is to extract the distinct words from that file using MapReduce. In SQL terms, we need to write a map-reduce program that behaves like SQL's DISTINCT keyword (SELECT DISTINCT word FROM ...).
Components Involved
- HDFS: Stores the input data that will be passed to the MapReduce job, and also stores the job's output.
- MapReduce: Processes the data/file available in HDFS and writes the output back to HDFS.
Sample Data
Let’s take an example of the very popular poem:
Twinkle, twinkle, little star,
How I wonder what you are!
Up above the world so high,
Like a diamond in the sky.
When the blazing sun is gone,
When he nothing shines upon,
Then you show your little light,
Twinkle, twinkle, all the night.
Then the traveler in the dark
Thanks you for your tiny spark,
How could he see where to go,
If you did not twinkle so?
In the dark blue sky you keep,
Often through my curtains peep
For you never shut your eye,
Till the sun is in the sky.
As your bright and tiny spark
Lights the traveler in the dark,
Though I know not what you are,
Twinkle, twinkle, little star.
Here, many words like "Twinkle"/"twinkle" are repeated. Also, suppose the comparison is case sensitive, so "Twinkle" and "twinkle" count as different words.
Solution
Let’s start with the solution.
Step 1: Input Data Preparation
In MapReduce, we have to pass an input to process. So let's first set up the input before moving forward. Download the input data from Sample. Once downloaded, move it to an HDFS location; this location will be the input path for the MapReduce job. In my case, the HDFS location is "/user/bdp/mapreduce/distinctvalue/input".
hadoop fs -copyFromLocal /home/NN/HadoopRepo/MapReduce/resources/distinctvalue/sampledata.txt /user/bdp/mapreduce/distinctvalue/input
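Once copied, you can confirm the file landed in HDFS by listing the directory (a routine verification step, not part of the original write-up):

hadoop fs -ls /user/bdp/mapreduce/distinctvalue/input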
Step 2: Create Maven Project
We are going to use the Java programming language to write our MapReduce program, so let's create a Maven project to resolve all the required dependencies for the code.
Follow the below steps to create the Maven project (a command-line alternative is shown after the list):
- Open Eclipse
- Create a maven project
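If you prefer the command line to Eclipse, the standard Maven quickstart archetype produces an equivalent project skeleton (the groupId and artifactId below simply mirror this project's names):

mvn archetype:generate -DgroupId=com.bdp.mapreduce -DartifactId=MapReduceForDistinctValue -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false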
Step 3: Resolve Dependency
Add the below dependencies to the pom.xml file and resolve them from the command line:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.1</version>
</dependency>
<!-- Hadoop Mapreduce Client Core -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>${java.version}</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
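The original write-up does not spell out which command it uses to resolve these; any standard Maven build goal will pull them down, for example:

mvn dependency:resolve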
Step 4: Write Mapper
Once you are done with all the above steps, write a mapper class that takes the input file, reads it line by line, and emits each word as a key with the value 1. Here is the Java code for the mapper.
package com.bdp.mapreduce.distinct.mapper;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistinctValueMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on spaces, strip common punctuation from each word,
        // and emit (word, 1) for every word found.
        String[] words = value.toString().split(" ");
        for (String string : words) {
            context.write(new Text(string.replaceAll("[-+.^:,?]", "")), one);
        }
    }
}
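Note that the regex strips hyphens, commas, periods, and a few other characters, but not the exclamation mark, which is why both "are" and "are!" show up as distinct words in the final output. If you want to sanity-check the mapper in isolation, an MRUnit test along these lines should work (an illustrative sketch, not part of the original project; it assumes you add the org.apache.mrunit:mrunit test dependency to the pom.xml):

package com.bdp.mapreduce.distinct.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class DistinctValueMapperTest {

    @Test
    public void mapperStripsPunctuationAndEmitsEachWord() throws Exception {
        // Feed one line through the mapper and assert the (word, 1) pairs it emits.
        MapDriver.newMapDriver(new DistinctValueMapper())
                .withInput(new LongWritable(0), new Text("Twinkle, twinkle, little star,"))
                .withOutput(new Text("Twinkle"), new IntWritable(1))
                .withOutput(new Text("twinkle"), new IntWritable(1))
                .withOutput(new Text("little"), new IntWritable(1))
                .withOutput(new Text("star"), new IntWritable(1))
                .runTest();
    }
}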
Step 5: Write Reducer
In this step, we take the mapper's output as input and process it; the actual distinct logic lives here.
Find the code below:
package com.bdp.mapreduce.distinct.reducer;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctValueReducer
        extends Reducer<Text, IntWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // All occurrences of a word arrive grouped under a single key, so the
        // word is written exactly once: on the first value, when the sum hits 1.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
            if (sum == 1)
                context.write(key, NullWritable.get());
        }
    }
}
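Since each reduce() call already receives one unique word with all of its counts grouped together, the counting is not strictly needed for distinctness. A leaner equivalent (the class name DistinctValueOnlyKeyReducer is just an illustrative alternative, not part of the original project) would simply emit each key once:

package com.bdp.mapreduce.distinct.reducer;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctValueOnlyKeyReducer
        extends Reducer<Text, IntWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // The shuffle has already grouped all duplicates under this key,
        // so emitting the key once is all that "distinct" requires.
        context.write(key, NullWritable.get());
    }
}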
Step 6: Write Driver
To execute the mapper and reducer, let's create a driver class that wires them together. Find the driver class code below:
package com.bdp.mapreduce.distinct.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.bdp.mapreduce.distinct.mapper.DistinctValueMapper;
import com.bdp.mapreduce.distinct.reducer.DistinctValueReducer;

public class DistinctValueDriver extends Configured implements Tool {

    @SuppressWarnings("deprecation")
    public int run(String[] arg0) throws Exception {
        Job job = new Job(getConf(), "Distinct Value");
        job.setJarByClass(getClass());
        job.setMapperClass(DistinctValueMapper.class);
        // A combiner is not set: the reducer's input (IntWritable) and output
        // (NullWritable) value types differ, so it cannot double as a combiner.
        // job.setCombinerClass(DistinctValueReducer.class);
        job.setReducerClass(DistinctValueReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int jobStatus = ToolRunner.run(new DistinctValueDriver(), args);
        System.exit(jobStatus);
    }
}
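One note on the driver: it sets only the map output classes and leaves the final output key/value classes at the framework defaults, which works here because TextOutputFormat does not enforce them. If you want to be explicit, a small addition (a sketch, not in the original driver) would be:

// Optional: declare the reducer's final output types explicitly
// (requires importing org.apache.hadoop.io.NullWritable in the driver).
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);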
Step 7: Package Preparation
In this step, we will create a package (.jar) of the project. Follow the below steps:
- Open CMD
- Navigate to the Maven project directory
- Run the command: mvn package
It will create a .jar file under the target directory. Copy this jar and keep it at a local path. In my case, the jar file is available at "/home/NN/HadoopRepo/MapReduce".
Step 8: Execution
All the setup is done. Let's execute the job and validate the output. Use the below command to run the MapReduce job:
Format:
hadoop jar <path of jar> <driver class with package name> <input data path of HDFS> <output path at HDFS>
Ex:
hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDistinctValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.distinct.driver.DistinctValueDriver /user/bdp/mapreduce/distinctvalue/input /user/bdp/mapreduce/distinctvalue/output
[root@NN ~]# hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDistinctValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.distinct.driver.DistinctValueDriver /user/bdp/mapreduce/distinctvalue/input /user/bdp/mapreduce/distinctvalue/output
17/07/13 23:46:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/13 23:46:56 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
17/07/13 23:46:56 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
17/07/13 23:46:57 INFO input.FileInputFormat: Total input paths to process : 1
17/07/13 23:46:57 INFO mapreduce.JobSubmitter: number of splits:1
17/07/13 23:46:59 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local683911792_0001
17/07/13 23:47:01 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
17/07/13 23:47:01 INFO mapreduce.Job: Running job: job_local683911792_0001
17/07/13 23:47:01 INFO mapred.LocalJobRunner: OutputCommitter set in config null
17/07/13 23:47:01 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
17/07/13 23:47:01 INFO mapred.LocalJobRunner: Waiting for map tasks
17/07/13 23:47:01 INFO mapred.LocalJobRunner: Starting task: attempt_local683911792_0001_m_000000_0
17/07/13 23:47:02 INFO mapreduce.Job: Job job_local683911792_0001 running in uber mode : false
17/07/13 23:47:02 INFO mapreduce.Job: map 0% reduce 0%
17/07/13 23:47:02 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
17/07/13 23:47:02 INFO mapred.MapTask: Processing split: hdfs://NN:9000/user/bdp/mapreduce/distinctvalue/input/sampledata.txt:0+645
17/07/13 23:47:02 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
17/07/13 23:47:02 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
17/07/13 23:47:02 INFO mapred.MapTask: soft limit at 83886080
17/07/13 23:47:02 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
17/07/13 23:47:02 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
17/07/13 23:47:02 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
17/07/13 23:47:03 INFO mapred.LocalJobRunner:
17/07/13 23:47:03 INFO mapred.MapTask: Starting flush of map output
17/07/13 23:47:03 INFO mapred.MapTask: Spilling map output
17/07/13 23:47:03 INFO mapred.MapTask: bufstart = 0; bufend = 1110; bufvoid = 104857600
17/07/13 23:47:03 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26213912(104855648); length = 485/6553600
17/07/13 23:47:03 INFO mapred.MapTask: Finished spill 0
17/07/13 23:47:03 INFO mapred.Task: Task:attempt_local683911792_0001_m_000000_0 is done. And is in the process of committing
17/07/13 23:47:03 INFO mapred.LocalJobRunner: map
17/07/13 23:47:03 INFO mapred.Task: Task 'attempt_local683911792_0001_m_000000_0' done.
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Finishing task: attempt_local683911792_0001_m_000000_0
17/07/13 23:47:03 INFO mapred.LocalJobRunner: map task executor complete.
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Waiting for reduce tasks
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Starting task: attempt_local683911792_0001_r_000000_0
17/07/13 23:47:03 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
17/07/13 23:47:03 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@79480d9a
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=363285696, maxSingleShuffleLimit=90821424, mergeThreshold=239768576, ioSortFactor=10, memToMemMergeOutputsThreshold=10
17/07/13 23:47:03 INFO reduce.EventFetcher: attempt_local683911792_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
17/07/13 23:47:03 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local683911792_0001_m_000000_0 decomp: 1356 len: 1360 to MEMORY
17/07/13 23:47:03 INFO reduce.InMemoryMapOutput: Read 1356 bytes from map-output for attempt_local683911792_0001_m_000000_0
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 1356, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->1356
17/07/13 23:47:03 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
17/07/13 23:47:03 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
17/07/13 23:47:04 INFO mapred.Merger: Merging 1 sorted segments
17/07/13 23:47:04 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1353 bytes
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merged 1 segments, 1356 bytes to disk to satisfy reduce memory limit
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merging 1 files, 1360 bytes from disk
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
17/07/13 23:47:04 INFO mapred.Merger: Merging 1 sorted segments
17/07/13 23:47:04 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1353 bytes
17/07/13 23:47:04 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:04 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
17/07/13 23:47:04 INFO mapreduce.Job: map 100% reduce 0%
17/07/13 23:47:04 INFO mapred.Task: Task:attempt_local683911792_0001_r_000000_0 is done. And is in the process of committing
17/07/13 23:47:04 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:04 INFO mapred.Task: Task attempt_local683911792_0001_r_000000_0 is allowed to commit now
17/07/13 23:47:04 INFO output.FileOutputCommitter: Saved output of task 'attempt_local683911792_0001_r_000000_0' to hdfs://NN:9000/user/bdp/mapreduce/distinctvalue/output/_temporary/0/task_local683911792_0001_r_000000
17/07/13 23:47:04 INFO mapred.LocalJobRunner: reduce > reduce
17/07/13 23:47:04 INFO mapred.Task: Task 'attempt_local683911792_0001_r_000000_0' done.
17/07/13 23:47:04 INFO mapred.LocalJobRunner: Finishing task: attempt_local683911792_0001_r_000000_0
17/07/13 23:47:04 INFO mapred.LocalJobRunner: reduce task executor complete.
17/07/13 23:47:05 INFO mapreduce.Job: map 100% reduce 100%
17/07/13 23:47:05 INFO mapreduce.Job: Job job_local683911792_0001 completed successfully
17/07/13 23:47:05 INFO mapreduce.Job: Counters: 38
	File System Counters
		FILE: Number of bytes read=16690
		FILE: Number of bytes written=515014
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1290
		HDFS: Number of bytes written=400
		HDFS: Number of read operations=15
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Map-Reduce Framework
		Map input records=25
		Map output records=122
		Map output bytes=1110
		Map output materialized bytes=1360
		Input split bytes=133
		Combine input records=0
		Combine output records=0
		Reduce input groups=74
		Reduce shuffle bytes=1360
		Reduce input records=122
		Reduce output records=74
		Spilled Records=244
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=243
		CPU time spent (ms)=0
		Physical memory (bytes) snapshot=0
		Virtual memory (bytes) snapshot=0
		Total committed heap usage (bytes)=241573888
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=645
	File Output Format Counters
		Bytes Written=400
Step 9: Validate Output
Check the output at the HDFS path:
[root@NN ~]# hadoop fs -cat /user/bdp/mapreduce/distinctvalue/output/part-r-00000
17/07/14 00:07:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
As
For
How
I
If
In
Lights
Like
Often
Thanks
Then
Though
Till
Twinkle
Up
When
a
above
all
and
are
are!
blazing
blue
bright
could
curtains
dark
diamond
did
eye
for
go
gone
he
high
in
is
keep
know
light
little
my
never
night
not
nothing
peep
see
shines
show
shut
sky
so
spark
star
sun
the
through
tiny
to
traveler
twinkle
upon
what
where
wonder
world
you
your
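As a quick sanity check (not part of the original write-up), the number of lines in the output file should line up with the "Reduce output records" counter from Step 8:

hadoop fs -cat /user/bdp/mapreduce/distinctvalue/output/part-r-00000 | wc -l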
Wrapping Up
Here, we have written one mapper class, one reducer class, and one driver class, based on the requirement. In addition, we have seen how few dependencies this MapReduce job needs, all of them resolved through Maven. Hopefully this helps you understand the basic concept of MapReduce.