Writing a MapReduce program from scratch (using Hadoop) involves understanding a few key concepts, classes, and the overall program structure. It's not like regular Java development: you're building distributed data processing logic that runs on multiple machines.
To write a simple Hadoop MapReduce program from scratch, you should understand these essentials:
- Mapper: Processes input data line by line and outputs key-value pairs.
- Reducer: Aggregates all values by key and outputs final key-value pairs.
- Combiner (optional): A mini-reducer run on mapper output before it's sent across the network.
- Driver: Java class that sets up and runs the job.
Think of it like:
Input Text File → [Mapper → Shuffle & Sort → Reducer] → Output File
You should know:
- Classes and objects
- Inheritance and method overriding
- File I/O
- Java generics (for Hadoop types like Text, IntWritable)
- Basic understanding of how to compile and run Java with dependencies
Role | Class Name |
---|---|
Mapper | org.apache.hadoop.mapreduce.Mapper |
Reducer | org.apache.hadoop.mapreduce.Reducer |
Writable | IntWritable, Text, etc. |
Job Control | org.apache.hadoop.mapreduce.Job |
Here's the simplest version of a MapReduce Word Count job:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().replaceAll("[^a-zA-Z]", "").toLowerCase());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class); // optional, improves performance
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0])); // input file path
FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
You'll need to compile with Hadoop dependencies. If you're not using Maven, something like:
javac -classpath $(hadoop classpath) -d . WordCount.java
jar cf wordcount.jar WordCount*.class
hadoop jar wordcount.jar WordCount /input/path /output/path
Step | What to Do |
---|---|
1 | Write Mapper class |
2 | Write Reducer class |
3 | Write main() method to set up the job |
4 | Compile it with Hadoop dependencies |
5 | Package into a JAR file |
6 | Run it using hadoop jar on your cluster |
To go further:
- Learn about combiners, partitioners, and custom comparators
- Use multiple reducers
- Handle large datasets and failures
- Look into Apache Spark, which is easier and more efficient than raw MapReduce
Let me know if you want a ready-to-run example you can test locally (with input/output files), or if you're using a cluster like AWS EMR or a local Hadoop setup.
Great question! The generic types in extends Mapper<...> and extends Reducer<...> are NOT fixed; they are fully customizable based on your data.
Mapper<INPUT_KEY, INPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>
Reducer<INPUT_KEY, INPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>
You define these types based on:
- What kind of data is being read
- What kind of key/value you want to emit
Mapper<Object, Text, Text, IntWritable>
Type | Meaning |
---|---|
Object | Input key (offset of the line, not used) |
Text | Input value (entire line of text) |
Text | Output key (each word) |
IntWritable | Output value (count = 1) |
Reducer<Text, IntWritable, Text, IntWritable>
Type | Meaning |
---|---|
Text | Input key (a word) |
IntWritable | Input values (list of 1s) |
Text | Output key (word) |
IntWritable | Output value (total count) |
Ask yourself these questions:

1. What does your Mapper read, and what should it emit?
- Default: line-by-line from a text file
- Then: Mapper<Object, Text, ..., ...>
- Example: counting visits per IP → Mapper<Object, Text, Text (IP), IntWritable (1)> (see the sketch after this list)
- Example: sales sum → Mapper<Object, Text, Text (ProductID), FloatWritable (Price)>

2. What does your Reducer receive, and what should it emit?
- It receives all values grouped by key
- Output can be anything: Text, IntWritable, FloatWritable, NullWritable, etc.
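To make the per-IP example concrete, here is a minimal sketch of a counting mapper. The class name is made up, and it assumes the IP address is the first whitespace-separated field of each log line:

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical example class; adjust the parsing to your actual log format.
public class IpCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text ip = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (!line.isEmpty()) {
            // Assumption: the IP address is the first field of the line.
            ip.set(line.split("\\s+")[0]);
            context.write(ip, ONE);  // emit <IP, 1>; a summing reducer totals the visits
        }
    }
}
```

Pair it with the same IntSumReducer used in the word count job to get total visits per IP.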
Worked example (sales sum): assume a CSV with productId,price per line.

Mapper<Object, Text, Text, FloatWritable>
Reducer<Text, FloatWritable, Text, FloatWritable>

- Mapper emits: <productId, price>
- Reducer receives: <productId, list of prices> → sums → emits total sales
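A minimal sketch of that pair, assuming each line is a well-formed productId,price record (class names are illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SalesSum {

    public static class SalesMapper extends Mapper<Object, Text, Text, FloatWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length == 2) {
                // Emit <productId, price> for each CSV line.
                context.write(new Text(fields[0].trim()),
                              new FloatWritable(Float.parseFloat(fields[1].trim())));
            }
        }
    }

    public static class SalesReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
        @Override
        protected void reduce(Text key, Iterable<FloatWritable> values, Context context)
                throws IOException, InterruptedException {
            float total = 0f;
            for (FloatWritable price : values) {
                total += price.get();  // sum all prices for this product
            }
            context.write(key, new FloatWritable(total));  // emit <productId, total sales>
        }
    }
}
```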
Java Type | Hadoop Type |
---|---|
int | IntWritable |
long | LongWritable |
float | FloatWritable |
double | DoubleWritable |
String | Text |
null | NullWritable |
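For reference, this is how the wrapping and unwrapping looks in plain Java; nothing here is specific to a job, it just shows the Writable constructors and accessors:

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // Wrap plain Java values before passing them to context.write(...)
        IntWritable count = new IntWritable(42);
        Text word = new Text("hadoop");

        // Unwrap them again with get() / toString()
        int n = count.get();          // 42
        String s = word.toString();   // "hadoop"

        // NullWritable is a singleton used when you have no meaningful value to emit
        NullWritable nothing = NullWritable.get();

        System.out.println(n + " " + s + " " + nothing);
    }
}
```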
When unsure, print debug output inside map() and reduce() to test what keys and values are flowing.
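One way to do that, sketched below with made-up class and counter names: write to standard error (it ends up in the task logs) and bump a job counter (printed with the job summary when the run finishes):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DebugSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // Goes to the task's stderr log, viewable in the YARN/MapReduce web UI.
        System.err.println("key=" + key + " sum=" + sum);
        // Shows up in the job counters when the job completes.
        context.getCounter("debug", "reduceCalls").increment(1);
        context.write(key, new IntWritable(sum));
    }
}
```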
Would you like a small practice challenge to try customizing this?
The method signatures for map() and reduce() must have exactly three parameters:

map(KEYIN key, VALUEIN value, Context context)

- KEYIN: Input key (e.g., line offset)
- VALUEIN: Input value (e.g., line of text)
- Context: Used to emit output (context.write(...))

You must not change the number of parameters. The Context is required for Hadoop to interact with the mapper's output.

reduce(KEYIN key, Iterable<VALUEIN> values, Context context)

- KEYIN: The grouped key (e.g., a word)
- Iterable<VALUEIN>: All values for that key (e.g., [1, 1, 1])
- Context: Used to emit reduced output

Again, a fixed three parameters; this is what Hadoop internally expects.
Invalid examples:

- map(Text a, Text b) → won't compile or run
- reduce(String key, List<IntWritable> vals) → invalid types + missing context
You choose the generic types in:
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
...but the map() and reduce() methods must follow the three-parameter structure.
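Putting the two rules together, a skeleton like this compiles against the Hadoop API; adding @Override makes the compiler reject any signature Hadoop would not actually call (class names are placeholders):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SignatureSkeleton {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override  // a compiler error here means the signature does not match what Hadoop calls
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(value.toString()), new IntWritable(1));
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}
```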
Let me know if you want a template to follow for any custom logic (e.g., sales, weather data, etc.).
Excellent, now we're digging into the heart of the Reducer logic, where Hadoop actually processes those "arrays" (really, Iterable<IntWritable>) that it grouped from the Mapper's output.
Let's walk through it clearly.
public static class AvgWordLengthReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private int totalLength = 0;
private int totalCount = 0;
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get(); // <- This is where the addition happens
}
if (key.toString().equals("length")) {
totalLength = sum;
} else if (key.toString().equals("count")) {
totalCount = sum;
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
if (totalCount > 0) {
int avg = totalLength / totalCount;
context.write(new Text("Average Word Length"), new IntWritable(avg));
}
}
}
public void reduce(Text key, Iterable<IntWritable> values, Context context)
- Hadoop guarantees that all values with the same key are grouped and passed together.
- You can think of values as an array:
  - If key == "length" → values = [4, 5, 6, 7, ...]
  - If key == "count" → values = [1, 1, 1, 1, ...]
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
- This sums up the elements of the "array"
- Once the sum is computed, we store it in a class variable
if (key.toString().equals("length")) {
totalLength = sum;
} else if (key.toString().equals("count")) {
totalCount = sum;
}
- So if the Reducer is handling the "length" key group, it sets totalLength
- If it's handling the "count" key group, it sets totalCount
Finally, in cleanup():

int avg = totalLength / totalCount;
context.write(new Text("Average Word Length"), new IntWritable(avg));
- This happens after all reduce() calls are complete
- It's used when you want to output something after aggregating multiple keys
Role | Code | Description |
---|---|---|
Handle grouped values | for (IntWritable val : values) | Iterate like an array |
Sum values | sum += val.get(); | Adds up word lengths or counts |
Track totals by key type | if (key.equals(...)) | Distinguish between "length" and "count" |
Output after all reductions | cleanup() method | Outputs final average |
Mapper output:
("length", 4)
("count", 1)
("length", 5)
("count", 1)
("length", 6)
("count", 1)
Reducer receives:
"length" β [4, 5, 6] β sum = 15
"count" β [1, 1, 1] β sum = 3
Then:
avg = 15 / 3 = 5
Output:
("Average Word Length", 5)
The Reducer's reduce() method runs once per key, not once per all keys combined.

- Hadoop groups the mapper outputs by key.
- For each unique key, Hadoop calls reduce(key, Iterable<values>, context) once.
- So, the for-loop inside reduce() processes only the values for that one key.
- First, Hadoop calls:
reduce("count", Iterable of all count values, context)
The for-loop sums all the counts (like 1 + 1 + 1 + ...), then you assign:
totalCount = sum;
- Then, Hadoop calls:
reduce("length", Iterable of all length values, context)
The for-loop sums all the lengths (like 4 + 5 + 6 + ...), then you assign:
totalLength = sum;
Call # | Key | Values | What happens inside reduce() |
---|---|---|---|
1 | "count" | [1, 1, 1, 1, ...] | Sum all counts and set totalCount |
2 | "length" | [4, 5, 6, 7, ...] | Sum all lengths and set totalLength |
The if/else assignment

if (key.toString().equals("length")) {
totalLength = sum;
} else if (key.toString().equals("count")) {
totalCount = sum;
}
happens after the summation for that key only.
Only after all reduce() calls finish does the cleanup() method run, which uses the stored totalLength and totalCount to calculate the average and write the final output.
reduce("count", [...])
β sums counts β setstotalCount
reduce("length", [...])
β sums lengths β setstotalLength
cleanup()
β calculates and outputs average
Absolutely! Here's a generalized format/template you can use for any Hadoop MapReduce job driver class. Just plug in your specific Mapper, Reducer, and data types.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyJobDriver {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MyJobDriver <input path> <output path>");
System.exit(-1);
}
// Create configuration and job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Job Name Here");
// Set the main class
job.setJarByClass(MyJobDriver.class);
// Set Mapper and Reducer classes
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// (Optional) Set Combiner class if any
// job.setCombinerClass(MyCombiner.class);
// Set Mapper output key and value types
job.setMapOutputKeyClass(MyMapperOutputKeyClass.class);
job.setMapOutputValueClass(MyMapperOutputValueClass.class);
// Set Reducer (final output) key and value types
job.setOutputKeyClass(MyReducerOutputKeyClass.class);
job.setOutputValueClass(MyReducerOutputValueClass.class);
// Set input and output paths from args
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Submit job and wait for completion
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
- Replace MyMapper.class with your Mapper class name.
- Replace MyReducer.class with your Reducer class name.
- Replace MyMapperOutputKeyClass.class and MyMapperOutputValueClass.class with the key/value output types from the Mapper.
- Replace MyReducerOutputKeyClass.class and MyReducerOutputValueClass.class with the final output key/value types from the Reducer.
- Change "Job Name Here" to a descriptive job name.
- Pass the input and output directories as command-line arguments when running the job.
For example, for the average word length job:

job.setMapperClass(AvgWordLengthMapper.class);
job.setReducerClass(AvgWordLengthReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Here's a list of the most commonly used Hadoop and HDFS commands, grouped by category, along with short descriptions.
Command | Description |
---|---|
hdfs dfs -ls /path | List files/directories in HDFS path |
hdfs dfs -mkdir /path | Create directory in HDFS |
hdfs dfs -put localfile /hdfs/path | Upload local file to HDFS |
hdfs dfs -copyFromLocal localfile /hdfs/path | Same as -put |
hdfs dfs -get /hdfs/file localpath | Download HDFS file to local |
hdfs dfs -copyToLocal /hdfs/file localpath | Same as -get |
hdfs dfs -cat /hdfs/file | Display contents of HDFS file |
hdfs dfs -tail /hdfs/file | Show last KB of a file |
hdfs dfs -rm /hdfs/file | Delete HDFS file |
hdfs dfs -rm -r /hdfs/dir | Recursively delete directory |
hdfs dfs -du -h /path | Show space used by files (human readable) |
hdfs dfs -df -h | Show HDFS disk usage info |
hdfs dfs -stat /hdfs/file | Show file metadata (size, mod time, etc.) |
Command | Description |
---|---|
hdfs dfs -chmod 755 /file | Change file permissions |
hdfs dfs -chown user:group /file | Change file owner and group |
hdfs dfs -mv /src /dest | Move/rename file in HDFS |
hdfs dfs -cp /src /dest | Copy file in HDFS |
Command | Description |
---|---|
hdfs fsck /path | Check file system health |
hdfs dfsadmin -report | Summary of data nodes and storage |
hdfs dfsadmin -safemode get | Check if NameNode is in safe mode |
hdfs dfsadmin -safemode leave | Exit safe mode |
Command | Description |
---|---|
hadoop version | Show Hadoop version |
hdfs dfs -help | Get help for HDFS commands |
yarn top | Monitor YARN container usage (if available) |
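Most of these shell commands also have programmatic equivalents through the org.apache.hadoop.fs.FileSystem API, which can be handy inside a driver or setup code. A rough sketch (the paths are illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsOpsDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);  // resolves fs.defaultFS from the loaded config

        // roughly: hdfs dfs -mkdir /data
        fs.mkdirs(new Path("/data"));

        // roughly: hdfs dfs -put localfile.txt /data
        fs.copyFromLocalFile(new Path("localfile.txt"), new Path("/data/localfile.txt"));

        // roughly: hdfs dfs -ls /data
        for (FileStatus status : fs.listStatus(new Path("/data"))) {
            System.out.println(status.getPath() + "  " + status.getLen() + " bytes");
        }

        // roughly: hdfs dfs -rm -r /data
        fs.delete(new Path("/data"), true);

        fs.close();
    }
}
```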