Requirement
Given a set of data files, we need to find the total number of records they contain, using MapReduce.
Components Involved
- HDFS: Stores the input data that is passed to the MapReduce job and also stores the job's output.
- MapReduce: Processes the data/files available in HDFS and writes the output back to HDFS.
Sample Data
Let’s have a look at the sample data:
id,first_name,last_name,gender,designation,city,country
1,Thayne,Mocher,Male,Administrative Officer,Port Colborne,Canada
2,Shelly,Bulfoot,Female,Analyst Programmer,Bacong,Philippines
3,Hercule,Chorlton,Male,Operator,Al Mazzunah,Tunisia
4,Thorstein,Epton,Male,Administrative Officer,Tayirove,Ukraine
5,Madelena,Savin,Female,Help Desk Technician,Jinjiang,China
6,Adeline,Imesson,Female,Legal Assistant,Fort Beaufort,South Africa
7,Celie,Richards,Male,Chemical Engineer,Dubiecko,Poland
8,Lilas,Harrowing,Female,Assistant Media Planner,Guayata,Colombia
9,Freida,Leivers,Female,Legal Assistant,Bangus Kulon,Indonesia
10,Celie,Dolligon,Female,Data Coordiator,Paraty,Brazil
11,Berkley,Orteaux,Male,Assistant Professor,Zinder,Niger
12,Gilburt,Minot,Male,Marketing Assistant,Hanyuan,China
13,Blaine,Treverton,Male,Research Associate,Yuankeng,China
14,Benjamen,Dodd,Male,Assistant Professor,Beberon,Philippines
15,Nikos,Worpole,Male,Human Resources Assistant II,Esmeralda,Cuba
16,Hercule,Richards,Male,Chemical Engineer,Dubiecko,Poland
Here, the sample data contains users' info in comma-separated (CSV) format.
Solution
The solution involves several steps, from setting up the environment through execution and validation. Let's go through them one by one:
Step 1: Input Data Preparation
The first step is data preparation. In a real-life scenario, you will have your own data; preparing it means placing it at a location from which our MapReduce job can read it as an input file.
Let's keep the sample data file on the local filesystem. In my case, the file name is “sampledataForDuplicate” and the local path is “/home/NN/HadoopRepo/MapReduce/resources/recordCount”.
We need to move this data file to an HDFS location, which will be the input path for the MapReduce job.
hadoop fs -put /home/NN/HadoopRepo/MapReduce/resources/recordCount /user/bdp/mapreduce
This copies the data file from the local filesystem to HDFS.
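As a quick sanity check, you can list the HDFS target to confirm the copy (the put command above copies the local recordCount directory into /user/bdp/mapreduce):

hadoop fs -ls /user/bdp/mapreduce/recordCount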
Step 2: Create Maven Project
To write the MapReduce program, create a Maven project; we will then add dependencies for the required packages.
Follow the steps below to create the Maven project (a command-line alternative is shown after the list):
- Open Eclipse
- Create a Maven project
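If you prefer the command line to Eclipse, a Maven quickstart archetype gives the same starting point (the groupId and artifactId below are illustrative; use your own):

mvn archetype:generate -DgroupId=com.bdp.mapreduce -DartifactId=MapReduceForRecordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false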
Step 3: Resolve Dependency
Add the below dependencies to the pom.xml file and resolve them from the command line:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.1</version>
</dependency>
<!-- Hadoop Mapreduce Client Core -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>${java.version}</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
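After saving pom.xml, the dependencies can be downloaded from the project root, for example:

mvn dependency:resolve

(A full build with mvn clean install will also resolve them as part of the build.)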
Step 4: Write Mapper
Once you are done with all the above steps, write a mapper class that takes the input file, reads it line by line, and emits a key-value pair for every record while skipping the header. Here, we use a Java program for the mapper.
package com.bdp.mapreduce.recordcount.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RecordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);
    private Text record = new Text("Record");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Skip the header line: it sits at byte offset 0 and contains the column name "first_name"
        if (key.get() == 0 && value.toString().contains("first_name")) {
            return;
        }
        // Every other line is a record: emit the constant key with a count of 1
        context.write(record, one);
    }
}
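For the sample file above, the mapper skips the header and emits one pair per data line. Schematically (not actual console output), the map output delivered to the shuffle looks like:

(Record, 1)
(Record, 1)
...
(Record, 1)

sixteen pairs in total, all under the single key "Record".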
Step 5: Write Reducer
In this step, we take the mapper output as input and aggregate it; the actual counting logic lives here.
Find the code below:
package com.bdp.mapreduce.recordcount.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RecordCountReducer extends Reducer<Text, IntWritable, NullWritable, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum up the 1s emitted by the mapper for the single key "Record"
        int recordCount = 0;
        for (IntWritable value : values) {
            recordCount += value.get();
        }
        // Emit only the total; the key is not needed in the output
        context.write(NullWritable.get(), new IntWritable(recordCount));
    }
}
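Since every map output shares the single key "Record", all the 1s are shuffled to one reducer. On large inputs you could cut shuffle traffic with a map-side combiner. The reducer above cannot double as the combiner because its output types (NullWritable, IntWritable) do not match its input types, so a separate class would be needed. A minimal sketch (the RecordCountCombiner class is my own illustration, not part of the original project):

package com.bdp.mapreduce.recordcount.combiner;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner: pre-aggregates ("Record", 1) pairs on the map side.
// Its input and output types must both be (Text, IntWritable) so it can sit
// between the mapper and the reducer.
public class RecordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int partialCount = 0;
        for (IntWritable value : values) {
            partialCount += value.get();
        }
        context.write(key, new IntWritable(partialCount));
    }
}

It would be wired in from the driver with job.setCombinerClass(RecordCountCombiner.class).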
Step 6: Write Driver
To execute the mapper and reducer, let's create a driver class that wires them together. Find the driver class code below:
package com.bdp.mapreduce.recordcount.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.bdp.mapreduce.recordcount.mapper.RecordCountMapper;
import com.bdp.mapreduce.recordcount.reducer.RecordCountReducer;

public class RecordCountDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        @SuppressWarnings("deprecation")
        Job job = new Job(getConf(), "Record Count");
        job.setJarByClass(getClass());
        job.setMapperClass(RecordCountMapper.class);
        job.setReducerClass(RecordCountReducer.class);
        // The mapper's output types differ from the job's final output types,
        // so they must be declared explicitly
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // args[0] = HDFS input path, args[1] = HDFS output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int jobStatus = ToolRunner.run(new RecordCountDriver(), args);
        // Propagate the job status as the process exit code
        System.exit(jobStatus);
    }
}
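Because the driver extends Configured and runs through ToolRunner, Hadoop's generic options are parsed before the path arguments reach run(). For example, configuration could be overridden at submit time (paths as used later in this post):

hadoop jar MapReduceForRecordCount-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.recordcount.driver.RecordCountDriver -D mapreduce.job.reduces=1 /user/bdp/mapreduce/recordCount /user/bdp/mapreduce/out/recordCount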
Step 7: Package Preparation
In this step, we will create a package (.jar) of the project. Follow the steps below:
- Open CMD
- Navigate to the Maven project's root directory
- Use the command: mvn package
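If the build succeeds, Maven writes the jar under the project's target/ directory. The exact file name comes from the artifactId and version in pom.xml; assuming they match the artifact used later in this post, you can confirm with:

ls target/MapReduceForRecordCount-0.0.1-SNAPSHOT.jar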
You can also download the package which I have built for this requirement: MapReduceForRecordCount-0.0.1-SNAPSHOT
Step 8: Execution
All the setup is done. Let's execute the job and validate the output. To run the MapReduce job, use the command below:
Format: hadoop jar <path of jar> <fully qualified driver class> <HDFS input path> <HDFS output path>
Example:
hadoop jar /home/NN/HadoopRepo/MapReduce/MapReduceForRecordCount-0.0.1-SNAPSHOT.jar com.bdp.mapreduce.recordcount.driver.RecordCountDriver /user/bdp/mapreduce/recordCount /user/bdp/mapreduce/out/recordCount
Step 9: Validate Output
Check the output at the HDFS output path:
[root@NN hadoop-2.6.0]# hadoop fs -ls /user/bdp/mapreduce/out/recordCount
17/08/01 22:04:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 root supergroup          0 2017-08-01 22:04 /user/bdp/mapreduce/out/recordCount/_SUCCESS
-rw-r--r--   1 root supergroup          3 2017-08-01 22:04 /user/bdp/mapreduce/out/recordCount/part-r-00000
[root@NN hadoop-2.6.0]# hadoop fs -cat /user/bdp/mapreduce/out/recordCount/part-r-00000
17/08/01 22:05:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16
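The total of 16 matches the sample data: the file has 17 lines, one of which is the header that the mapper skips. As a quick local cross-check (assuming the sample file is still under the local path from Step 1):

wc -l /home/NN/HadoopRepo/MapReduce/resources/recordCount/sampledataForDuplicate
# prints 17; subtracting the header line gives the 16 records reported by the job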
Wrapping Up
In this post, we have written a MapReduce application to count the number of records in the data file(s), similar to SQL's count(*). The job consists of three classes: the mapper reads the input and emits one key-value pair per record, the reducer sums those counts, and the driver sets up the configuration needed to run the job.