How to get distinct words of a file using Map Reduce

Requirement

Suppose you have a text file in which many words are repeated. The requirement is to get the distinct words from the file using Map Reduce. In SQL terms, we have to write a Map Reduce program that behaves like the DISTINCT keyword of a SELECT query.

Components Involved

  • HDFS: Stores the input data that is passed to the Map Reduce job, and also stores the job’s output.
  • Map Reduce: Processes the data/file available in HDFS and writes the output back to HDFS.

Sample Data

Let’s take an example of the very popular poem:

sampledata
 
Twinkle, twinkle, little star,
How I wonder what you are!
Up above the world so high,
Like a diamond in the sky.
When the blazing sun is gone,
When he nothing shines upon,
Then you show your little light,
Twinkle, twinkle, all the night.
Then the traveler in the dark
Thanks you for your tiny spark,
How could he see where to go,
If you did not twinkle so?
In the dark blue sky you keep,
Often through my curtains peep
For you never shut your eye,
Till the sun is in the sky.
As your bright and tiny spark
Lights the traveler in the dark,
Though I know not what you are,
Twinkle, twinkle, little star.

Here, words such as Twinkle/twinkle are repeated. Also, assume the words are case sensitive, so Twinkle and twinkle are treated as different words.

Solution

Let’s start with the solution. To achieve the requirement, we are going to use Map Reduce to process the data.

Step 1: Input Data Preparation

A Map Reduce job needs an input path, so let’s first set up the input before moving forward. Download the input data from Sample. Once downloaded, move it to any HDFS location; this HDFS location will be the input path for the Map Reduce job. In my case, the HDFS location is “/user/bdp/mapreduce/distinctvalue/input”.

hadoop fs -copyFromLocal /home/NN/HadoopRepo/MapReduce/resources/distinctvalue/sampledata.txt /user/bdp/mapreduce/distinctvalue/input
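
To confirm the file is in place before running the job, you can list the input directory:

hadoop fs -ls /user/bdp/mapreduce/distinctvalue/input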

Step 2: Create Maven Project

We are going to use the Java programming language to write our Map Reduce program. Let’s create a Maven project, which makes it easy to resolve all the required dependencies for the code.

Follow the steps below to create the Maven project (a command-line alternative is sketched after the list):

  • Open Eclipse
  • Create a maven project
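
If you prefer the command line to Eclipse, the standard Maven quickstart archetype works just as well; the groupId and artifactId below are only placeholders chosen to match the package and jar names used later in this post:

mvn archetype:generate -DgroupId=com.bdp.mapreduce -DartifactId=MapReduceForDistinctValue -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false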

Step 3: Resolve Dependency

Add the below dependencies to the pom.xml file and resolve them from the command line:

pom
 
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.1</version>
</dependency>
<!-- Hadoop Mapreduce Client Core -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>${java.version}</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
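
With the dependencies added, they can be resolved from the project’s root directory with a plain Maven build, for example:

mvn clean compile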

Step 4: Write Mapper

Once you are done with all the above steps, write a mapper class which takes the input file, reads it line by line, and emits each word as a key-value pair (word, 1). The mapper is written in Java.

mapper
 
package com.bdp.mapreduce.distinct.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistinctValueMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Split the line on spaces and emit each word (punctuation stripped) with a count of 1
        String[] words = value.toString().split(" ");
        for (String string : words) {
            context.write(new Text(string.replaceAll("[-+.^:,?]", "")), one);
        }
    }
}
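
To see what the mapper emits, take the first line of the poem, "Twinkle, twinkle, little star,". After splitting on spaces and stripping the characters in the regex [-+.^:,?], the mapper writes the pairs (Twinkle, 1), (twinkle, 1), (little, 1), (star, 1). Note that '!' is not in that character class, which is why "are!" survives as its own word in the final output.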

Step 5: Write Reducer

In this step, we take the mapper output as input and process it. The actual distinct logic lives here: the reducer writes each key (word) out exactly once, no matter how many times it appeared.

Find the code below:

reducer
 
package com.bdp.mapreduce.distinct.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctValueReducer
        extends Reducer<Text, IntWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // Write the key only when the running sum hits 1, i.e. exactly once per distinct word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
            if (sum == 1)
                context.write(key, NullWritable.get());
        }
    }
}
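
Since the shuffle already groups all occurrences of a word into a single reduce call, the counting is not strictly required; an equivalent, slightly simpler reduce method (a sketch, not the code used above) just emits each key once and ignores the values:

@Override
protected void reduce(Text key, Iterable<IntWritable> values,
        Reducer<Text, IntWritable, Text, NullWritable>.Context context)
        throws IOException, InterruptedException {
    // One output record per distinct word; the values are not needed
    context.write(key, NullWritable.get());
}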

Step 6: Write Driver

To execute the mapper and reducer, let’s create a driver class which wires them together. Find the driver class code below:

driver
 
package com.bdp.mapreduce.distinct.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.bdp.mapreduce.distinct.mapper.DistinctValueMapper;
import com.bdp.mapreduce.distinct.reducer.DistinctValueReducer;

public class DistinctValueDriver extends Configured implements Tool {

    @SuppressWarnings("deprecation")
    public int run(String[] arg0) throws Exception {
        Job job = new Job(getConf(), "Distinct Value");
        job.setJarByClass(getClass());
        job.setMapperClass(DistinctValueMapper.class);
        // No combiner is set: the reducer's output types (Text, NullWritable)
        // differ from its input types (Text, IntWritable), so it cannot be reused as a combiner.
        job.setReducerClass(DistinctValueReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int jobStatus = ToolRunner.run(new DistinctValueDriver(), args);
        System.exit(jobStatus);
    }
}
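
The new Job(getConf(), ...) constructor is what triggers the @SuppressWarnings("deprecation") annotation; on Hadoop 2.x the job can be created through the factory method instead, an optional one-line change:

Job job = Job.getInstance(getConf(), "Distinct Value");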

Step 7: Package Preparation

In this step, we will create a package (.jar) of the project. Follow the below steps:

  • Open a terminal (CMD)
  • Go to the Maven project directory
  • Run the command “mvn package”

It will create a .jar file under the target directory. Copy the created jar to a local path. In my case, the jar file is kept at “/home/NN/HadoopRepo/MapReduce”.
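
Put together, the packaging step looks roughly like this (the jar name matches the one used in the execution step; the destination is just the local path chosen above):

cd <path of maven project>
mvn clean package
cp target/MapReduceForDistinctValue-0.0.1-SNAPSHOT.jar /home/NN/HadoopRepo/MapReduce/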

Step 8: Execution

All the setup has been done. Let’s execute the job and validate the output. To execute the Map Reduce job, use the below command:

Format:

hadoop jar <path of jar> <driver class with package name> <input data path of HDFS> <output path at HDFS>

Ex:

command
 
hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDistinctValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.distinct.driver.DistinctValueDriver /user/bdp/mapreduce/distinctvalue/input /user/bdp/mapreduce/distinctvalue/output
console output
 
[root@NN ~]# hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDistinctValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.distinct.driver.DistinctValueDriver /user/bdp/mapreduce/distinctvalue/input /user/bdp/mapreduce/distinctvalue/output
17/07/13 23:46:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/13 23:46:56 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
17/07/13 23:46:56 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
17/07/13 23:46:57 INFO input.FileInputFormat: Total input paths to process : 1
17/07/13 23:46:57 INFO mapreduce.JobSubmitter: number of splits:1
17/07/13 23:46:59 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local683911792_0001
17/07/13 23:47:01 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
17/07/13 23:47:01 INFO mapreduce.Job: Running job: job_local683911792_0001
17/07/13 23:47:01 INFO mapred.LocalJobRunner: OutputCommitter set in config null
17/07/13 23:47:01 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
17/07/13 23:47:01 INFO mapred.LocalJobRunner: Waiting for map tasks
17/07/13 23:47:01 INFO mapred.LocalJobRunner: Starting task: attempt_local683911792_0001_m_000000_0
17/07/13 23:47:02 INFO mapreduce.Job: Job job_local683911792_0001 running in uber mode : false
17/07/13 23:47:02 INFO mapreduce.Job:  map 0% reduce 0%
17/07/13 23:47:02 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
17/07/13 23:47:02 INFO mapred.MapTask: Processing split: hdfs://NN:9000/user/bdp/mapreduce/distinctvalue/input/sampledata.txt:0+645
17/07/13 23:47:02 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
17/07/13 23:47:02 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
17/07/13 23:47:02 INFO mapred.MapTask: soft limit at 83886080
17/07/13 23:47:02 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
17/07/13 23:47:02 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
17/07/13 23:47:02 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
17/07/13 23:47:03 INFO mapred.LocalJobRunner:
17/07/13 23:47:03 INFO mapred.MapTask: Starting flush of map output
17/07/13 23:47:03 INFO mapred.MapTask: Spilling map output
17/07/13 23:47:03 INFO mapred.MapTask: bufstart = 0; bufend = 1110; bufvoid = 104857600
17/07/13 23:47:03 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26213912(104855648); length = 485/6553600
17/07/13 23:47:03 INFO mapred.MapTask: Finished spill 0
17/07/13 23:47:03 INFO mapred.Task: Task:attempt_local683911792_0001_m_000000_0 is done. And is in the process of committing
17/07/13 23:47:03 INFO mapred.LocalJobRunner: map
17/07/13 23:47:03 INFO mapred.Task: Task 'attempt_local683911792_0001_m_000000_0' done.
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Finishing task: attempt_local683911792_0001_m_000000_0
17/07/13 23:47:03 INFO mapred.LocalJobRunner: map task executor complete.
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Waiting for reduce tasks
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Starting task: attempt_local683911792_0001_r_000000_0
17/07/13 23:47:03 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
17/07/13 23:47:03 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@79480d9a
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=363285696, maxSingleShuffleLimit=90821424, mergeThreshold=239768576, ioSortFactor=10, memToMemMergeOutputsThreshold=10
17/07/13 23:47:03 INFO reduce.EventFetcher: attempt_local683911792_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
17/07/13 23:47:03 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local683911792_0001_m_000000_0 decomp: 1356 len: 1360 to MEMORY
17/07/13 23:47:03 INFO reduce.InMemoryMapOutput: Read 1356 bytes from map-output for attempt_local683911792_0001_m_000000_0
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 1356, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->1356
17/07/13 23:47:03 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
17/07/13 23:47:03 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
17/07/13 23:47:04 INFO mapred.Merger: Merging 1 sorted segments
17/07/13 23:47:04 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1353 bytes
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merged 1 segments, 1356 bytes to disk to satisfy reduce memory limit
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merging 1 files, 1360 bytes from disk
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
17/07/13 23:47:04 INFO mapred.Merger: Merging 1 sorted segments
17/07/13 23:47:04 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1353 bytes
17/07/13 23:47:04 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:04 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
17/07/13 23:47:04 INFO mapreduce.Job:  map 100% reduce 0%
17/07/13 23:47:04 INFO mapred.Task: Task:attempt_local683911792_0001_r_000000_0 is done. And is in the process of committing
17/07/13 23:47:04 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:04 INFO mapred.Task: Task attempt_local683911792_0001_r_000000_0 is allowed to commit now
17/07/13 23:47:04 INFO output.FileOutputCommitter: Saved output of task 'attempt_local683911792_0001_r_000000_0' to hdfs://NN:9000/user/bdp/mapreduce/distinctvalue/output/_temporary/0/task_local683911792_0001_r_000000
17/07/13 23:47:04 INFO mapred.LocalJobRunner: reduce > reduce
17/07/13 23:47:04 INFO mapred.Task: Task 'attempt_local683911792_0001_r_000000_0' done.
17/07/13 23:47:04 INFO mapred.LocalJobRunner: Finishing task: attempt_local683911792_0001_r_000000_0
17/07/13 23:47:04 INFO mapred.LocalJobRunner: reduce task executor complete.
17/07/13 23:47:05 INFO mapreduce.Job:  map 100% reduce 100%
17/07/13 23:47:05 INFO mapreduce.Job: Job job_local683911792_0001 completed successfully
17/07/13 23:47:05 INFO mapreduce.Job: Counters: 38
File System Counters
FILE: Number of bytes read=16690
FILE: Number of bytes written=515014
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1290
HDFS: Number of bytes written=400
HDFS: Number of read operations=15
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Map-Reduce Framework
Map input records=25
Map output records=122
Map output bytes=1110
Map output materialized bytes=1360
Input split bytes=133
Combine input records=0
Combine output records=0
Reduce input groups=74
Reduce shuffle bytes=1360
Reduce input records=122
Reduce output records=74
Spilled Records=244
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=243
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=241573888
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=645
File Output Format Counters
Bytes Written=400
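
The counters at the end already confirm the behaviour we wanted: Map output records=122 words went into the shuffle, while Reduce input groups=74 and Reduce output records=74 show that they were collapsed to 74 distinct keys.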

Step 9: Validate Output

Check the output at the HDFS output path.

output
 
[root@NN ~]# hadoop fs -cat /user/bdp/mapreduce/distinctvalue/output/part-r-00000
17/07/14 00:07:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

As
For
How
I
If
In
Lights
Like
Often
Thanks
Then
Though
Till
Twinkle
Up
When
a
above
all
and
are
are!
blazing
blue
bright
could
curtains
dark
diamond
did
eye
for
go
gone
he
high
in
is
keep
know
light
little
my
never
night
not
nothing
peep
see
shines
show
shut
sky
so
spark
star
sun
the
through
tiny
to
traveler
twinkle
upon
what
where
wonder
world
you
your
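
As a quick sanity check, the number of lines in the output file should match the Reduce output records counter (74) reported by the job:

hadoop fs -cat /user/bdp/mapreduce/distinctvalue/output/part-r-00000 | wc -l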

Wrapping Up

Here, we have written one mapper class, one reducer class, and one driver class, based on the requirement. We have also seen how few dependencies this Map Reduce job needs, all resolved through Maven. Hope this helps you understand the basic Map Reduce concept.
