How to Get Distinct Words of a File Using MapReduce

Requirement

Suppose you have a file in which many words are repeated. The requirement is to extract the distinct words from that file using MapReduce. If you compare with SQL, we have to write a MapReduce program that behaves like SQL's DISTINCT keyword.
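For intuition, here is what “distinct” means on a small in-memory list. This is a plain-Java sketch, not part of the job; the MapReduce program below produces the same result for files too large to fit on one machine.

import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class DistinctLocally {
    public static void main(String[] args) {
        // A set keeps each word exactly once.
        Set<String> distinct = new TreeSet<String>(
                Arrays.asList("Twinkle", "twinkle", "little", "star", "twinkle"));
        System.out.println(distinct); // [Twinkle, little, star, twinkle]
    }
}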

Components Involved

  • HDFS: Stores the input data that will be passed to the MapReduce job as input, and also stores the job's output.
  • MapReduce: Processes the data/file available in HDFS and stores the output back to HDFS.

Sample Data

Let’s take the very popular poem as an example:

Twinkle, twinkle, little star,
How I wonder what you are!
Up above the world so high,
Like a diamond in the sky.
 
When the blazing sun is gone,
When he nothing shines upon,
Then you show your little light,
Twinkle, twinkle, all the night.
 
Then the traveler in the dark
Thanks you for your tiny spark,
How could he see where to go,
If you did not twinkle so?
 
In the dark blue sky you keep,
Often through my curtains peep
For you never shut your eye,
Till the sun is in the sky.
 
As your bright and tiny spark
Lights the traveler in the dark,
Though I know not what you are,
Twinkle, twinkle, little star.

Here, many words, like “Twinkle” and “twinkle”, are repeated. Also, suppose these words are case sensitive, so “Twinkle” and “twinkle” count as different words.
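If you instead wanted case-insensitive distinct words, a small tweak to the mapper we will write in Step 4 would do it: normalize each token before emitting it. A sketch of the changed lines, assuming lower-casing is the normalization you want:

// Hypothetical variant: lower-case each token before emitting it,
// so "Twinkle" and "twinkle" collapse into one key.
String cleaned = string.replaceAll("[-+.^:,?]", "").toLowerCase();
context.write(new Text(cleaned), one);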

Solution

Let’s start with the solution. 

Step 1: Input Data Preparation

In MapReduce, we have to pass input data to process. So let's first set up the input for the job before moving forward. Download the input data from Sample. Once downloaded, move it to any HDFS location; this HDFS location will be the input path for the MapReduce job. In my case, the HDFS location is “/user/bdp/mapreduce/distinctvalue/input”.

hadoop fs -copyFromLocal /home/NN/HadoopRepo/MapReduce/resources/distinctvalue/sampledata.txt /user/bdp/mapreduce/distinctvalue/input
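You can verify that the file landed in HDFS before moving on:

hadoop fs -ls /user/bdp/mapreduce/distinctvalue/input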

Step 2: Create Maven Project

We are going to use the Java programming language to write our MapReduce program. Let's create a maven project in order to resolve all the required dependencies for the code.

Follow the below steps in order to create the maven project:

  • Open Eclipse
  • Create a maven project (see the command-line alternative below)
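If you prefer the command line to Eclipse, the standard quickstart archetype works just as well. The groupId and artifactId below are assumptions inferred from the package names and jar name used later in this article:

mvn archetype:generate -DgroupId=com.bdp.mapreduce -DartifactId=MapReduceForDistinctValue -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false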

Step 3: Resolve Dependency

Add the below dependencies to the pom.xml file and resolve them from the command line:

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>2.7.1</version>
</dependency>
<!-- Hadoop Mapreduce Client Core -->
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>${java.version}</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>

Step 4: Write Mapper

Once you are done with all the above steps, write a mapper class that takes the input file, reads it line by line, and emits each word of the file as a key-value pair. Here is the mapper, written in Java:

package com.bdp.mapreduce.distinct.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistinctValueMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static final IntWritable one = new IntWritable(1);

      @Override
      protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
            // Split the line into words and emit each word (with punctuation
            // stripped) as a key, paired with a count of 1.
            String[] words = value.toString().split(" ");
            for (String string : words) {
                  context.write(new Text(string.replaceAll("[-+.^:,?]", "")), one);
            }
      }
}
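To see exactly what the mapper emits, here is a small, self-contained sketch of the same tokenization logic applied to one line of the poem (plain Java, no Hadoop required):

public class TokenizeDemo {
    public static void main(String[] args) {
        String line = "Twinkle, twinkle, little star,";
        // Same cleanup as the mapper: split on spaces, strip punctuation.
        for (String word : line.split(" ")) {
            System.out.println(word.replaceAll("[-+.^:,?]", "") + "\t1");
        }
        // Prints four key-value pairs: Twinkle, twinkle, little, star,
        // each paired with a count of 1.
    }
}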

Step 5: Write Reducer

In this step, we will take the mapper output as input and process it. The actual distinct logic is written here.

Find the code below:

package com.bdp.mapreduce.distinct.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctValueReducer
      extends Reducer<Text, IntWritable, Text, NullWritable> {

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
            // All occurrences of a word arrive in the same reduce call.
            // Write the word exactly once: only when the running sum hits 1,
            // i.e. on the first value of the group.
            int sum = 0;
            for (IntWritable value : values) {
                  sum += value.get();
                  if (sum == 1)
                        context.write(key, NullWritable.get());
            }
      }
}
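The running-sum check is one way to emit each key exactly once. Since the framework calls reduce() once per distinct key anyway, an equivalent and simpler sketch (not the author's code) would ignore the values entirely:

@Override
protected void reduce(Text key, Iterable<IntWritable> values,
      Reducer<Text, IntWritable, Text, NullWritable>.Context context)
      throws IOException, InterruptedException {
      // One reduce call per distinct word, so just write the key.
      context.write(key, NullWritable.get());
}

A side benefit: this version stays correct even if a type-compatible combiner pre-aggregates the counts, whereas the sum == 1 check would miss any key whose first incoming value is already greater than 1.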

Step 6: Write Driver

In order to execute the mapper and reducer, let's create a driver class that calls them both. Find the driver class code below:

package com.bdp.mapreduce.distinct.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.bdp.mapreduce.distinct.mapper.DistinctValueMapper;
import com.bdp.mapreduce.distinct.reducer.DistinctValueReducer;

public class DistinctValueDriver extends Configured implements Tool {

      @SuppressWarnings("deprecation")
      public int run(String[] arg0) throws Exception {
            Job job = new Job(getConf(), "Distinct Value");
            job.setJarByClass(getClass());

            job.setMapperClass(DistinctValueMapper.class);
            // The reducer cannot be reused as a combiner here: its input
            // (Text, IntWritable) and output (Text, NullWritable) types differ.
            //job.setCombinerClass(DistinctValueReducer.class);
            job.setReducerClass(DistinctValueReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(arg0[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

            return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
            int jobStatus = ToolRunner.run(new DistinctValueDriver(), args);
            System.exit(jobStatus);
      }
}
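One note on the @SuppressWarnings annotation: the new Job(Configuration, String) constructor is deprecated in Hadoop 2.x. If you would rather avoid the warning, the equivalent factory call is:

// Preferred in Hadoop 2.x: factory method instead of the deprecated constructor.
Job job = Job.getInstance(getConf(), "Distinct Value");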

Step 7: Package Preparation

In this step, we will create a package (.jar) of the project. Follow the below steps:

  • Open CMD
  • Navigate to the maven project directory
  • Run the command: “mvn package”

It will create a .jar file under the target directory. Copy this jar and keep it at a local path. In my case, the jar file is available at “/home/NN/HadoopRepo/MapReduce”.

Step 8: Execution

All the setup has been done. Let's execute the job and validate the output. In order to execute the MapReduce job, use the below command:

Format:

hadoop jar <path of jar> <driver class with package name> <input data path of HDFS> <output path at HDFS>

Ex:

hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDistinctValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.distinct.driver.DistinctValueDriver /user/bdp/mapreduce/distinctvalue/input /user/bdp/mapreduce/distinctvalue/output
[root@NN ~]# hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForDistinctValue-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.distinct.driver.DistinctValueDriver /user/bdp/mapreduce/distinctvalue/input /user/bdp/mapreduce/distinctvalue/output
17/07/13 23:46:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/13 23:46:56 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
17/07/13 23:46:56 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
17/07/13 23:46:57 INFO input.FileInputFormat: Total input paths to process : 1
17/07/13 23:46:57 INFO mapreduce.JobSubmitter: number of splits:1
17/07/13 23:46:59 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local683911792_0001
17/07/13 23:47:01 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
17/07/13 23:47:01 INFO mapreduce.Job: Running job: job_local683911792_0001
17/07/13 23:47:01 INFO mapred.LocalJobRunner: OutputCommitter set in config null
17/07/13 23:47:01 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
17/07/13 23:47:01 INFO mapred.LocalJobRunner: Waiting for map tasks
17/07/13 23:47:01 INFO mapred.LocalJobRunner: Starting task: attempt_local683911792_0001_m_000000_0
17/07/13 23:47:02 INFO mapreduce.Job: Job job_local683911792_0001 running in uber mode : false
17/07/13 23:47:02 INFO mapreduce.Job:  map 0% reduce 0%
17/07/13 23:47:02 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
17/07/13 23:47:02 INFO mapred.MapTask: Processing split: hdfs://NN:9000/user/bdp/mapreduce/distinctvalue/input/sampledata.txt:0+645
17/07/13 23:47:02 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
17/07/13 23:47:02 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
17/07/13 23:47:02 INFO mapred.MapTask: soft limit at 83886080
17/07/13 23:47:02 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
17/07/13 23:47:02 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
17/07/13 23:47:02 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
17/07/13 23:47:03 INFO mapred.LocalJobRunner: 
17/07/13 23:47:03 INFO mapred.MapTask: Starting flush of map output
17/07/13 23:47:03 INFO mapred.MapTask: Spilling map output
17/07/13 23:47:03 INFO mapred.MapTask: bufstart = 0; bufend = 1110; bufvoid = 104857600
17/07/13 23:47:03 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26213912(104855648); length = 485/6553600
17/07/13 23:47:03 INFO mapred.MapTask: Finished spill 0
17/07/13 23:47:03 INFO mapred.Task: Task:attempt_local683911792_0001_m_000000_0 is done. And is in the process of committing
17/07/13 23:47:03 INFO mapred.LocalJobRunner: map
17/07/13 23:47:03 INFO mapred.Task: Task 'attempt_local683911792_0001_m_000000_0' done.
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Finishing task: attempt_local683911792_0001_m_000000_0
17/07/13 23:47:03 INFO mapred.LocalJobRunner: map task executor complete.
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Waiting for reduce tasks
17/07/13 23:47:03 INFO mapred.LocalJobRunner: Starting task: attempt_local683911792_0001_r_000000_0
17/07/13 23:47:03 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
17/07/13 23:47:03 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@79480d9a
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=363285696, maxSingleShuffleLimit=90821424, mergeThreshold=239768576, ioSortFactor=10, memToMemMergeOutputsThreshold=10
17/07/13 23:47:03 INFO reduce.EventFetcher: attempt_local683911792_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
17/07/13 23:47:03 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local683911792_0001_m_000000_0 decomp: 1356 len: 1360 to MEMORY
17/07/13 23:47:03 INFO reduce.InMemoryMapOutput: Read 1356 bytes from map-output for attempt_local683911792_0001_m_000000_0
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 1356, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->1356
17/07/13 23:47:03 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
17/07/13 23:47:03 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:03 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
17/07/13 23:47:04 INFO mapred.Merger: Merging 1 sorted segments
17/07/13 23:47:04 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1353 bytes
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merged 1 segments, 1356 bytes to disk to satisfy reduce memory limit
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merging 1 files, 1360 bytes from disk
17/07/13 23:47:04 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
17/07/13 23:47:04 INFO mapred.Merger: Merging 1 sorted segments
17/07/13 23:47:04 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1353 bytes
17/07/13 23:47:04 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:04 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
17/07/13 23:47:04 INFO mapreduce.Job:  map 100% reduce 0%
17/07/13 23:47:04 INFO mapred.Task: Task:attempt_local683911792_0001_r_000000_0 is done. And is in the process of committing
17/07/13 23:47:04 INFO mapred.LocalJobRunner: 1 / 1 copied.
17/07/13 23:47:04 INFO mapred.Task: Task attempt_local683911792_0001_r_000000_0 is allowed to commit now
17/07/13 23:47:04 INFO output.FileOutputCommitter: Saved output of task 'attempt_local683911792_0001_r_000000_0' to hdfs://NN:9000/user/bdp/mapreduce/distinctvalue/output/_temporary/0/task_local683911792_0001_r_000000
17/07/13 23:47:04 INFO mapred.LocalJobRunner: reduce > reduce
17/07/13 23:47:04 INFO mapred.Task: Task 'attempt_local683911792_0001_r_000000_0' done.
17/07/13 23:47:04 INFO mapred.LocalJobRunner: Finishing task: attempt_local683911792_0001_r_000000_0
17/07/13 23:47:04 INFO mapred.LocalJobRunner: reduce task executor complete.
17/07/13 23:47:05 INFO mapreduce.Job:  map 100% reduce 100%
17/07/13 23:47:05 INFO mapreduce.Job: Job job_local683911792_0001 completed successfully
17/07/13 23:47:05 INFO mapreduce.Job: Counters: 38
File System Counters
FILE: Number of bytes read=16690
FILE: Number of bytes written=515014
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1290
HDFS: Number of bytes written=400
HDFS: Number of read operations=15
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Map-Reduce Framework
Map input records=25
Map output records=122
Map output bytes=1110
Map output materialized bytes=1360
Input split bytes=133
Combine input records=0
Combine output records=0
Reduce input groups=74
Reduce shuffle bytes=1360
Reduce input records=122
Reduce output records=74
Spilled Records=244
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=243
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=241573888
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=645
File Output Format Counters 
Bytes Written=400
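The counters already tell the story: the mapper emitted 122 output records (one per word in the poem), the shuffle grouped them into 74 reduce input groups, and the reducer wrote exactly 74 records, one per distinct word.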

Step 9: Validate Output

Check the output at the HDFS output path:

[root@NN ~]# hadoop fs -cat /user/bdp/mapreduce/distinctvalue/output/part-r-00000
17/07/14 00:07:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 
As
For
How
I
If
In
Lights
Like
Often
Thanks
Then
Though
Till
Twinkle
Up
When
a
above
all
and
are
are!
blazing
blue
bright
could
curtains
dark
diamond
did
eye
for
go
gone
he
high
in
is
keep
know
light
little
my
never
night
not
nothing
peep
see
shines
show
shut
sky
so
spark
star
sun
the
through
tiny
to
traveler
twinkle
upon
what
where
wonder
world
you
your
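One quirk worth noticing in this output: “are!” survives as its own token. The mapper's cleanup regex [-+.^:,?] does not include the ! character, so “are” and “are!” are treated as distinct words. Adding ! (and any other punctuation you care about) to that character class would merge them.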

Wrapping Up

Here, we have written one mapper class, one reducer class, and one driver, all based on the requirement. In addition, we have seen how few dependencies this MapReduce job needs, all of which were resolved through Maven. Hope this helps you understand the basic concepts of MapReduce.
