
tessl/maven-org-apache-flink--flink-avro

Apache Flink Avro format support library providing serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications


File I/O Operations

Input and output formats for reading and writing Avro files in Flink batch processing scenarios. Provides efficient file-based processing with support for compression, splitting, and type safety.

AvroInputFormat

FileInputFormat implementation for reading Avro files in batch processing jobs.

public class AvroInputFormat<E> extends FileInputFormat<E> 
        implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
    
    // Constructor
    public AvroInputFormat(Path filePath, Class<E> type);
    
    // Configuration methods
    public void setReuseAvroValue(boolean reuseAvroValue);
    public void setUnsplittable(boolean unsplittable);
    
    // Type information
    public TypeInformation<E> getProducedType();
}

Usage Examples

Reading Specific Records:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create input format for a specific record type; pointing the path at a
// directory reads every Avro file inside it (glob patterns are not expanded)
AvroInputFormat<User> inputFormat = new AvroInputFormat<>(
    new Path("hdfs://path/to/user/files"),
    User.class
);

// Configure reuse behavior
inputFormat.setReuseAvroValue(true);  // Default: true for better performance

// Create dataset
DataSet<User> users = env.createInput(inputFormat);

Reading Generic Records:

import org.apache.avro.generic.GenericRecord;

// Create input format for generic records (path points at a directory of .avro files)
AvroInputFormat<GenericRecord> genericInputFormat = new AvroInputFormat<>(
    new Path("hdfs://path/to/data"),
    GenericRecord.class
);

// Use in batch job
DataSet<GenericRecord> records = env.createInput(genericInputFormat);

// Process generic records
DataSet<String> names = records.map(record -> record.get("name").toString());

File Splitting Control:

// Allow file splitting for parallel processing (default)
inputFormat.setUnsplittable(false);

// Force reading entire files (useful for small files)
inputFormat.setUnsplittable(true);

// Process with parallelism
DataSet<User> users = env.createInput(inputFormat)
    .setParallelism(4);
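The comments above suggest disabling splitting for small files. A rough JDK-only sketch of that decision follows; the helper class and the 128 MiB threshold are illustrative assumptions, not Flink API:

```java
// Hypothetical helper: heuristic for when splitting pays off. Files smaller
// than one filesystem block gain nothing from splitting, so reading them
// whole avoids per-split scheduling overhead.
public class SplitHeuristic {
    static final long DEFAULT_BLOCK_SIZE = 128L * 1024 * 1024; // 128 MiB, a common HDFS default

    public static boolean shouldDisableSplitting(long fileSizeBytes) {
        return fileSizeBytes < DEFAULT_BLOCK_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(shouldDisableSplitting(4L * 1024 * 1024));   // small file -> true
        System.out.println(shouldDisableSplitting(512L * 1024 * 1024)); // large file -> false
    }
}
```

The boolean result would feed `inputFormat.setUnsplittable(...)` when constructing the format.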

AvroOutputFormat

FileOutputFormat implementation for writing Avro files in batch processing jobs.

public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
    
    // Constructors
    public AvroOutputFormat(Path filePath, Class<E> type);
    public AvroOutputFormat(Class<E> type);
    
    // Configuration methods
    public void setSchema(Schema schema);
    public void setCodec(Codec codec);
    
    // Codec enum
    public enum Codec {
        NULL, SNAPPY, BZIP2, DEFLATE, XZ
    }
}

Usage Examples

Writing Specific Records:

import org.apache.flink.formats.avro.AvroOutputFormat;

// Create output format with path and type
AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(
    new Path("hdfs://output/path/users.avro"),
    User.class
);

// Configure compression
outputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);

// Write dataset
DataSet<User> users = ...;
users.output(outputFormat);

Writing Generic Records:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Create output format for generic records
AvroOutputFormat<GenericRecord> genericOutputFormat = new AvroOutputFormat<>(
    new Path("hdfs://output/path/records.avro"),
    GenericRecord.class
);

// Set explicit schema for generic records
Schema schema = new Schema.Parser().parse(schemaString);
genericOutputFormat.setSchema(schema);

// Configure compression
genericOutputFormat.setCodec(AvroOutputFormat.Codec.DEFLATE);

// Write generic records
DataSet<GenericRecord> records = ...;
records.output(genericOutputFormat);

Dynamic Output Paths:

// Create output format without a fixed path
AvroOutputFormat<User> dynamicOutputFormat = new AvroOutputFormat<>(User.class);

// Supply the path later (e.g. computed at runtime) via the inherited
// FileOutputFormat API before the job is submitted
dynamicOutputFormat.setOutputFilePath(new Path("hdfs://output/path/users.avro"));
users.output(dynamicOutputFormat);

Compression Codecs

Support for various compression algorithms to reduce file size and improve I/O performance.

public enum Codec {
    NULL((byte) 0, CodecFactory.nullCodec()),           // No compression
    SNAPPY((byte) 1, CodecFactory.snappyCodec()),       // Fast compression
    BZIP2((byte) 2, CodecFactory.bzip2Codec()),         // High compression ratio
    DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)), // Standard compression
    XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL)); // High compression ratio
}

Codec Selection Guidelines

SNAPPY (Recommended):

  • Fast compression and decompression
  • Good balance between speed and compression ratio
  • Default codec for most use cases

DEFLATE:

  • Standard compression algorithm
  • Better compression than SNAPPY, slower processing
  • Good for storage-constrained environments

BZIP2:

  • High compression ratio
  • Slower than SNAPPY and DEFLATE
  • Best for archival storage

XZ:

  • Highest compression ratio
  • Slowest processing
  • Best for long-term storage with infrequent access

NULL:

  • No compression
  • Fastest processing
  • Use when storage space is not a concern
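To make the DEFLATE trade-off concrete, here is a small JDK-only sketch comparing compression levels with `java.util.zip.Deflater`; this is not Flink code, but Avro's DEFLATE codec wraps the same zlib algorithm:

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.Deflater;

public class DeflateDemo {
    // Compress a buffer at the given level and return the compressed bytes.
    static byte[] deflate(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = new byte[64 * 1024];
        Arrays.fill(data, (byte) 'a'); // highly compressible payload
        System.out.println("original=" + data.length
            + " fastest=" + deflate(data, Deflater.BEST_SPEED).length
            + " best=" + deflate(data, Deflater.BEST_COMPRESSION).length);
    }
}
```

Higher levels spend more CPU for smaller output, which is the same trade you make choosing between SNAPPY, DEFLATE, and XZ above.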

Performance Optimization

Input Format Optimization

File Splitting:

// Enable splitting for large files (parallel processing)
inputFormat.setUnsplittable(false);

// Disable splitting for small files (reduce overhead)
inputFormat.setUnsplittable(true);

Object Reuse:

// Enable object reuse for better memory performance (default)
inputFormat.setReuseAvroValue(true);

// Disable if objects need to be retained across operations
inputFormat.setReuseAvroValue(false);
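Why disabling reuse matters when records are retained: with `setReuseAvroValue(true)`, the format can hand out the same mutable object on every call, so retained references all see the last record's contents. A JDK-only illustration of this aliasing hazard (the `Record` class is a stand-in, not an Avro type):

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseDemo {
    static class Record { int id; }

    public static void main(String[] args) {
        Record reused = new Record();           // the single reused instance
        List<Record> retained = new ArrayList<>();
        for (int id = 1; id <= 3; id++) {
            reused.id = id;                     // "format" overwrites in place
            retained.add(reused);               // caller retains the reference
        }
        // All three entries alias one object holding the final value:
        System.out.println(retained.get(0).id + " " + retained.get(2).id); // prints "3 3"
    }
}
```

If downstream operators collect or buffer records, either disable reuse or copy each record before retaining it.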

Output Format Optimization

Compression Selection:

// For high-throughput scenarios
outputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);

// For storage optimization
outputFormat.setCodec(AvroOutputFormat.Codec.BZIP2);

// For archival
outputFormat.setCodec(AvroOutputFormat.Codec.XZ);

Error Handling

Input Errors:

  • File not found: Throws FileNotFoundException
  • Schema mismatch: Throws IOException with detailed error message
  • Corrupted files: Throws AvroRuntimeException

Output Errors:

  • Path creation failure: Throws IOException
  • Schema validation errors: Throws IllegalArgumentException
  • Disk space issues: Throws IOException

Error Recovery Patterns

// Robust input processing: the DataSet API is lazy, so read errors
// surface when the job runs; catch around execute(), not createInput()
try {
    DataSet<User> users = env.createInput(inputFormat);
    // Process data
    env.execute("Read Avro Files");
} catch (Exception e) {
    logger.error("Failed to read Avro files", e);
    // Implement fallback or retry logic
}

// Safe output writing
try {
    users.output(outputFormat);
    env.execute("Write Avro Files");
} catch (Exception e) {
    logger.error("Failed to write Avro files", e);
    // Clean up partial files or retry
}
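The "retry logic" mentioned in the comments above can be factored into a generic wrapper; the following is a hypothetical helper, not part of Flink:

```java
import java.util.concurrent.Callable;

public class Retry {
    // Run the task up to maxAttempts times, rethrowing the last failure.
    public static <T> T withRetries(Callable<T> task, int maxAttempts) throws Exception {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // a real job would log and back off here
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        String result = withRetries(() -> {
            if (++attempts[0] < 3) throw new RuntimeException("transient failure");
            return "ok";
        }, 5);
        System.out.println(result + " after " + attempts[0] + " attempts"); // "ok after 3 attempts"
    }
}
```

In a Flink batch job the wrapped task would typically be the `env.execute(...)` call, since that is where I/O failures materialize.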

Integration with Hadoop Ecosystem

HDFS Integration:

// Read from HDFS (directory of .avro files; glob patterns are not expanded)
AvroInputFormat<User> hdfsInputFormat = new AvroInputFormat<>(
    new Path("hdfs://namenode:8020/data/users"),
    User.class
);

// Write to HDFS
AvroOutputFormat<User> hdfsOutputFormat = new AvroOutputFormat<>(
    new Path("hdfs://namenode:8020/output/users.avro"),
    User.class
);

S3 Integration (requires an S3 filesystem plugin such as flink-s3-fs-hadoop on the classpath):

// Read from S3 (directory of .avro files)
AvroInputFormat<User> s3InputFormat = new AvroInputFormat<>(
    new Path("s3a://bucket/data/users"),
    User.class
);

// Write to S3
AvroOutputFormat<User> s3OutputFormat = new AvroOutputFormat<>(
    new Path("s3a://bucket/output/users.avro"),
    User.class
);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-avro
