CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-avro--avro-mapred

Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines

Pending
Overview
Eval results
Files

docs/input-output-formats.md

Input and Output Formats

Specialized InputFormat and OutputFormat implementations for reading and writing Avro data in various formats within MapReduce jobs. These formats provide seamless integration between Avro's schema-based serialization and Hadoop's distributed file system, supporting both legacy and modern MapReduce APIs.

Capabilities

Legacy API Input Formats

Input formats for the legacy org.apache.hadoop.mapred API that read Avro data and present it as wrapped objects.

public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
    public RecordReader<AvroWrapper<T>, NullWritable> getRecordReader(
        InputSplit split, JobConf job, Reporter reporter) throws IOException;
    
    // Configuration constants
    public static final String IGNORE_FILES_WITHOUT_EXTENSION_KEY = "avro.mapred.ignore.inputs.without.extension";
    public static final boolean IGNORE_INPUTS_WITHOUT_EXTENSION_DEFAULT = true;
}

public class AvroAsTextInputFormat<T> extends AvroInputFormat<T> {
    // Reads Avro data but presents as text representation
}

public class AvroUtf8InputFormat extends AvroInputFormat<Utf8> {
    // Specialized for reading Avro UTF-8 string data
}

Usage Example

import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroJob;
import org.apache.hadoop.mapred.JobConf;

// Configure job for Avro input
JobConf job = new JobConf();
job.setInputFormat(AvroInputFormat.class);
AvroJob.setInputSchema(job, userSchema);

// Input format will read Avro container files and produce AvroWrapper<T> keys

Legacy API Output Formats

Output formats for writing Avro data from MapReduce jobs using the legacy API.

public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
    public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
        FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException;
}

public class AvroTextOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
    // Writes Avro data as text representation
}

Usage Example

import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroJob;

// Configure job for Avro output
job.setOutputFormat(AvroOutputFormat.class);
AvroJob.setOutputSchema(job, outputSchema);
AvroJob.setOutputCodec(job, "snappy");

// Output format will write AvroWrapper<T> data as Avro container files

New API Input Formats

Input formats for the modern org.apache.hadoop.mapreduce API, which present keys and values as distinct AvroKey and AvroValue wrapper types rather than a single AvroWrapper.

public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
    public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

public class AvroKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
    public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

public class CombineAvroKeyValueFileInputFormat<K,V> extends CombineFileInputFormat<AvroKey<K>, AvroValue<V>> {
    // Optimized for processing many small Avro files
    public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException;
}

Usage Example

import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueInputFormat;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.mapreduce.Job;

// Single key input format
Job job = Job.getInstance();
job.setInputFormatClass(AvroKeyInputFormat.class);
AvroJob.setInputKeySchema(job, keySchema);

// Key-value input format
job.setInputFormatClass(AvroKeyValueInputFormat.class);
AvroJob.setInputKeySchema(job, keySchema);
AvroJob.setInputValueSchema(job, valueSchema);

// For many small files
job.setInputFormatClass(CombineAvroKeyValueFileInputFormat.class);

New API Output Formats

Output formats for writing Avro data using the modern MapReduce API.

public class AvroKeyOutputFormat<T> extends FileOutputFormat<AvroKey<T>, NullWritable> {
    public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context) 
        throws IOException, InterruptedException;
}

public class AvroKeyValueOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
    public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context) 
        throws IOException, InterruptedException;
}

public abstract class AvroOutputFormatBase<K,V> extends FileOutputFormat<K,V> {
    // Base class providing common functionality for Avro output formats
    protected static class AvroRecordWriter<K,V> extends RecordWriter<K,V> {
        public void write(K key, V value) throws IOException, InterruptedException;
        public void close(TaskAttemptContext context) throws IOException, InterruptedException;
    }
}

Usage Example

import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.avro.mapreduce.AvroJob;

// Single key output format
job.setOutputFormatClass(AvroKeyOutputFormat.class);
AvroJob.setOutputKeySchema(job, outputKeySchema);

// Key-value output format  
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
AvroJob.setOutputKeySchema(job, outputKeySchema);
AvroJob.setOutputValueSchema(job, outputValueSchema);

Sequence File Integration

Formats for reading and writing Avro data in Hadoop SequenceFile format.

public class AvroSequenceFileInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
    public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

public class AvroSequenceFileOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
    public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context) 
        throws IOException, InterruptedException;
}

// Legacy API equivalent
public class SequenceFileInputFormat<K,V> extends FileInputFormat<AvroWrapper<K>, AvroWrapper<V>> {
    public RecordReader<AvroWrapper<K>, AvroWrapper<V>> getRecordReader(
        InputSplit split, JobConf job, Reporter reporter) throws IOException;
}

Usage Example

import org.apache.avro.mapreduce.AvroSequenceFileInputFormat;
import org.apache.avro.mapreduce.AvroSequenceFileOutputFormat;

// Read from SequenceFile with Avro serialization
job.setInputFormatClass(AvroSequenceFileInputFormat.class);
AvroJob.setInputKeySchema(job, keySchema);
AvroJob.setInputValueSchema(job, valueSchema);

// Write to SequenceFile with Avro serialization
job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
AvroJob.setOutputKeySchema(job, keySchema);
AvroJob.setOutputValueSchema(job, valueSchema);

Record Readers and Writers

Base classes and implementations for reading and writing Avro records.

public abstract class AvroRecordReaderBase<K,V,T> extends RecordReader<K,V> {
    // Base class for Avro record readers
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
    public boolean nextKeyValue() throws IOException, InterruptedException;
    public float getProgress() throws IOException, InterruptedException;
    public void close() throws IOException;
    
    // Abstract methods for specific implementations
    public abstract K getCurrentKey() throws IOException, InterruptedException;
    public abstract V getCurrentValue() throws IOException, InterruptedException;
}

public class AvroKeyRecordReader<T> extends AvroRecordReaderBase<AvroKey<T>, NullWritable, T> {
    public AvroKey<T> getCurrentKey() throws IOException, InterruptedException;
    public NullWritable getCurrentValue() throws IOException, InterruptedException;
}

public class AvroKeyValueRecordReader<K,V> extends AvroRecordReaderBase<AvroKey<K>, AvroValue<V>, Pair<K,V>> {
    public AvroKey<K> getCurrentKey() throws IOException, InterruptedException;
    public AvroValue<V> getCurrentValue() throws IOException, InterruptedException;  
}

public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> {
    public void write(AvroKey<T> key, NullWritable value) throws IOException, InterruptedException;
    public void close(TaskAttemptContext context) throws IOException, InterruptedException;
}

public class AvroKeyValueRecordWriter<K,V> extends RecordWriter<AvroKey<K>, AvroValue<V>> {
    public void write(AvroKey<K> key, AvroValue<V> value) throws IOException, InterruptedException;
    public void close(TaskAttemptContext context) throws IOException, InterruptedException;
}

Multiple Output Support

Support for writing to multiple output files from a single job.

// Legacy API
public class AvroMultipleOutputs {
    public AvroMultipleOutputs(JobConf job);
    public <T> void write(String name, AvroWrapper<T> key, NullWritable value) throws IOException;
    public void close() throws IOException;
    
    public static void addNamedOutput(JobConf job, String name, Class<? extends OutputFormat> outputFormat, Schema schema);
    public static void setCountersEnabled(JobConf job, boolean enabled);
}

// New API  
public class org.apache.avro.mapreduce.AvroMultipleOutputs {
    public AvroMultipleOutputs(TaskAttemptContext context);
    public <K> void write(K key, NullWritable value, String baseOutputPath) throws IOException, InterruptedException;
    public <K,V> void write(K key, V value, String baseOutputPath) throws IOException, InterruptedException;
    public void close() throws IOException, InterruptedException;
    
    public static void addNamedOutput(Job job, String name, Class<? extends OutputFormat> outputFormat, 
                                     Class<?> keyClass, Class<?> valueClass);
    public static void setCountersEnabled(Job job, boolean enabled);
}

Usage Example

import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;

// Configure multiple outputs
AvroMultipleOutputs.addNamedOutput(job, "users", AvroKeyOutputFormat.class, AvroKey.class, NullWritable.class);
AvroMultipleOutputs.addNamedOutput(job, "events", AvroKeyOutputFormat.class, AvroKey.class, NullWritable.class);

// In reducer
public class MyReducer extends Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable> {
    private AvroMultipleOutputs multipleOutputs;
    
    protected void setup(Context context) {
        multipleOutputs = new AvroMultipleOutputs(context);
    }
    
    public void reduce(Text key, Iterable<IntWritable> values, Context context) {
        // Write to different outputs based on logic
        if (key.toString().startsWith("user_")) {
            multipleOutputs.write(new AvroKey<>(userRecord), NullWritable.get(), "users");
        } else {
            multipleOutputs.write(new AvroKey<>(eventRecord), NullWritable.get(), "events");
        }
    }
    
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}

Configuration and Integration

File Extensions and Filtering

Control which files are processed by input formats:

// Ignore files without .avro extension
job.setBoolean(AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, true);

// Process all files regardless of extension
job.setBoolean(AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false);

Compression Support

All output formats support Avro's compression codecs. (Note: the legacy org.apache.avro.mapred.AvroJob and the new org.apache.avro.mapreduce.AvroJob share a simple class name, so a single source file can import only one of them; the example below shows both APIs side by side for illustration only.)

import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapreduce.AvroJob;

// Legacy API
AvroJob.setOutputCodec(job, "snappy");
AvroJob.setOutputCodec(job, "deflate");
AvroJob.setOutputCodec(job, "bzip2");

// New API - via configuration
job.getConfiguration().set("avro.mapreduce.output.codec", "snappy");

Schema Configuration Integration

Input/output formats automatically use schemas configured via AvroJob:

// Schemas set via AvroJob are automatically picked up by formats
AvroJob.setInputSchema(job, inputSchema);     // Used by AvroInputFormat
AvroJob.setOutputSchema(job, outputSchema);   // Used by AvroOutputFormat

// New API with separate key/value schemas
AvroJob.setInputKeySchema(job, keySchema);    // Used by AvroKeyInputFormat
AvroJob.setInputValueSchema(job, valueSchema); // Used by AvroKeyValueInputFormat

Performance Considerations

Small Files Optimization

Use CombineAvroKeyValueFileInputFormat for many small files:

job.setInputFormatClass(CombineAvroKeyValueFileInputFormat.class);

// Configure combine parameters
job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 128 * 1024 * 1024);
job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 64 * 1024 * 1024);

Memory Management

Input formats handle memory efficiently by:

  • Reusing reader objects
  • Supporting lazy deserialization
  • Proper resource cleanup

Error Handling

Common issues and solutions:

  • Schema Not Found: Ensure schema is configured via AvroJob before setting input/output format
  • File Format Errors: Verify input files are valid Avro container files
  • Codec Errors: Ensure output codec is supported and available on all nodes
  • Split Size Issues: For large files, tune split size parameters
  • Memory Issues: For large records, increase task memory limits

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-avro--avro-mapred

docs

cross-language-processing.md

data-wrappers.md

file-utilities.md

index.md

input-output-formats.md

job-configuration.md

mapreduce-processing.md

serialization.md

tile.json