CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-avro--avro-mapred

Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines

Pending
Overview
Eval results
Files

mapreduce-processing.mddocs/

MapReduce Processing Framework

Base classes and utilities for implementing Avro-aware mappers and reducers in the legacy MapReduce API. This framework provides abstract base classes that handle Avro data serialization and deserialization automatically, allowing developers to focus on business logic while maintaining type safety and schema evolution support.

Capabilities

Avro Mapper Base Class

Abstract base class for implementing mappers that process Avro data in the legacy MapReduce API.

// API sketch of the mapper base class (method bodies omitted for brevity).
// NOTE(review): in the released Avro library, map() has a default identity
// implementation rather than being abstract — confirm against the
// org.apache.avro.mapred.AvroMapper javadoc for the targeted version.
public abstract class AvroMapper<IN,OUT> extends Configured implements JobConfigurable, Closeable {
    // Abstract method to implement business logic
    public abstract void map(IN datum, AvroCollector<OUT> collector, Reporter reporter) 
        throws IOException;
    
    // Lifecycle methods (from interfaces)
    // configure() runs once before any map() call; close() runs after the last.
    public void configure(JobConf jobConf);
    public void close() throws IOException;
}

Usage Example

import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.mapred.Reporter;

// Implement mapper for processing user records
// Implement mapper for processing user records
public class UserMapper extends AvroMapper<GenericRecord, GenericRecord> {

    // Schema for emitted records; populated from the job config in configure().
    private Schema outputSchema;

    /**
     * Emits an upper-cased, categorized record for each adult user (age >= 18)
     * and maintains TOTAL_USERS / ADULT_USERS counters.
     *
     * @param user      input record; expected to carry "name" and "age" fields
     * @param collector sink for output records (automatically wrapped)
     * @param reporter  Hadoop progress/counter reporter
     * @throws IOException if output collection fails
     */
    @Override
    public void map(GenericRecord user, AvroCollector<GenericRecord> collector, Reporter reporter) 
            throws IOException {
        
        // Access Avro data directly (no wrapper needed)
        String name = user.get("name").toString();
        Integer age = (Integer) user.get("age");
        
        reporter.incrCounter("USER_PROCESSING", "TOTAL_USERS", 1);
        
        // Bug fix: the original unboxed `age` in `age >= 18` without a null
        // check (NPE when the field is unset) and evaluated the adult test
        // twice; one guarded branch now covers both the emit and the counter.
        if (age != null && age >= 18) {
            GenericRecord output = new GenericRecordBuilder(outputSchema)
                .set("name", name.toUpperCase())
                .set("age", age)
                .set("category", "adult")
                .build();
            
            // Collect output (automatically wrapped)
            collector.collect(output);
            reporter.incrCounter("USER_PROCESSING", "ADULT_USERS", 1);
        }
    }
    
    @Override
    public void configure(JobConf jobConf) {
        // Initialize mapper with job configuration
        this.outputSchema = AvroJob.getMapOutputSchema(jobConf);
    }
}

Avro Reducer Base Class

Abstract base class for implementing reducers that process grouped Avro data.

// API sketch of the reducer base class (method bodies omitted for brevity).
// NOTE(review): in the released Avro library, reduce() has a default
// implementation — confirm against the org.apache.avro.mapred.AvroReducer
// javadoc for the targeted version.
public abstract class AvroReducer<K,V,OUT> extends Configured implements JobConfigurable, Closeable {
    // Abstract method for business logic; `values` holds all records grouped
    // under `key` for this call.
    public abstract void reduce(K key, Iterable<V> values, AvroCollector<OUT> collector, Reporter reporter) 
        throws IOException;
    
    // Lifecycle methods (from interfaces)
    public void configure(JobConf jobConf);
    public void close() throws IOException;
}

Usage Example

import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.mapred.Reporter;

// Implement reducer for aggregating user data by department
// Implement reducer for aggregating user data by department
public class UserAggregateReducer extends AvroReducer<CharSequence, GenericRecord, GenericRecord> {

    private Schema outputSchema;

    /**
     * Collapses every user grouped under one department key into a single
     * summary record: member count, average age, and the list of names.
     *
     * @param department grouping key
     * @param users      all user records for this department
     * @param collector  sink for the summary record
     * @param reporter   Hadoop counter reporter
     * @throws IOException if output collection fails
     */
    @Override
    public void reduce(CharSequence department, Iterable<GenericRecord> users, 
                      AvroCollector<GenericRecord> collector, Reporter reporter) 
            throws IOException {
        
        int memberCount = 0;
        int ageSum = 0;
        List<String> memberNames = new ArrayList<>();
        
        // Single pass over the grouped records, accumulating count, age, names.
        for (GenericRecord member : users) {
            memberCount++;
            ageSum += (Integer) member.get("age");
            memberNames.add(member.get("name").toString());
        }
        
        // NOTE(review): average_age is truncating integer division — confirm
        // the output schema really expects an int average.
        GenericRecord summary = new GenericRecordBuilder(outputSchema)
            .set("department", department.toString())
            .set("user_count", memberCount)
            .set("average_age", ageSum / memberCount)
            .set("user_names", memberNames)
            .build();
        
        collector.collect(summary);
        
        // Counters: one per department key, plus total users folded in.
        reporter.incrCounter("AGGREGATION", "DEPARTMENTS_PROCESSED", 1);
        reporter.incrCounter("AGGREGATION", "USERS_AGGREGATED", memberCount);
    }
    
    @Override
    public void configure(JobConf jobConf) {
        // The summary record uses the job's configured (reduce) output schema.
        this.outputSchema = AvroJob.getOutputSchema(jobConf);
    }
}

Avro Collector

Abstract collector interface for gathering output from mappers and reducers.

// API sketch: the single sink abstraction mappers/reducers emit through.
public abstract class AvroCollector<T> extends Configured {
    // Core collection method; may be called any number of times per input.
    public abstract void collect(T datum) throws IOException;
}

The framework provides concrete implementations that automatically wrap collected data in AvroWrapper objects for integration with Hadoop's MapReduce infrastructure.

Usage Example

// In mapper or reducer
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter) {
    // Process input
    GenericRecord output = processRecord(input);
    
    // Collect output - automatically wrapped for Hadoop
    collector.collect(output);
    
    // Multiple outputs are supported
    if (shouldEmitSummary(input)) {
        GenericRecord summary = createSummary(input);
        collector.collect(summary);
    }
}

Hadoop Integration Classes

Classes that bridge Avro processing with standard Hadoop mappers and reducers.

// API sketches of the bridge classes between Avro processing and the legacy
// Hadoop mapred interfaces.
// NOTE(review): in the released Avro library these bridges are internal
// adapter classes (and HadoopMapper/HadoopReducer wrap an AvroMapper/
// AvroReducer rather than extending them) — verify against the actual
// org.apache.avro.mapred sources before relying on this hierarchy.
public abstract class HadoopMapper<IN,OUT> extends AvroMapper<IN,OUT> {
    // Integration with standard Hadoop Mapper interface
}

public abstract class HadoopReducer<K,V,OUT> extends AvroReducer<K,V,OUT> {
    // Integration with standard Hadoop Reducer interface
}

public abstract class HadoopCombiner<K,V,OUT> extends AvroReducer<K,V,OUT> {
    // Specialized for combine operations
}

public abstract class HadoopReducerBase<K,V,OUT> extends AvroReducer<K,V,OUT> {
    // Base class with additional Hadoop-specific functionality
}

Map Collector Implementation

Specific collector implementation for map phase output.

// API sketch: map-phase collector that forwards each datum to the underlying
// Hadoop OutputCollector, wrapped in an AvroWrapper with a NullWritable value.
public class MapCollector<T> extends AvroCollector<T> {
    // Constructor
    public MapCollector(OutputCollector<AvroWrapper<T>, NullWritable> collector);
    
    // Collect implementation
    public void collect(T datum) throws IOException;
}

Complete Example: Word Count

Here's a complete example showing mapper and reducer implementation:

Word Count Mapper

import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;

/**
 * Tokenizes each input line and emits a (word, 1) pair per non-empty token.
 */
public class WordCountMapper extends AvroMapper<Utf8, Pair<Utf8, Integer>> {
    
    @Override
    public void map(Utf8 line, AvroCollector<Pair<Utf8, Integer>> collector, Reporter reporter) 
            throws IOException {
        
        // Split line into words (non-word characters are separators; a leading
        // separator yields an empty first token, which is filtered below)
        String[] words = line.toString().toLowerCase().split("\\W+");
        
        // Emit each word with count of 1, counting only what is emitted
        int emitted = 0;
        for (String word : words) {
            if (!word.isEmpty()) {
                collector.collect(new Pair<>(new Utf8(word), 1));
                emitted++;
            }
        }
        
        reporter.incrCounter("WORDS", "LINES_PROCESSED", 1);
        // Bug fix: the original incremented by words.length, which includes the
        // empty tokens produced by split() and so overstated WORDS_EMITTED
        // relative to the pairs actually collected.
        reporter.incrCounter("WORDS", "WORDS_EMITTED", emitted);
    }
}

Word Count Reducer

import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;

/**
 * Sums the per-mapper partial counts for one word and emits a single
 * (word, total) pair, incrementing the UNIQUE_WORDS counter once per key.
 */
public class WordCountReducer extends AvroReducer<Utf8, Integer, Pair<Utf8, Integer>> {
    
    @Override
    public void reduce(Utf8 word, Iterable<Integer> counts, 
                      AvroCollector<Pair<Utf8, Integer>> collector, Reporter reporter) 
            throws IOException {
        
        // Accumulate every partial count for this word
        int total = 0;
        for (Integer partial : counts) {
            total += partial;
        }
        
        // One output pair per distinct word
        collector.collect(new Pair<>(word, total));
        
        reporter.incrCounter("WORDS", "UNIQUE_WORDS", 1);
    }
}

Job Configuration

import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.hadoop.mapred.JobConf;

// Configure word count job
// Configure word count job
// NOTE(review): this snippet also needs Schema, FileInputFormat,
// FileOutputFormat, Path and JobClient imports, which are omitted above.
JobConf job = new JobConf();
job.setJobName("Avro Word Count");

// Set input/output formats
job.setInputFormat(AvroInputFormat.class);
job.setOutputFormat(AvroOutputFormat.class);

// Configure schemas: input is plain strings, map/reduce output is (string,int) pairs
Schema stringSchema = Schema.create(Schema.Type.STRING);
Schema pairSchema = Pair.getPairSchema(stringSchema, Schema.create(Schema.Type.INT));

AvroJob.setInputSchema(job, stringSchema);
AvroJob.setMapOutputSchema(job, pairSchema);
AvroJob.setOutputSchema(job, pairSchema);

// Set mapper and reducer classes (via AvroJob, not JobConf, so Avro wiring is applied)
AvroJob.setMapperClass(job, WordCountMapper.class);
AvroJob.setReducerClass(job, WordCountReducer.class);

// Set input/output paths
FileInputFormat.setInputPaths(job, new Path("/input"));
FileOutputFormat.setOutputPath(job, new Path("/output"));

// Run job (blocks until completion)
JobClient.runJob(job);

Advanced Patterns

Custom Initialization

/**
 * Mapper whose filter criteria come from the job configuration: only records
 * whose {@code filter.field} value equals {@code filter.value} are emitted.
 */
public class ConfigurableMapper extends AvroMapper<GenericRecord, GenericRecord> {
    private Schema outputSchema;
    private String filterField;
    private Object filterValue;
    
    @Override
    public void configure(JobConf jobConf) {
        // Get schemas and configuration; runs once before any map() call
        this.outputSchema = AvroJob.getMapOutputSchema(jobConf);
        this.filterField = jobConf.get("filter.field");
        this.filterValue = jobConf.get("filter.value");
    }
    
    @Override
    public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter) 
            throws IOException {
        
        // Bug fix: the original dereferenced input.get(filterField) directly,
        // throwing NullPointerException whenever the field is absent or null.
        // NOTE(review): Avro string fields are typically Utf8, so an equals()
        // comparison against the String from jobConf may need a toString() —
        // confirm against the input schema.
        Object fieldValue = input.get(filterField);
        if (fieldValue != null && fieldValue.equals(filterValue)) {
            collector.collect(transformRecord(input));
        }
    }
}

Multiple Output Types

/**
 * Routes each record to a type-specific transformation based on its "type"
 * field. Records with an unrecognized type are counted rather than silently
 * dropped.
 */
public class MultiOutputMapper extends AvroMapper<GenericRecord, GenericRecord> {
    
    @Override
    public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter) 
            throws IOException {
        
        String recordType = input.get("type").toString();
        
        switch (recordType) {
            case "user":
                collector.collect(processUser(input));
                break;
            case "event":
                collector.collect(processEvent(input));
                break;
            case "transaction":
                collector.collect(processTransaction(input));
                break;
            default:
                // Robustness fix: the original switch silently discarded any
                // unknown type; count them so data loss shows up in job counters.
                reporter.incrCounter("ROUTING", "UNKNOWN_TYPE", 1);
                break;
        }
    }
}

Error Handling

// Mapper that treats per-record failures as data, not as job failures:
// bad records are counted, logged, and emitted as error records so the
// remainder of the split keeps processing.
public class RobustMapper extends AvroMapper<GenericRecord, GenericRecord> {
    
    @Override
    public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter) 
            throws IOException {
        
        try {
            // Process record
            GenericRecord output = processRecord(input);
            collector.collect(output);
            
            reporter.incrCounter("PROCESSING", "SUCCESS", 1);
            
        } catch (Exception e) {
            // Log error and continue processing.
            // NOTE(review): stderr goes to the task log; a Hadoop logger would
            // be more conventional here.
            System.err.println("Failed to process record: " + input + ", error: " + e.getMessage());
            reporter.incrCounter("PROCESSING", "ERRORS", 1);
            
            // Optionally emit error record.
            // NOTE(review): the error record is written to the same collector,
            // so it must conform to the configured map output schema — and
            // collect() itself can throw, escaping this catch. Confirm both
            // are intended.
            GenericRecord errorRecord = createErrorRecord(input, e);
            collector.collect(errorRecord);
        }
    }
}

Performance Considerations

Object Reuse

// Mapper demonstrating the object-reuse pattern: a single output record is
// allocated once in configure() and overwritten per input to reduce GC churn.
public class EfficientMapper extends AvroMapper<GenericRecord, GenericRecord> {
    // Single output instance, overwritten on every map() call.
    private GenericRecord reusableOutput;
    
    @Override
    public void configure(JobConf jobConf) {
        Schema outputSchema = AvroJob.getMapOutputSchema(jobConf);
        this.reusableOutput = new GenericData.Record(outputSchema);
    }
    
    @Override
    public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter) 
            throws IOException {
        
        // Reuse output object to reduce GC pressure.
        // NOTE(review): this is only safe if collect() fully serializes the
        // datum before map() is invoked again — presumably true for the Avro
        // map-phase collector, but confirm before adopting the pattern.
        reusableOutput.put("field1", input.get("field1"));
        reusableOutput.put("field2", processField(input.get("field2")));
        
        collector.collect(reusableOutput);
    }
}

Memory Management

/**
 * Streams over the grouped values instead of materializing them: keeps only a
 * deep copy of the first record plus a running count, then emits one summary
 * record per key.
 */
public class MemoryEfficientReducer extends AvroReducer<Utf8, GenericRecord, GenericRecord> {
    
    @Override
    public void reduce(Utf8 key, Iterable<GenericRecord> values, 
                      AvroCollector<GenericRecord> collector, Reporter reporter) 
            throws IOException {
        
        GenericRecord firstSeen = null;
        int seen = 0;
        
        for (GenericRecord record : values) {
            // Deep-copy the first record — presumably because the framework
            // reuses the record instance across iterations; verify against the
            // deserializer's reuse behavior.
            if (firstSeen == null) {
                firstSeen = GenericData.get().deepCopy(record.getSchema(), record);
            }
            seen++;
            
            // Signal liveness periodically so long-running groups aren't killed
            if (seen % 1000 == 0) {
                reporter.progress();
            }
        }
        
        // One summary per key, built without retaining the full value list
        collector.collect(createSummary(key, firstSeen, seen));
    }
}

Error Handling

Common issues and solutions:

  • Schema Mismatch: Ensure input/output schemas are properly configured via AvroJob
  • NullPointerException: Check for null values in Avro records before processing
  • ClassCastException: Verify data types match schema expectations
  • Configuration Errors: Ensure mapper/reducer classes are properly registered with AvroJob
  • Memory Issues: Use object reuse patterns and streaming processing for large datasets

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-avro--avro-mapred

docs

cross-language-processing.md

data-wrappers.md

file-utilities.md

index.md

input-output-formats.md

job-configuration.md

mapreduce-processing.md

serialization.md

tile.json