Apache Flink Avro format support library providing serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications
—
Input and output formats for reading and writing Avro files in Flink batch processing scenarios. Provides efficient file-based processing with support for compression, splitting, and type safety.
FileInputFormat implementation for reading Avro files in batch processing jobs.
public class AvroInputFormat<E> extends FileInputFormat<E>
implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
// Constructor
public AvroInputFormat(Path filePath, Class<E> type);
// Configuration methods
public void setReuseAvroValue(boolean reuseAvroValue);
public void setUnsplittable(boolean unsplittable);
// Type information
public TypeInformation<E> getProducedType();
}

Reading Specific Records:
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.core.fs.Path;
// Create input format for specific record type
AvroInputFormat<User> inputFormat = new AvroInputFormat<>(
new Path("hdfs://path/to/user/files/*.avro"),
User.class
);
// Configure reuse behavior
inputFormat.setReuseAvroValue(true); // Default: true for better performance
// Create dataset
DataSet<User> users = env.createInput(inputFormat);

Reading Generic Records:
import org.apache.avro.generic.GenericRecord;
// Create input format for generic records
AvroInputFormat<GenericRecord> genericInputFormat = new AvroInputFormat<>(
new Path("hdfs://path/to/data/*.avro"),
GenericRecord.class
);
// Use in batch job
DataSet<GenericRecord> records = env.createInput(genericInputFormat);
// Process generic records
DataSet<String> names = records.map(record -> record.get("name").toString());

File Splitting Control:
// Allow file splitting for parallel processing (default)
inputFormat.setUnsplittable(false);
// Force reading entire files (useful for small files)
inputFormat.setUnsplittable(true);
// Process with parallelism
DataSet<User> users = env.createInput(inputFormat)
.setParallelism(4);

FileOutputFormat implementation for writing Avro files in batch processing jobs.
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
// Constructors
public AvroOutputFormat(Path filePath, Class<E> type);
public AvroOutputFormat(Class<E> type);
// Configuration methods
public void setSchema(Schema schema);
public void setCodec(Codec codec);
// Codec enum
public enum Codec {
NULL, SNAPPY, BZIP2, DEFLATE, XZ
}
}

Writing Specific Records:
import org.apache.flink.formats.avro.AvroOutputFormat;
// Create output format with path and type
AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(
new Path("hdfs://output/path/users.avro"),
User.class
);
// Configure compression
outputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);
// Write dataset
DataSet<User> users = ...;
users.output(outputFormat);

Writing Generic Records:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
// Create output format for generic records
AvroOutputFormat<GenericRecord> genericOutputFormat = new AvroOutputFormat<>(
new Path("hdfs://output/path/records.avro"),
GenericRecord.class
);
// Set explicit schema for generic records
Schema schema = new Schema.Parser().parse(schemaString);
genericOutputFormat.setSchema(schema);
// Configure compression
genericOutputFormat.setCodec(AvroOutputFormat.Codec.DEFLATE);
// Write generic records
DataSet<GenericRecord> records = ...;
records.output(genericOutputFormat);

Dynamic Output Paths:
// Create output format without fixed path
AvroOutputFormat<User> dynamicOutputFormat = new AvroOutputFormat<>(User.class);
// Use with custom output logic
users.output(dynamicOutputFormat)
.withCustomPartitioning(new UserPartitioner());

Support for various compression algorithms to reduce file size and improve I/O performance.
public enum Codec {
NULL((byte) 0, CodecFactory.nullCodec()), // No compression
SNAPPY((byte) 1, CodecFactory.snappyCodec()), // Fast compression
BZIP2((byte) 2, CodecFactory.bzip2Codec()), // High compression ratio
DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)), // Standard compression
XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL)); // High compression ratio
}

SNAPPY (Recommended): fast compression.
DEFLATE: standard compression.
BZIP2: high compression ratio.
XZ: high compression ratio.
NULL: no compression.
File Splitting:
// Enable splitting for large files (parallel processing)
inputFormat.setUnsplittable(false);
// Disable splitting for small files (reduce overhead)
inputFormat.setUnsplittable(true);

Object Reuse:
// Enable object reuse for better memory performance (default)
inputFormat.setReuseAvroValue(true);
// Disable if objects need to be retained across operations
inputFormat.setReuseAvroValue(false);

Compression Selection:
// For high-throughput scenarios
outputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);
// For storage optimization
outputFormat.setCodec(AvroOutputFormat.Codec.BZIP2);
// For archival
outputFormat.setCodec(AvroOutputFormat.Codec.XZ);

Input Errors:
- FileNotFoundException
- IOException with detailed error message
- AvroRuntimeException

Output Errors:
- IOException
- IllegalArgumentException
- IOException

// Robust input processing
try {
DataSet<User> users = env.createInput(inputFormat);
// Process data
} catch (Exception e) {
logger.error("Failed to read Avro files", e);
// Implement fallback or retry logic
}
// Safe output writing
try {
users.output(outputFormat);
env.execute("Write Avro Files");
} catch (Exception e) {
logger.error("Failed to write Avro files", e);
// Clean up partial files or retry
}

HDFS Integration:
// Read from HDFS
AvroInputFormat<User> hdfsInputFormat = new AvroInputFormat<>(
new Path("hdfs://namenode:8020/data/users/*.avro"),
User.class
);
// Write to HDFS with replication
AvroOutputFormat<User> hdfsOutputFormat = new AvroOutputFormat<>(
new Path("hdfs://namenode:8020/output/users.avro"),
User.class
);

S3 Integration:
// Read from S3
AvroInputFormat<User> s3InputFormat = new AvroInputFormat<>(
new Path("s3a://bucket/data/users/*.avro"),
User.class
);
// Write to S3
AvroOutputFormat<User> s3OutputFormat = new AvroOutputFormat<>(
new Path("s3a://bucket/output/users.avro"),
User.class
);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-avro