How to get distinct words of a file using Map Reduce

Requirement

Suppose you have a text file in which many words are repeated. The requirement is to extract the distinct words from the file using Map Reduce. In SQL terms, we have to write a map reduce program that behaves like the DISTINCT keyword, e.g. SELECT DISTINCT word FROM words.
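Conceptually, this is the same distinct operation you could run on a small in-memory list in plain Java; the sketch below (the class name is just for illustration) shows the target behavior, and the map reduce job in this article does the equivalent at HDFS scale:

import java.util.Arrays;
import java.util.List;

public class DistinctDemo {
   public static void main(String[] args) {
      List<String> words = Arrays.asList("Twinkle", "twinkle", "little", "star", "twinkle");
      // distinct() keeps the first occurrence of each element; it is
      // case sensitive, so "Twinkle" and "twinkle" both survive.
      words.stream().distinct().forEach(System.out::println);
   }
}

Running it prints Twinkle, twinkle, little, and star once each.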

Components Involved

  • HDFS: Stores the input data that is passed to the map reduce job as input, and also holds the map reduce output.
  • Map Reduce: Processes the data/file available in HDFS and stores the output back to HDFS.

Sample Data

Let’s take the very popular poem as an example:

sampledata
 
  1. Twinkle, twinkle, little star,
  2. How I wonder what you are!
  3. Up above the world so high,
  4. Like a diamond in the sky.
  5. When the blazing sun is gone,
  6. When he nothing shines upon,
  7. Then you show your little light,
  8. Twinkle, twinkle, all the night.
  9. Then the traveler in the dark
  10. Thanks you for your tiny spark,
  11. How could he see where to go,
  12. If you did not twinkle so?
  13. In the dark blue sky you keep,
  14. Often through my curtains peep
  15. For you never shut your eye,
  16. Till the sun is in the sky.
  17. As your bright and tiny spark
  18. Lights the traveler in the dark,
  19. Though I know not what you are,
  20. Twinkle, twinkle, little star.

Here, many words such as Twinkle and twinkle are repeated. Also, suppose the words are case sensitive, so Twinkle and twinkle are different words.

Solution

Let’s start with the solution. To meet the requirement, we will process the data with a map reduce job: the mapper emits each word as a key with a count of 1, and the reducer writes each distinct key exactly once.

Step 1: Input Data Preparation

A map reduce job needs input to process, so let’s first set up the input before moving forward. Download the input data from Sample. Once downloaded, move it to any HDFS location; that HDFS location will be the input path for the map reduce job. In my case, the HDFS location is “/user/bdp/mapreduce/distinctvalue/input”. You can verify the upload afterwards with hadoop fs -ls on the same path.

hadoop fs -copyFromLocal /home/NN/HadoopRepo/MapReduce/resources/distinctvalue/sampledata.txt /user/bdp/mapreduce/distinctvalue/input

Step 2: Create Maven Project

We are going to write our Map Reduce program in Java. Let’s create a Maven project so that all the required dependencies for the code are resolved automatically.

Follow the below steps to create the maven project:

  • Open Eclipse
  • Create a maven project

Step 3: Resolve Dependency

Add the below dependencies to the pom.xml file and resolve them from the command line:

pom
 
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>2.7.1</version>
</dependency>
<!-- Hadoop Mapreduce Client Core -->
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>${java.version}</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
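Note that the jdk.tools entry relies on the ${java.version} and ${JAVA_HOME} properties being resolvable. ${JAVA_HOME} must be set in your environment, and ${java.version} can be declared in the pom’s properties section; the value below is an assumption, so match it to your installed JDK:

<properties>
   <java.version>1.7</java.version>
</properties>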

Step 4: Write Mapper

Once you are done with all the above steps, write a mapper class that reads the input file line by line and emits each word as a key-value pair: the key is the word (with some punctuation stripped) and the value is the constant 1. Here is the mapper, written in Java.

mapper
 
package com.bdp.mapreduce.distinct.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistinctValueMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

   private static final IntWritable one = new IntWritable(1);

   @Override
   protected void map(LongWritable key, Text value,
         Mapper<LongWritable, Text, Text, IntWritable>.Context context)
         throws IOException, InterruptedException {
      // Split the line on spaces and emit each word with a count of 1,
      // stripping the punctuation characters -+.^:,? from the word first.
      String[] words = value.toString().split(" ");
      for (String string : words) {
         context.write(new Text(string.replaceAll("[-+.^:,?]", "")), one);
      }
   }
}
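To see what the mapper emits, here is a small standalone sketch (the class name is hypothetical) that applies the same split and replaceAll calls to one line of the poem:

public class MapperTokenizationDemo {
   public static void main(String[] args) {
      String line = "Twinkle, twinkle, little star,";
      // Same tokenization as DistinctValueMapper: split on spaces,
      // then strip the characters -+.^:,? from each token.
      for (String word : line.split(" ")) {
         System.out.println(word.replaceAll("[-+.^:,?]", "") + "\t1");
      }
   }
}

It prints Twinkle, twinkle, little, and star, each paired with the count 1.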

Step 5: Write Reducer

In this step, we take the mapper output as input and process it. The actual distinct logic lives here: for each key, the reducer sums the incoming counts and writes the key exactly once, when the running sum first reaches 1.

Find the code below:

reducer
 
package com.bdp.mapreduce.distinct.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctValueReducer
      extends Reducer<Text, IntWritable, Text, NullWritable> {

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
            // Sum the counts for this word and write the word exactly once,
            // the first time the running sum reaches 1.
            int sum = 0;
            for (IntWritable value : values) {
                  sum += value.get();
                  if (sum == 1)
                        context.write(key, NullWritable.get());
            }
      }
}
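Since the framework groups all values for a word under a single reduce call, the running sum is not strictly necessary. A simpler, functionally equivalent variant (a sketch with a hypothetical class name, not the code used in this article) could ignore the values entirely:

package com.bdp.mapreduce.distinct.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctValueOnlyKeyReducer
      extends Reducer<Text, IntWritable, Text, NullWritable> {

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
            // Every distinct word arrives as its own key, so emitting the
            // key once is enough; the attached counts can be ignored.
            context.write(key, NullWritable.get());
      }
}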

Step 6: Write Driver

In order to execute the mapper and reducer, let’s create a driver class that wires them into a job. Find the driver class code below:

driver
 
package com.bdp.mapreduce.distinct.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.bdp.mapreduce.distinct.mapper.DistinctValueMapper;
import com.bdp.mapreduce.distinct.reducer.DistinctValueReducer;

public class DistinctValueDriver extends Configured implements Tool {

   @SuppressWarnings("deprecation")
   public int run(String[] arg0) throws Exception {
      Job job = new Job(getConf(), "Distinct Value");
      job.setJarByClass(getClass());
      job.setMapperClass(DistinctValueMapper.class);
      // The reducer cannot double as a combiner here: its input types
      // (Text, IntWritable) and output types (Text, NullWritable) differ.
      //job.setCombinerClass(DistinctValueReducer.class);
      job.setReducerClass(DistinctValueReducer.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(arg0[0]));
      FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
      return job.waitForCompletion(true) ? 0 : 1;
   }

   public static void main(String[] args) throws Exception {
      int jobStatus = ToolRunner.run(new DistinctValueDriver(), args);
      System.exit(jobStatus);
   }
}
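One side note: the new Job(getConf(), ...) constructor is deprecated in Hadoop 2.x, which is why the @SuppressWarnings annotation is there. If you want to avoid the deprecated API, the job can be created with the factory method instead:

// Inside run(String[]), replacing the deprecated constructor call:
Job job = Job.getInstance(getConf(), "Distinct Value");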

Step 7: Package Preparation

In this step, we will create a package (.jar) of the project. Follow the below steps:

  • Open CMD
  • Navigate to the maven project directory
  • Run the command “mvn package”

It will create a .jar file under the target directory. Copy the created jar and keep it at a local path. In my case, the jar file is available at “/home/NN/HadoopRepo/MapReduce”.

Step 8: Execution

All the setup has been done. Let’s execute the job and validate the output. Use the below command to run the map reduce job:

Format:

hadoop jar <path of jar> <driver class with package name> <input data path of HDFS> <output path at HDFS>

command
 
hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDistinctValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.distinct.driver.DistinctValueDriver /user/bdp/mapreduce/distinctvalue/input /user/bdp/mapreduce/distinctvalue/output
console output
 
[root@NN ~]# hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDistinctValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.distinct.driver.DistinctValueDriver /user/bdp/mapreduce/distinctvalue/input /user/bdp/mapreduce/distinctvalue/output
17/07/13 23:46:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/13 23:46:56 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
17/07/13 23:46:56 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
17/07/13 23:46:57 INFO input.FileInputFormat: Total input paths to process : 1
17/07/13 23:46:57 INFO mapreduce.JobSubmitter: number of splits:1
17/07/13 23:46:59 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local683911792_0001
17/07/13 23:47:01 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
17/07/13 23:47:01 INFO mapreduce.Job: Running job: job_local683911792_0001
17/07/13 23:47:01 INFO mapred.LocalJobRunner: OutputCommitter set in config null
17/07/13 23:47:01 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
17/07/13 23:47:01 INFO mapred.LocalJobRunner: Waiting for map tasks
17/07/13 23:47:01 INFO mapred.LocalJobRunner: Starting task: attempt_local683911792_0001_m_000000_0
17/07/13 23:47:02 INFO mapreduce.Job: Job job_local683911792_0001 running in uber mode : false
17/07/13 23:47:02 INFO mapreduce.Job:  map 0% reduce 0%
17/07/13 23:47:02 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
17/07/13 23:47:02 INFO mapred.MapTask: Processing split: hdfs://NN:9000/user/bdp/mapreduce/distinctvalue/input/sampledata.txt:0+645
17/07/13 23:47:02 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
17/07/13 23:47:02 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
17/07/13 23:47:02 INFO mapred.MapTask: soft limit at 83886080
17/07/13 23:47:02 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
17/07/13 23:47:02 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
17/07/13 23:47:02 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
17/07/13 23:47:03 INFO mapred.LocalJobRunner: 
17/07/13 23:47:03 INFO mapred.MapTask: Starting flush of map output
17/07/13 23:47:03 INFO mapred.MapTask: Spilling map output
17/07/13 23:47:03 INFO mapred.MapTask: bufstart = 0; bufend = 1110; bufvoid = 104857600
17/07/13 23:47:03 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26213912(104855648); length = 485/6553600
17/07/13 23:47:03 INFO mapred.MapTask: Finished spill 0
17/07/13 23:47:03 INFO mapred.Task: Task:attempt_local683911792_0001_m_000000_0 is done. And is in the process of committing
17/07/13 23:47:03 INFO mapred.LocalJobRunner: map
17/07/13 23:47:03 INFO mapred.Task: Task 'attempt_local683911792_0001_m_000000_0' done.
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Finishing task: attempt_local683911792_0001_m_000000_0
17/07/13 23:47:03 INFO mapred.LocalJobRunner: map task executor complete.
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Waiting for reduce tasks
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Starting task: attempt_local683911792_0001_r_000000_0
17/07/13 23:47:03 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
17/07/13 23:47:03 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@79480d9a
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=363285696, maxSingleShuffleLimit=90821424, mergeThreshold=239768576, ioSortFactor=10, memToMemMergeOutputsThreshold=10
17/07/13 23:47:03 INFO reduce.EventFetcher: attempt_local683911792_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
17/07/13 23:47:03 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local683911792_0001_m_000000_0 decomp: 1356 len: 1360 to MEMORY
17/07/13 23:47:03 INFO reduce.InMemoryMapOutput: Read 1356 bytes from map-output for attempt_local683911792_0001_m_000000_0
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 1356, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->1356
17/07/13 23:47:03 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
17/07/13 23:47:03 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
17/07/13 23:47:04 INFO mapred.Merger: Merging 1 sorted segments
17/07/13 23:47:04 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1353 bytes
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merged 1 segments, 1356 bytes to disk to satisfy reduce memory limit
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merging 1 files, 1360 bytes from disk
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
17/07/13 23:47:04 INFO mapred.Merger: Merging 1 sorted segments
17/07/13 23:47:04 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1353 bytes
17/07/13 23:47:04 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:04 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
17/07/13 23:47:04 INFO mapreduce.Job:  map 100% reduce 0%
17/07/13 23:47:04 INFO mapred.Task: Task:attempt_local683911792_0001_r_000000_0 is done. And is in the process of committing
17/07/13 23:47:04 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:04 INFO mapred.Task: Task attempt_local683911792_0001_r_000000_0 is allowed to commit now
17/07/13 23:47:04 INFO output.FileOutputCommitter: Saved output of task 'attempt_local683911792_0001_r_000000_0' to hdfs://NN:9000/user/bdp/mapreduce/distinctvalue/output/_temporary/0/task_local683911792_0001_r_000000
17/07/13 23:47:04 INFO mapred.LocalJobRunner: reduce > reduce
17/07/13 23:47:04 INFO mapred.Task: Task 'attempt_local683911792_0001_r_000000_0' done.
17/07/13 23:47:04 INFO mapred.LocalJobRunner: Finishing task: attempt_local683911792_0001_r_000000_0
17/07/13 23:47:04 INFO mapred.LocalJobRunner: reduce task executor complete.
17/07/13 23:47:05 INFO mapreduce.Job:  map 100% reduce 100%
17/07/13 23:47:05 INFO mapreduce.Job: Job job_local683911792_0001 completed successfully
17/07/13 23:47:05 INFO mapreduce.Job: Counters: 38
File System Counters
FILE: Number of bytes read=16690
FILE: Number of bytes written=515014
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1290
HDFS: Number of bytes written=400
HDFS: Number of read operations=15
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Map-Reduce Framework
Map input records=25
Map output records=122
Map output bytes=1110
Map output materialized bytes=1360
Input split bytes=133
Combine input records=0
Combine output records=0
Reduce input groups=74
Reduce shuffle bytes=1360
Reduce input records=122
Reduce output records=74
Spilled Records=244
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=243
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=241573888
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=645
File Output Format Counters 
Bytes Written=400

Step 9: Validate Output

Check the output at the HDFS output path; each line of the part file is one distinct word. Notice in the listing below that both are and are! appear, because ! is not one of the punctuation characters the mapper strips.

output
 
[root@NN ~]# hadoop fs -cat /user/bdp/mapreduce/distinctvalue/output/part-r-00000
17/07/14 00:07:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 
As
For
How
I
If
In
Lights
Like
Often
Thanks
Then
Though
Till
Twinkle
Up
When
a
above
all
and
are
are!
blazing
blue
bright
could
curtains
dark
diamond
did
eye
for
go
gone
he
high
in
is
keep
know
light
little
my
never
night
not
nothing
peep
see
shines
show
shut
sky
so
spark
star
sun
the
through
tiny
to
traveler
twinkle
upon
what
where
wonder
world
you
your

Wrapping Up

Here, we have written one mapper class, one reducer class, and one driver, based on the requirement. In addition, we have seen how few dependencies this map reduce job needs, all of which were resolved through Maven. Hope this helps you understand the basic concepts of map reduce.
