Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines
—
Advanced file handling utilities for sorted key-value files, sequence files, and compression codec integration. These utilities provide efficient storage and retrieval mechanisms for Avro data with specialized support for indexed access, sorted operations, and seamless integration with Hadoop's file system.
Indexed Avro container files that support efficient key-based lookups, similar to Hadoop's MapFile but designed specifically for Avro data.
public class SortedKeyValueFile {
// Nested reader class
public static class Reader<K,V> implements Closeable {
// Constructor
public Reader(Options options) throws IOException;
// Data access
public V get(K key) throws IOException;
public Iterator<AvroKeyValue<K,V>> iterator() throws IOException;
// Resource management
public void close() throws IOException;
// Options configuration
public static class Options {
// Builder pattern methods for configuration
public Options withKeySchema(Schema keySchema);
public Options withValueSchema(Schema valueSchema);
public Options withPath(Path path);
public Options withConfiguration(Configuration conf);
public Options withDataModel(GenericData dataModel);
}
}
// Nested writer class
public static class Writer<K,V> implements Closeable {
// Constructor
public Writer(Options options) throws IOException;
// Data writing (keys must be in sorted order)
public void append(K key, V value) throws IOException;
// Resource management
public void close() throws IOException;
// Options configuration
public static class Options {
// Builder pattern methods for configuration
public Options withKeySchema(Schema keySchema);
public Options withValueSchema(Schema valueSchema);
public Options withPath(Path path);
public Options withConfiguration(Configuration conf);
public Options withDataModel(GenericData dataModel);
public Options withCodec(CodecFactory codec);
}
}
}
import org.apache.avro.hadoop.file.SortedKeyValueFile;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
// Define schemas
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",...}");
// Write sorted key-value file
SortedKeyValueFile.Writer.Options writerOpts = new SortedKeyValueFile.Writer.Options()
.withKeySchema(keySchema)
.withValueSchema(valueSchema)
.withPath(new Path("/data/users.skv"))
.withConfiguration(conf)
.withCodec(CodecFactory.snappyCodec());
try (SortedKeyValueFile.Writer<String, GenericRecord> writer =
new SortedKeyValueFile.Writer<>(writerOpts)) {
// Append data in sorted key order
writer.append("alice", aliceRecord);
writer.append("bob", bobRecord);
writer.append("charlie", charlieRecord);
}
// Read from sorted key-value file
SortedKeyValueFile.Reader.Options readerOpts = new SortedKeyValueFile.Reader.Options()
.withKeySchema(keySchema)
.withValueSchema(valueSchema)
.withPath(new Path("/data/users.skv"))
.withConfiguration(conf);
try (SortedKeyValueFile.Reader<String, GenericRecord> reader =
new SortedKeyValueFile.Reader<>(readerOpts)) {
// Efficient key lookup
GenericRecord user = reader.get("bob");
// Iterator over all records
Iterator<AvroKeyValue<String, GenericRecord>> iter = reader.iterator();
while (iter.hasNext()) {
AvroKeyValue<String, GenericRecord> kv = iter.next();
String key = kv.getKey();
GenericRecord value = kv.getValue();
// Process key-value pair
}
}
Enhanced Hadoop SequenceFile support with Avro serialization, providing metadata storage and schema information.
public class AvroSequenceFile {
// Metadata field constants
public static final Text METADATA_FIELD_KEY_SCHEMA = new Text("avro.key.schema");
public static final Text METADATA_FIELD_VALUE_SCHEMA = new Text("avro.value.schema");
// Writer creation
public static SequenceFile.Writer createWriter(Writer.Options options) throws IOException;
// Nested writer class
public static class Writer implements Closeable {
// Data writing
public void append(AvroWrapper key, AvroWrapper value) throws IOException;
public void close() throws IOException;
// Options configuration
public static class Options {
public Options withPath(Path path);
public Options withKeySchema(Schema keySchema);
public Options withValueSchema(Schema valueSchema);
public Options withConfiguration(Configuration conf);
public Options withCompressionType(SequenceFile.CompressionType compressionType);
public Options withCompressionCodec(CompressionCodec codec);
public Options withDataModel(GenericData dataModel);
}
}
// Nested reader class
public static class Reader implements Closeable {
// Data reading
public boolean next(AvroWrapper key, AvroWrapper value) throws IOException;
public void close() throws IOException;
// Schema access
public Schema getKeySchema();
public Schema getValueSchema();
// Options configuration
public static class Options {
public Options withPath(Path path);
public Options withConfiguration(Configuration conf);
public Options withDataModel(GenericData dataModel);
}
}
}
import org.apache.avro.hadoop.io.AvroSequenceFile;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.io.SequenceFile;
// Write Avro sequence file
AvroSequenceFile.Writer.Options writerOpts = new AvroSequenceFile.Writer.Options()
.withPath(new Path("/data/sequence.seq"))
.withKeySchema(keySchema)
.withValueSchema(valueSchema)
.withConfiguration(conf)
.withCompressionType(SequenceFile.CompressionType.BLOCK);
try (AvroSequenceFile.Writer writer = new AvroSequenceFile.Writer(writerOpts)) {
AvroWrapper<String> key = new AvroWrapper<>();
AvroWrapper<GenericRecord> value = new AvroWrapper<>();
// Write key-value pairs
key.datum("key1");
value.datum(record1);
writer.append(key, value);
key.datum("key2");
value.datum(record2);
writer.append(key, value);
}
// Read Avro sequence file
AvroSequenceFile.Reader.Options readerOpts = new AvroSequenceFile.Reader.Options()
.withPath(new Path("/data/sequence.seq"))
.withConfiguration(conf);
try (AvroSequenceFile.Reader reader = new AvroSequenceFile.Reader(readerOpts)) {
Schema keySchema = reader.getKeySchema();
Schema valueSchema = reader.getValueSchema();
AvroWrapper<String> key = new AvroWrapper<>();
AvroWrapper<GenericRecord> value = new AvroWrapper<>();
while (reader.next(key, value)) {
String keyData = key.datum();
GenericRecord valueData = value.datum();
// Process data
}
}
Utilities for mapping between Hadoop compression codecs and Avro compression codecs.
public class HadoopCodecFactory {
// Codec mapping methods
public static CodecFactory fromHadoopString(String hadoopCodecClass) throws AvroRuntimeException;
public static String getAvroCodecName(String hadoopCodecClass);
}
Supported codec mappings:
- org.apache.hadoop.io.compress.DefaultCodec → deflate
- org.apache.hadoop.io.compress.GzipCodec → deflate
- org.apache.hadoop.io.compress.BZip2Codec → bzip2
- org.apache.hadoop.io.compress.SnappyCodec → snappy
- org.apache.hadoop.io.compress.XZCodec → xz
- org.apache.hadoop.io.compress.ZStandardCodec → zstandard
import org.apache.avro.hadoop.file.HadoopCodecFactory;
import org.apache.avro.file.CodecFactory;
// Convert Hadoop codec class to Avro codec
String hadoopCodecClass = "org.apache.hadoop.io.compress.SnappyCodec";
CodecFactory avroCodec = HadoopCodecFactory.fromHadoopString(hadoopCodecClass);
// Get Avro codec name
String avroCodecName = HadoopCodecFactory.getAvroCodecName(hadoopCodecClass);
// Returns "snappy"
// Use in file writing
SortedKeyValueFile.Writer.Options opts = new SortedKeyValueFile.Writer.Options()
    .withCodec(avroCodec);
Helper classes for working with key-value data structures in Avro format.
public class AvroKeyValue<K,V> {
// Constructor
public AvroKeyValue(GenericRecord keyValueRecord);
// Data access
public GenericRecord get();
public K getKey();
public V getValue();
public void setKey(K key);
public void setValue(V value);
// Schema utilities
public static Schema getSchema(Schema keySchema, Schema valueSchema);
// Field name constants
public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair";
public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce";
public static final String KEY_FIELD = "key";
public static final String VALUE_FIELD = "value";
// Iterator support
public static class Iterator<K,V> implements java.util.Iterator<AvroKeyValue<K,V>> {
public Iterator(java.util.Iterator<GenericRecord> records);
public boolean hasNext();
public AvroKeyValue<K,V> next();
public void remove();
}
}
import org.apache.avro.hadoop.io.AvroKeyValue;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
// Create key-value schema and record
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = Schema.create(Schema.Type.INT);
Schema kvSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
GenericRecord kvRecord = new GenericRecordBuilder(kvSchema)
.set(AvroKeyValue.KEY_FIELD, "count")
.set(AvroKeyValue.VALUE_FIELD, 42)
.build();
// Use helper
AvroKeyValue<String, Integer> kv = new AvroKeyValue<>(kvRecord);
String key = kv.getKey(); // "count"
Integer value = kv.getValue(); // 42
// Modify values
kv.setKey("total");
kv.setValue(100);
File utilities integrate with MapReduce input formats:
// Use sorted key-value files as MapReduce input
public class SortedKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new SortedKeyValueRecordReader<>();
}
}
// Write MapReduce output as sorted key-value files
public class SortedKeyValueOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context) {
return new SortedKeyValueRecordWriter<>();
}
}
// Configure appropriate options for performance
SortedKeyValueFile.Writer.Options opts = new SortedKeyValueFile.Writer.Options()
.withKeySchema(keySchema)
.withValueSchema(valueSchema)
.withPath(path)
.withCodec(CodecFactory.snappyCodec()) // Use compression
.withConfiguration(conf);
// Ensure keys are pre-sorted for optimal performance
// SortedKeyValueFile requires keys to be in sorted order
// Use try-with-resources for proper resource management
try (SortedKeyValueFile.Reader<String, GenericRecord> reader =
new SortedKeyValueFile.Reader<>(readerOpts)) {
// Use reader
} // Automatically closed
// Reuse objects in loops
AvroKeyValue<String, GenericRecord> reusableKV = null;
Iterator<AvroKeyValue<String, GenericRecord>> iter = reader.iterator();
while (iter.hasNext()) {
reusableKV = iter.next(); // May reuse object
// Process reusableKV
}
// Choose appropriate compression based on use case
CodecFactory snappy = CodecFactory.snappyCodec(); // Fast compression/decompression
CodecFactory deflate = CodecFactory.deflateCodec(6); // Better compression ratio
CodecFactory bzip2 = CodecFactory.bzip2Codec(); // Highest compression ratio
// Configure based on workload
SortedKeyValueFile.Writer.Options opts = new SortedKeyValueFile.Writer.Options()
    .withCodec(snappy); // Good balance of speed and compression
Common issues and solutions:
// Always use try-with-resources or explicit cleanup
try (SortedKeyValueFile.Writer<String, GenericRecord> writer =
new SortedKeyValueFile.Writer<>(opts)) {
// Use writer
} catch (IOException e) {
// Handle errors
log.error("Failed to write sorted key-value file", e);
throw e;
}
try {
CodecFactory codec = HadoopCodecFactory.fromHadoopString("unknown.codec");
} catch (AvroRuntimeException e) {
// Handle unsupported codec
log.warn("Unsupported codec, falling back to default", e);
CodecFactory codec = CodecFactory.deflateCodec();
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-mapred