Hadoop MapReduce-compatible API for using Avro serialization in distributed data processing pipelines
Comprehensive utilities for configuring MapReduce jobs with Avro schemas, data models, and processing classes. This functionality is split between two parallel APIs: org.apache.avro.mapred.AvroJob for the legacy org.apache.hadoop.mapred API and org.apache.avro.mapreduce.AvroJob for the newer org.apache.hadoop.mapreduce API.
Configuration utilities for the legacy org.apache.hadoop.mapred API, providing schema management and class configuration for Avro-based MapReduce jobs.
// org.apache.avro.mapred.AvroJob
public class AvroJob {
// Schema Configuration
public static void setInputSchema(JobConf job, Schema schema);
public static Schema getInputSchema(Configuration job);
public static void setMapOutputSchema(JobConf job, Schema schema);
public static Schema getMapOutputSchema(Configuration job);
public static void setOutputSchema(JobConf job, Schema schema);
public static Schema getOutputSchema(Configuration job);
// Processing Class Configuration
public static void setMapperClass(JobConf job, Class<? extends AvroMapper> c);
public static void setCombinerClass(JobConf job, Class<? extends AvroReducer> c);
public static void setReducerClass(JobConf job, Class<? extends AvroReducer> c);
// Data Model Configuration
public static void setDataModelClass(JobConf job, Class<? extends GenericData> modelClass);
public static Class<? extends GenericData> getDataModelClass(Configuration conf);
public static GenericData createDataModel(Configuration conf);
public static GenericData createInputDataModel(Configuration conf);
public static GenericData createMapOutputDataModel(Configuration conf);
// Input/Output Format Configuration
public static void setInputSequenceFile(JobConf job);
public static void setReflect(JobConf job);
public static void setInputReflect(JobConf job);
public static void setMapOutputReflect(JobConf job);
// Output Configuration
public static void setOutputCodec(JobConf job, String codec);
public static void setOutputMeta(JobConf job, String key, String value);
public static void setOutputMeta(JobConf job, String key, long value);
public static void setOutputMeta(JobConf job, String key, byte[] value);
}
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.JobConf;
// Parse schemas (Schema.Parser replaces the deprecated Schema.parse)
Schema userSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}");
Schema outputSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"UserCount\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"int\"}]}");
// Configure job
JobConf job = new JobConf();
AvroJob.setInputSchema(job, userSchema);
// With a reduce phase, the map output schema must be a Pair schema of (key, value)
AvroJob.setMapOutputSchema(job, Pair.getPairSchema(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.INT)));
AvroJob.setOutputSchema(job, outputSchema);
AvroJob.setMapperClass(job, UserMapper.class);   // extends AvroMapper (sketched below)
AvroJob.setReducerClass(job, UserReducer.class); // extends AvroReducer (sketched below)
// Set compression and metadata
AvroJob.setOutputCodec(job, "snappy");
AvroJob.setOutputMeta(job, "created.by", "MyApplication");Configuration utilities for the modern org.apache.hadoop.mapreduce API, supporting separate key and value schemas for more flexible data processing patterns.
Configuration utilities for the modern org.apache.hadoop.mapreduce API, supporting separate key and value schemas for more flexible data processing patterns.
// org.apache.avro.mapreduce.AvroJob
public class AvroJob {
// Input Schema Configuration
public static void setInputKeySchema(Job job, Schema schema);
public static Schema getInputKeySchema(Configuration conf);
public static void setInputValueSchema(Job job, Schema schema);
public static Schema getInputValueSchema(Configuration conf);
// Map Output Schema Configuration
public static void setMapOutputKeySchema(Job job, Schema schema);
public static Schema getMapOutputKeySchema(Configuration conf);
public static void setMapOutputValueSchema(Job job, Schema schema);
public static Schema getMapOutputValueSchema(Configuration conf);
// Output Schema Configuration
public static void setOutputKeySchema(Job job, Schema schema);
public static Schema getOutputKeySchema(Configuration conf);
public static void setOutputValueSchema(Job job, Schema schema);
public static Schema getOutputValueSchema(Configuration conf);
// Data Model Configuration
public static void setDataModelClass(Job job, Class<? extends GenericData> modelClass);
}
import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.mapreduce.Job;
// Parse schemas (Schema.Parser replaces the deprecated Schema.parse)
Schema keySchema = new Schema.Parser().parse("{\"type\":\"string\"}");
Schema valueSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}");
Schema outputValueSchema = Schema.create(Schema.Type.INT); // equivalent to parsing {"type":"int"}
// Configure job with separate key and value schemas
Job job = Job.getInstance();
AvroJob.setInputKeySchema(job, keySchema);
AvroJob.setInputValueSchema(job, valueSchema);
AvroJob.setMapOutputKeySchema(job, keySchema);
AvroJob.setMapOutputValueSchema(job, outputValueSchema); // int counts emitted by the mapper
AvroJob.setOutputKeySchema(job, keySchema);
AvroJob.setOutputValueSchema(job, outputValueSchema);
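The matching getters read these schemas back from the job's Configuration, which is how the record readers and writers recover them at task time. A quick round-trip check using the job configured above:
import org.apache.hadoop.conf.Configuration;
Configuration conf = job.getConfiguration();
Schema k = AvroJob.getInputKeySchema(conf);       // the keySchema set above
Schema v = AvroJob.getMapOutputValueSchema(conf); // the int schema set above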
Key configuration constants used internally by the job configuration utilities.
// Legacy API Constants (org.apache.avro.mapred.AvroJob)
public static final String INPUT_SCHEMA = "avro.input.schema";
public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
public static final String OUTPUT_SCHEMA = "avro.output.schema";
public static final String OUTPUT_CODEC = "avro.output.codec";
public static final String REFLECT_DATA = "avro.reflect.data";
public static final String INPUT_IS_REFLECT = "avro.input.is.reflect";
public static final String MAP_OUTPUT_IS_REFLECT = "avro.map.output.is.reflect";
// New API Constants (org.apache.avro.mapreduce.AvroJob)
public static final String CONF_OUTPUT_CODEC = "avro.mapreduce.output.codec";
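These constants are the Configuration keys the setters write to, so configured values can also be inspected directly. A small sketch with the legacy API, reusing userSchema from the earlier example:
JobConf job = new JobConf();
AvroJob.setInputSchema(job, userSchema);
String schemaJson = job.get(AvroJob.INPUT_SCHEMA); // schema JSON stored under "avro.input.schema"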
Configuration methods for integrating different Avro data models (Generic, Specific, Reflect) with MapReduce jobs.
// Set data model class for custom data handling
public static void setDataModelClass(JobConf job, Class<? extends GenericData> modelClass);
public static void setDataModelClass(Job job, Class<? extends GenericData> modelClass);
// Create data model instance from configuration
public static GenericData createDataModel(Configuration conf);
// Enable reflection-based data handling
public static void setReflect(JobConf job); // Enable reflection globally
public static void setInputReflect(JobConf job); // Enable for input only
public static void setMapOutputReflect(JobConf job); // Enable for map output only
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
// Use reflection data model for POJOs
AvroJob.setDataModelClass(job, ReflectData.class);
AvroJob.setReflect(job);
// Or configure reflection for specific stages
AvroJob.setInputReflect(job); // Input uses reflection
AvroJob.setMapOutputReflect(job); // Map output uses reflection
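At runtime the input and output formats recover the configured model through createDataModel; the round trip for the configuration above looks like this:
GenericData model = AvroJob.createDataModel(job); // a ReflectData instance, per setDataModelClass above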
Specialized configuration for output formatting, compression, and metadata.
// Compression configuration
public static void setOutputCodec(JobConf job, String codec);
// Metadata configuration (multiple overloads)
public static void setOutputMeta(JobConf job, String key, String value);
public static void setOutputMeta(JobConf job, String key, long value);
public static void setOutputMeta(JobConf job, String key, byte[] value);
// Special input format configuration
public static void setInputSequenceFile(JobConf job); // Enable SequenceFile input
// Set compression codec
AvroJob.setOutputCodec(job, "snappy"); // Use Snappy compression
AvroJob.setOutputCodec(job, "deflate"); // Use Deflate compression
// Add metadata to output files
AvroJob.setOutputMeta(job, "created.by", "MyApplication");
AvroJob.setOutputMeta(job, "created.time", System.currentTimeMillis());
AvroJob.setOutputMeta(job, "version", "1.0".getBytes());
// Configure for SequenceFile input
AvroJob.setInputSequenceFile(job);
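The metadata set above is embedded in the header of each output Avro container file and can be read back with a DataFileReader. A sketch, assuming a hypothetical local result file name:
import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
try (DataFileReader<GenericRecord> reader =
    new DataFileReader<>(new File("output/part-00000.avro"), new GenericDatumReader<>())) {
  String createdBy = reader.getMetaString("created.by"); // "MyApplication"
}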
The job configuration utilities work together with the Avro input and output formats:
// Explicit imports: both packages define some of the same class names (e.g. AvroJob),
// so wildcard-importing both would make those names ambiguous
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroKeyValueInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
// Legacy API integration (job is an org.apache.hadoop.mapred.JobConf)
job.setInputFormat(AvroInputFormat.class);
job.setOutputFormat(AvroOutputFormat.class);
// New API integration (job is an org.apache.hadoop.mapreduce.Job)
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
// Or key-value formats
job.setInputFormatClass(AvroKeyValueInputFormat.class);
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
Common configuration errors and troubleshooting:
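A few pitfalls worth checking first (an illustrative list, not exhaustive): the two AvroJob classes are not interchangeable, so a JobConf-based job must stay within org.apache.avro.mapred and a Job-based job within org.apache.avro.mapreduce; schemas must be configured before the job is submitted, because tasks recover them from the serialized Configuration; and codec names passed to setOutputCodec must match Avro's CodecFactory names, such as "null", "deflate", or "snappy".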
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-mapred