Hadoop MapReduce-compatible API for using Avro serialization in distributed data-processing pipelines.
—
Base classes and utilities for implementing Avro-aware mappers and reducers in the legacy MapReduce API. This framework provides abstract base classes that handle Avro data serialization and deserialization automatically, allowing developers to focus on business logic while maintaining type safety and schema evolution support.
Abstract base class for implementing mappers that process Avro data in the legacy MapReduce API.
// API outline (method bodies omitted; not compilable as-is).
public abstract class AvroMapper<IN,OUT> extends Configured implements JobConfigurable, Closeable {
// Abstract method to implement business logic: invoked once per input datum.
public abstract void map(IN datum, AvroCollector<OUT> collector, Reporter reporter)
throws IOException;
// Lifecycle methods (from interfaces): configure() receives the job
// configuration; close() releases any resources held by the mapper.
public void configure(JobConf jobConf);
public void close() throws IOException;
}import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.mapred.Reporter;
// Implement mapper for processing user records
public class UserMapper extends AvroMapper<GenericRecord, GenericRecord> {
    // Map-output schema, resolved once per task in configure().
    private Schema outputSchema;

    /**
     * Filters out minors and emits an upper-cased, categorized copy of each
     * adult user record.
     *
     * @param user      input record; expected to carry "name" and "age" fields
     * @param collector sink for output records (wrapped automatically by the framework)
     * @param reporter  used to update processing counters
     * @throws IOException if the collector fails to write
     */
    @Override
    public void map(GenericRecord user, AvroCollector<GenericRecord> collector, Reporter reporter)
            throws IOException {
        // Access Avro data directly (no wrapper needed)
        String name = user.get("name").toString();
        Integer age = (Integer) user.get("age");

        // Evaluate the adult test once (it was previously duplicated); a
        // missing age is treated as non-adult instead of throwing an NPE
        // when unboxing null.
        boolean isAdult = age != null && age >= 18;
        if (isAdult) {
            GenericRecord output = new GenericRecordBuilder(outputSchema)
                .set("name", name.toUpperCase())
                .set("age", age)
                .set("category", "adult")
                .build();
            // Collect output (automatically wrapped)
            collector.collect(output);
        }

        // Update counters
        reporter.incrCounter("USER_PROCESSING", "TOTAL_USERS", 1);
        if (isAdult) {
            reporter.incrCounter("USER_PROCESSING", "ADULT_USERS", 1);
        }
    }

    /** Caches the configured map-output schema before any map() calls. */
    @Override
    public void configure(JobConf jobConf) {
        this.outputSchema = AvroJob.getMapOutputSchema(jobConf);
    }
}
Abstract base class for implementing reducers that process grouped Avro data.
// API outline (method bodies omitted; not compilable as-is).
public abstract class AvroReducer<K,V,OUT> extends Configured implements JobConfigurable, Closeable {
// Abstract method for business logic: invoked once per key with all of
// that key's grouped values.
public abstract void reduce(K key, Iterable<V> values, AvroCollector<OUT> collector, Reporter reporter)
throws IOException;
// Lifecycle methods (from interfaces)
public void configure(JobConf jobConf);
public void close() throws IOException;
}import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.mapred.Reporter;
// Implement reducer for aggregating user data by department
public class UserAggregateReducer extends AvroReducer<CharSequence, GenericRecord, GenericRecord> {
    // Job output schema, resolved once per task in configure().
    private Schema outputSchema;

    /**
     * Aggregates all users of one department into a single summary record:
     * user count, (truncated) integer average age, and the list of names.
     *
     * @param department grouping key
     * @param users      all user records for this department
     * @param collector  sink for the summary record
     * @param reporter   used to update aggregation counters
     * @throws IOException if the collector fails to write
     */
    @Override
    public void reduce(CharSequence department, Iterable<GenericRecord> users,
                       AvroCollector<GenericRecord> collector, Reporter reporter)
            throws IOException {
        int totalUsers = 0;
        int totalAge = 0;
        List<String> userNames = new ArrayList<>();
        // Process all users in this department
        for (GenericRecord user : users) {
            totalUsers++;
            totalAge += (Integer) user.get("age");
            userNames.add(user.get("name").toString());
        }
        // Guard the division: MapReduce should never invoke reduce() with an
        // empty value group, but the defensive check avoids an
        // ArithmeticException if it ever happens.
        int averageAge = totalUsers > 0 ? totalAge / totalUsers : 0;
        // Create aggregated output
        GenericRecord summary = new GenericRecordBuilder(outputSchema)
            .set("department", department.toString())
            .set("user_count", totalUsers)
            .set("average_age", averageAge)  // integer division, truncates
            .set("user_names", userNames)
            .build();
        collector.collect(summary);
        // Update counters
        reporter.incrCounter("AGGREGATION", "DEPARTMENTS_PROCESSED", 1);
        reporter.incrCounter("AGGREGATION", "USERS_AGGREGATED", totalUsers);
    }

    /** Caches the configured job output schema before any reduce() calls. */
    @Override
    public void configure(JobConf jobConf) {
        this.outputSchema = AvroJob.getOutputSchema(jobConf);
    }
}
Abstract collector interface for gathering output from mappers and reducers.
// API outline (method body omitted; not compilable as-is).
public abstract class AvroCollector<T> extends Configured {
// Core collection method: implementations pass the datum on to Hadoop's
// output machinery (see the concrete collectors below).
public abstract void collect(T datum) throws IOException;
}The framework provides concrete implementations that automatically wrap collected data in AvroWrapper objects for integration with Hadoop's MapReduce infrastructure.
// In mapper or reducer
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter) {
    // Emit the transformed record; collect() wraps it for Hadoop automatically.
    collector.collect(processRecord(input));
    // A single map() call may emit any number of outputs.
    if (shouldEmitSummary(input)) {
        collector.collect(createSummary(input));
    }
}
Classes that bridge Avro processing with standard Hadoop mappers and reducers.
// Adapts an AvroMapper so it can run as a standard Hadoop Mapper.
public abstract class HadoopMapper<IN,OUT> extends AvroMapper<IN,OUT> {
// Integration with standard Hadoop Mapper interface
}
// Adapts an AvroReducer so it can run as a standard Hadoop Reducer.
public abstract class HadoopReducer<K,V,OUT> extends AvroReducer<K,V,OUT> {
// Integration with standard Hadoop Reducer interface
}
// Same reducer bridge, specialized for the combine phase.
public abstract class HadoopCombiner<K,V,OUT> extends AvroReducer<K,V,OUT> {
// Specialized for combine operations
}
// Base class carrying extra Hadoop-specific plumbing shared by the bridges.
public abstract class HadoopReducerBase<K,V,OUT> extends AvroReducer<K,V,OUT> {
// Base class with additional Hadoop-specific functionality
}Specific collector implementation for map phase output.
// API outline (method bodies omitted; not compilable as-is).
public class MapCollector<T> extends AvroCollector<T> {
// Constructor
// Wraps a Hadoop OutputCollector; values are NullWritable because the
// Avro datum travels in the key position (see the generic parameters).
public MapCollector(OutputCollector<AvroWrapper<T>, NullWritable> collector);
// Collect implementation
public void collect(T datum) throws IOException;
}Here's a complete example showing mapper and reducer implementation:
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
public class WordCountMapper extends AvroMapper<Utf8, Pair<Utf8, Integer>> {
    /**
     * Tokenizes one line of text and emits a (word, 1) pair per non-empty token.
     *
     * @param line      input line as an Avro Utf8 string
     * @param collector sink for (word, count) pairs
     * @param reporter  counters for lines processed and words emitted
     * @throws IOException if the collector fails to write
     */
    @Override
    public void map(Utf8 line, AvroCollector<Pair<Utf8, Integer>> collector, Reporter reporter)
            throws IOException {
        // Split line into words
        String[] words = line.toString().toLowerCase().split("\\W+");
        // Emit each word with count of 1. split() can yield empty tokens
        // (e.g. a leading "" when the line starts with punctuation), so
        // track the number of pairs actually emitted.
        int emitted = 0;
        for (String word : words) {
            if (!word.isEmpty()) {
                collector.collect(new Pair<>(new Utf8(word), 1));
                emitted++;
            }
        }
        reporter.incrCounter("WORDS", "LINES_PROCESSED", 1);
        // Bug fix: previously incremented by words.length, which over-counted
        // the empty tokens filtered out above.
        reporter.incrCounter("WORDS", "WORDS_EMITTED", emitted);
    }
}
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
public class WordCountReducer extends AvroReducer<Utf8, Integer, Pair<Utf8, Integer>> {
    /**
     * Sums the per-word counts emitted by the mapper and emits a single
     * (word, total) pair for each distinct word.
     *
     * @param word      the grouping key
     * @param counts    all partial counts for this word
     * @param collector sink for the aggregated pair
     * @param reporter  counter for distinct words seen
     * @throws IOException if the collector fails to write
     */
    @Override
    public void reduce(Utf8 word, Iterable<Integer> counts,
                       AvroCollector<Pair<Utf8, Integer>> collector, Reporter reporter)
            throws IOException {
        // Accumulate the total across all partial counts for this key.
        int sum = 0;
        for (Integer partial : counts) {
            sum += partial;
        }
        // Emit the word together with its aggregated count.
        collector.collect(new Pair<>(word, sum));
        reporter.incrCounter("WORDS", "UNIQUE_WORDS", 1);
    }
}
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.hadoop.mapred.JobConf;
// Configure word count job
JobConf job = new JobConf();
job.setJobName("Avro Word Count");
// Set input/output formats
job.setInputFormat(AvroInputFormat.class);
job.setOutputFormat(AvroOutputFormat.class);
// Configure schemas
// Input is a plain Avro string (one line of text); map output is a
// Pair<string, int> record built via Pair.getPairSchema.
Schema stringSchema = Schema.create(Schema.Type.STRING);
Schema pairSchema = Pair.getPairSchema(stringSchema, Schema.create(Schema.Type.INT));
AvroJob.setInputSchema(job, stringSchema);
AvroJob.setMapOutputSchema(job, pairSchema);
// The final job output reuses the same pair schema as the map output.
AvroJob.setOutputSchema(job, pairSchema);
// Set mapper and reducer classes
// NOTE(review): the AvroJob setters are used instead of JobConf's own —
// presumably so the Avro bridge classes (HadoopMapper/HadoopReducer) get
// wired in; confirm against the AvroJob documentation.
AvroJob.setMapperClass(job, WordCountMapper.class);
AvroJob.setReducerClass(job, WordCountReducer.class);
// Set input/output paths
FileInputFormat.setInputPaths(job, new Path("/input"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
// Run job
JobClient.runJob(job);public class ConfigurableMapper extends AvroMapper<GenericRecord, GenericRecord> {
// Map-output schema and filter settings, loaded once per task in configure().
private Schema outputSchema;
private String filterField;
private Object filterValue;

/** Reads the output schema and the filter field/value from the job configuration. */
@Override
public void configure(JobConf jobConf) {
    // Get schemas and configuration
    this.outputSchema = AvroJob.getMapOutputSchema(jobConf);
    this.filterField = jobConf.get("filter.field");
    this.filterValue = jobConf.get("filter.value");
}

/**
 * Emits the transformed record only when the configured filter field matches
 * the configured filter value.
 *
 * @throws IOException if the collector fails to write
 */
@Override
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
        throws IOException {
    // Bug fix: compare string forms. Avro decodes string fields as Utf8
    // while jobConf.get() returns a java.lang.String, so the original
    // input.get(filterField).equals(filterValue) would not match string
    // fields — and threw an NPE when the field was absent.
    Object fieldValue = input.get(filterField);
    if (fieldValue != null && filterValue != null
            && fieldValue.toString().equals(filterValue.toString())) {
        collector.collect(transformRecord(input));
    }
}
}
public class MultiOutputMapper extends AvroMapper<GenericRecord, GenericRecord> {
/**
 * Routes each record to a type-specific transform based on its "type" field.
 * Records with an unrecognized type are dropped, matching the original
 * switch with no default branch.
 *
 * @throws IOException if the collector fails to write
 */
@Override
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
        throws IOException {
    String kind = input.get("type").toString();
    if ("user".equals(kind)) {
        collector.collect(processUser(input));
    } else if ("event".equals(kind)) {
        collector.collect(processEvent(input));
    } else if ("transaction".equals(kind)) {
        collector.collect(processTransaction(input));
    }
}
}
public class RobustMapper extends AvroMapper<GenericRecord, GenericRecord> {
/**
 * Processes one record, counting successes and failures rather than letting
 * a bad record fail the whole task; failed records are re-emitted as error
 * records via createErrorRecord.
 *
 * @throws IOException if the collector fails to write
 */
@Override
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
        throws IOException {
    try {
        // Happy path: transform, emit, count.
        collector.collect(processRecord(input));
        reporter.incrCounter("PROCESSING", "SUCCESS", 1);
    } catch (Exception e) {
        // Log error and continue processing
        System.err.println("Failed to process record: " + input + ", error: " + e.getMessage());
        reporter.incrCounter("PROCESSING", "ERRORS", 1);
        // Optionally emit error record
        collector.collect(createErrorRecord(input, e));
    }
}
}
public class EfficientMapper extends AvroMapper<GenericRecord, GenericRecord> {
// Single output record shared across map() calls to cut per-record allocation.
private GenericRecord reusableOutput;

/** Allocates the reusable output record once the map-output schema is known. */
@Override
public void configure(JobConf jobConf) {
    this.reusableOutput = new GenericData.Record(AvroJob.getMapOutputSchema(jobConf));
}

/**
 * Overwrites the shared record's fields in place and hands it to the collector.
 * NOTE(review): this assumes the collector consumes the datum before the next
 * map() call — confirm before using a buffering collector.
 *
 * @throws IOException if the collector fails to write
 */
@Override
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
        throws IOException {
    reusableOutput.put("field1", input.get("field1"));
    reusableOutput.put("field2", processField(input.get("field2")));
    collector.collect(reusableOutput);
}
}
public class MemoryEfficientReducer extends AvroReducer<Utf8, GenericRecord, GenericRecord> {
/**
 * Streams over the grouped values keeping only the first record and a running
 * count, so memory stays constant regardless of group size.
 *
 * @throws IOException if the collector fails to write
 */
@Override
public void reduce(Utf8 key, Iterable<GenericRecord> values,
                   AvroCollector<GenericRecord> collector, Reporter reporter)
        throws IOException {
    GenericRecord firstSeen = null;
    int seen = 0;
    for (GenericRecord value : values) {
        if (firstSeen == null) {
            // Defensive deep copy of the first value — presumably because the
            // iterator may reuse the record instance; hence deepCopy here.
            firstSeen = GenericData.get().deepCopy(value.getSchema(), value);
        }
        seen++;
        if (seen % 1000 == 0) {
            reporter.progress();  // keep the task alive on very large groups
        }
    }
    // Summarize from the retained first record and the count alone.
    collector.collect(createSummary(key, firstSeen, seen));
}
}
Common issues and solutions:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-mapred