
tessl/maven-org-apache-flink--flink-avro

Apache Flink Avro format support library providing serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications


Bulk Writers

Factory and writer classes for efficient bulk writing of Avro files with support for various record types, compression options, and streaming file operations. Designed for high-throughput scenarios with automatic file rolling and management.

AvroWriters

Utility class providing static factory methods for creating Avro writer factories for different record types.

public class AvroWriters {
    // For specific records (generated from Avro schema)
    public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type);
    
    // For generic records with explicit schema
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema);
    
    // For POJO records using reflection
    public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type);
}

Usage Examples

Writing Specific Records:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriterFactory;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// Create writer factory for specific record type
AvroWriterFactory<User> writerFactory = AvroWriters.forSpecificRecord(User.class);

// Create streaming file sink. Bulk formats accept only rolling policies that
// extend CheckpointRollingPolicy, so part files roll on every checkpoint.
StreamingFileSink<User> sink = StreamingFileSink
    .forBulkFormat(new Path("output/path"), writerFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

// Use in streaming job
DataStream<User> userStream = ...;
userStream.addSink(sink);

Writing Generic Records:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Define schema
Schema schema = new Schema.Parser().parse(schemaString);

// Create writer factory for generic records
AvroWriterFactory<GenericRecord> genericWriterFactory = AvroWriters.forGenericRecord(schema);

// Create file sink
StreamingFileSink<GenericRecord> genericSink = StreamingFileSink
    .forBulkFormat(new Path("output/generic"), genericWriterFactory)
    .build();

// Use with generic records
DataStream<GenericRecord> recordStream = ...;
recordStream.addSink(genericSink);

Writing Reflection-based Records:

// For POJOs without generated Avro classes
AvroWriterFactory<Person> reflectWriterFactory = AvroWriters.forReflectRecord(Person.class);

// Create sink for POJO records
StreamingFileSink<Person> pojoSink = StreamingFileSink
    .forBulkFormat(new Path("output/pojos"), reflectWriterFactory)
    .build();

AvroWriterFactory

Factory class that implements Flink's BulkWriter.Factory interface for creating Avro bulk writers.

public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
    public AvroWriterFactory(AvroBuilder<T> builder);
    
    // Factory interface methods
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

Advanced Configuration

Custom Builder Usage:

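import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.formats.avro.AvroBuilder;
import org.apache.flink.formats.avro.AvroWriterFactory;

// Create custom builder with specific configuration
AvroBuilder<User> customBuilder = (outputStream) -> {
    Schema schema = User.getClassSchema();
    DatumWriter<User> datumWriter = new SpecificDatumWriter<>(schema);
    DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);
    
    // Configure compression; the codec is recorded in the file header, so set it before create()
    dataFileWriter.setCodec(CodecFactory.snappyCodec());
    
    // Set metadata; it is also written with the header, so set it before create()
    dataFileWriter.setMeta("created_by", "flink-job");
    dataFileWriter.setMeta("version", "1.0");
    
    // Write the file header and return the opened writer
    return dataFileWriter.create(schema, outputStream);
};

// Create factory with custom builder
AvroWriterFactory<User> customFactory = new AvroWriterFactory<>(customBuilder);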

AvroBuilder

Functional interface for creating DataFileWriter instances with custom configuration.

public interface AvroBuilder<T> extends Serializable {
    DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
}

Custom Builder Implementation

// Lambda implementation for specific records
AvroBuilder<User> specificBuilder = (out) -> {
    String schemaString = SpecificData.get().getSchema(User.class).toString();
    Schema schema = new Schema.Parser().parse(schemaString);
    SpecificDatumWriter<User> datumWriter = new SpecificDatumWriter<>(schema);
    DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);
    return dataFileWriter.create(schema, out);
};

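// Anonymous class implementation for generic records.
// The builder is serialized with the job, so it captures the schema as a JSON string
// (schemaString) and parses it inside createWriter. Capturing a Schema object directly
// fails on older Avro releases, where Schema is not Serializable.
AvroBuilder<GenericRecord> genericBuilder = new AvroBuilder<GenericRecord>() {
    @Override
    public DataFileWriter<GenericRecord> createWriter(OutputStream outputStream) throws IOException {
        Schema schema = new Schema.Parser().parse(schemaString);
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        return dataFileWriter.create(schema, outputStream);
    }
};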
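
Because AvroBuilder extends Serializable, a builder (lambda or anonymous class) may only capture state that survives Java serialization. The following dependency-free sketch illustrates the idea with a stand-in interface; SketchBuilder, SerializableBuilderSketch, and their methods are hypothetical names rather than flink-avro APIs, and a plain Writer stands in for DataFileWriter so the example runs without Avro on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for AvroBuilder<T>: a functional interface that is also Serializable.
interface SketchBuilder<T> extends Serializable {
    T createWriter(OutputStream out) throws IOException;
}

class SerializableBuilderSketch {
    // The lambda captures only a String, so it survives Java serialization.
    static SketchBuilder<Writer> builderFor(String schemaJson) {
        return out -> {
            Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
            writer.write(schemaJson + "\n"); // stands in for writing a file header
            return writer;
        };
    }

    // Serializes and deserializes the builder, similar to shipping it to a worker.
    @SuppressWarnings("unchecked")
    static <T> SketchBuilder<T> roundTrip(SketchBuilder<T> builder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(builder);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchBuilder<T>) ois.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Uses the restored builder to write a header line and one record into memory.
    static String writeWithRestoredBuilder(String schemaJson, String record) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            Writer writer = roundTrip(builderFor(schemaJson)).createWriter(sink);
            writer.write(record);
            writer.flush();
            return new String(sink.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Capturing a non-serializable object in the lambda would make roundTrip fail at writeObject, which is the same class of error a job would hit at submission time.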

AvroBulkWriter

The actual bulk writer implementation that handles writing records to Avro files.

public class AvroBulkWriter<T> implements BulkWriter<T> {
    public void addElement(T element) throws IOException;
    public void flush() throws IOException;
    public void finish() throws IOException;
}

Writer Lifecycle

  1. Creation: The factory creates a writer each time the sink opens a new part file
  2. Writing: Elements are appended via addElement()
  3. Flushing: flush() pushes records buffered by the writer to the underlying output stream
  4. Finishing: finish() is called when the part file rolls, which for bulk formats happens on every checkpoint
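
The call order in the lifecycle above can be modelled without Flink. This is only a sketch: SketchBulkWriter is a hypothetical stand-in that mirrors the three BulkWriter method names, and RecordingWriter logs each call instead of writing Avro data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's BulkWriter<T>; it only mirrors the method names.
interface SketchBulkWriter<T> {
    void addElement(T element);
    void flush();
    void finish();
}

// Records every call so the order can be inspected afterwards.
class RecordingWriter<T> implements SketchBulkWriter<T> {
    final List<String> calls = new ArrayList<>();
    private boolean finished = false;

    @Override
    public void addElement(T element) {
        if (finished) {
            throw new IllegalStateException("writer already finished");
        }
        calls.add("addElement(" + element + ")");
    }

    @Override
    public void flush() {
        calls.add("flush");
    }

    @Override
    public void finish() {
        flush(); // finishing implies flushing whatever is still buffered
        finished = true;
        calls.add("finish");
    }
}

class WriterLifecycleSketch {
    // Drives one part file through creation, writing, flushing, and finishing.
    static List<String> writeOnePartFile(List<String> elements) {
        RecordingWriter<String> writer = new RecordingWriter<>(); // 1. creation
        for (String element : elements) {
            writer.addElement(element);                           // 2. writing
        }
        writer.finish();                                          // 3 and 4. flush, then finish
        return writer.calls;
    }
}
```

A finished writer rejects further elements in this model, which reflects that the sink asks the factory for a new writer per part file.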

File Rolling and Management

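Rolling Policies

Bulk formats such as Avro accept only rolling policies that extend CheckpointRollingPolicy, so part files always roll on every checkpoint. DefaultRollingPolicy is meant for row formats and is not accepted by forBulkFormat(...).withRollingPolicy(...). Size and time thresholds can be added on top of checkpoint rolling by implementing shouldRollOnEvent and shouldRollOnProcessingTime.

Size-based Rolling:

StreamingFileSink<User> sink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .withRollingPolicy(new CheckpointRollingPolicy<User, String>() {
        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> part, User element) throws IOException {
            return part.getSize() >= 256L * 1024 * 1024;  // Roll at 256 MiB
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<String> part, long currentTime) {
            return false;  // No time-based rolling
        }
    })
    .build();

Time-based Rolling:

StreamingFileSink<User> sink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .withRollingPolicy(new CheckpointRollingPolicy<User, String>() {
        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> part, User element) {
            return false;  // No size-based rolling
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<String> part, long currentTime) {
            return currentTime - part.getCreationTime() >= Duration.ofMinutes(15).toMillis()    // Roll every 15 minutes
                || currentTime - part.getLastUpdateTime() >= Duration.ofMinutes(5).toMillis();  // Roll after 5 minutes of inactivity
        }
    })
    .build();

Combined Rolling Policy:

CheckpointRollingPolicy<User, String> policy = new CheckpointRollingPolicy<User, String>() {
    @Override
    public boolean shouldRollOnEvent(PartFileInfo<String> part, User element) throws IOException {
        return part.getSize() >= 512L * 1024 * 1024;  // Max 512 MiB per file
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<String> part, long currentTime) {
        return currentTime - part.getCreationTime() >= Duration.ofHours(1).toMillis()       // Max 1 hour per file
            || currentTime - part.getLastUpdateTime() >= Duration.ofMinutes(15).toMillis(); // Roll after 15 min inactivity
    }
};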
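
The three thresholds used above (maximum part size, rollover interval, inactivity interval) reduce to two small checks. The sketch below isolates that decision logic without any Flink dependency so it can be unit-tested on its own. RollingDecisionSketch is a hypothetical class, and treating each threshold as inclusive is an assumption of this sketch.

```java
import java.time.Duration;

// Hypothetical helper that isolates the rolling decision from the sink.
class RollingDecisionSketch {
    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;

    RollingDecisionSketch(long maxPartSizeBytes, Duration rolloverInterval, Duration inactivityInterval) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverInterval.toMillis();
        this.inactivityIntervalMs = inactivityInterval.toMillis();
    }

    // Size check, evaluated as each element is written.
    boolean shouldRollOnEvent(long currentPartSizeBytes) {
        return currentPartSizeBytes >= maxPartSizeBytes;
    }

    // Time checks, evaluated periodically against the processing-time clock.
    boolean shouldRollOnProcessingTime(long creationTimeMs, long lastUpdateTimeMs, long nowMs) {
        boolean tooOld = nowMs - creationTimeMs >= rolloverIntervalMs;
        boolean inactive = nowMs - lastUpdateTimeMs >= inactivityIntervalMs;
        return tooOld || inactive;
    }
}
```

In a job, the same comparisons would sit inside a rolling policy's shouldRollOnEvent and shouldRollOnProcessingTime methods, with the part size and timestamps read from the part file info the sink passes in.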

Bucket Assignment

Default Bucketing:

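// Default bucketing: one bucket per hour of processing time ("yyyy-MM-dd--HH")
StreamingFileSink<User> sink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .build();
// Results in: output/2023-12-01--18/part-0-0
// Part files have no extension by default; a suffix such as ".avro" can be set with withOutputFileConfig(...)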
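
The bucket name in the comment above follows the hourly pattern yyyy-MM-dd--HH. A minimal, dependency-free sketch of how such a bucket id can be derived from a processing timestamp follows. DateBucketSketch and bucketIdFor are hypothetical names, and passing the time zone explicitly is an assumption made to keep the result deterministic.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that derives an hourly bucket id from a timestamp.
class DateBucketSketch {
    // One bucket per hour, matching the directory name shown above.
    static final String HOURLY_PATTERN = "yyyy-MM-dd--HH";

    static String bucketIdFor(Instant processingTime, ZoneId zone) {
        return DateTimeFormatter.ofPattern(HOURLY_PATTERN)
            .withZone(zone)
            .format(processingTime);
    }
}
```

The zone matters: the same instant lands in different buckets depending on the zone used for formatting, so it is worth fixing the zone explicitly when downstream jobs read partitions by name.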

Custom Bucketing:

// Custom bucket assignment based on record fields
BucketAssigner<User, String> bucketAssigner = new BucketAssigner<User, String>() {
    @Override
    public String getBucketId(User user, Context context) {
        return "department=" + user.getDepartment() + "/year=" + user.getCreatedYear();
    }
    
    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
};

StreamingFileSink<User> partitionedSink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .withBucketAssigner(bucketAssigner)
    .build();
// Results in: output/department=engineering/year=2023/part-0-0
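
Field values used in a bucket id become directory names, so a value containing a slash or whitespace would change the layout. The sketch below shows one way to normalise values before building the key=value path. PartitionPathSketch, sanitize, and the "__unknown__" fallback are hypothetical choices for illustration, not part of Flink.

```java
import java.util.Map;

// Hypothetical helper for building Hive-style partition paths from field values.
class PartitionPathSketch {
    // Replaces characters that would alter the directory structure or are awkward in paths.
    static String sanitize(String value) {
        if (value == null) {
            return "__unknown__";
        }
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            return "__unknown__";
        }
        return trimmed.replaceAll("[^A-Za-z0-9._-]", "_");
    }

    // Joins key=value segments with '/' in the iteration order of the map.
    static String bucketId(Map<String, String> partitions) {
        StringBuilder path = new StringBuilder();
        for (Map.Entry<String, String> entry : partitions.entrySet()) {
            if (path.length() > 0) {
                path.append('/');
            }
            path.append(entry.getKey()).append('=').append(sanitize(entry.getValue()));
        }
        return path.toString();
    }
}
```

A getBucketId implementation could delegate to bucketId with an insertion-ordered map (for example a LinkedHashMap) so the partition order stays stable.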

Performance Optimization

Writer Configuration

Buffer Size Tuning:

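// A larger sync interval produces larger Avro blocks, which usually helps throughput and compression
AvroBuilder<User> optimizedBuilder = (out) -> {
    Schema schema = User.getClassSchema();
    DataFileWriter<User> writer = new DataFileWriter<>(new SpecificDatumWriter<User>(schema));
    writer.setSyncInterval(256 * 1024); // 256 KiB blocks (Avro's default is 64,000 bytes); set before create()
    return writer.create(schema, out);
};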

Compression Selection:

// Choose compression based on use case; the codec must be set before create()
AvroBuilder<User> compressedBuilder = (out) -> {
    Schema schema = User.getClassSchema();
    DataFileWriter<User> writer = new DataFileWriter<>(new SpecificDatumWriter<User>(schema));
    writer.setCodec(CodecFactory.snappyCodec()); // Fast compression
    // writer.setCodec(CodecFactory.deflateCodec(6)); // Better compression
    return writer.create(schema, out);
};
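
The speed versus size trade-off can be measured on sample data before picking a codec. The following dependency-free sketch uses java.util.zip directly instead of Avro's CodecFactory, an assumption made so that it runs without extra libraries. Levels run from 1 (fastest) to 9 (smallest output), which is assumed here to be the same scale as the argument to deflateCodec. DeflateLevelSketch and sampleRecords are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class DeflateLevelSketch {
    // Hypothetical sample payload: many similar CSV-like rows, which compress well.
    static byte[] sampleRecords(int rows) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < rows; i++) {
            sb.append("user-").append(i % 50).append(",engineering,2023\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Compresses the input with the given level (1 = fastest, 9 = smallest output).
    static byte[] deflate(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Restores the original bytes; used to confirm the round trip is lossless.
    static byte[] inflate(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && !inflater.finished()
                        && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("truncated or unsupported input");
                }
                out.write(buffer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt input", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }
}
```

Comparing deflate(data, 1).length with deflate(data, 9).length on representative records, together with the time each call takes, gives a rough basis for choosing a level.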

Sink Configuration

Parallelism Tuning:

// Adjust parallelism based on throughput requirements
userStream.addSink(sink).setParallelism(4);

Checkpointing Configuration:

// Bulk-format part files are finalized only when a checkpoint completes,
// so checkpointing must be enabled for output to become visible to readers
env.enableCheckpointing(30000); // 30 second intervals
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

Error Handling and Monitoring

Error Recovery

Writer Failure Handling:

// An I/O failure in a writer fails the task. On recovery, Flink restores the sink
// from the last completed checkpoint and creates new writers through the factory.
StreamingFileSink<User> resilientSink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

Schema Evolution Support:

// Handle schema evolution in writers
AvroBuilder<GenericRecord> evolvingBuilder = (out) -> {
    // Use a writer schema that stays compatible with the reader schemas in use.
    // SchemaUtils.getLatestSchema() is a placeholder for application code, not a flink-avro API.
    Schema writerSchema = SchemaUtils.getLatestSchema();
    GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(writerSchema);
    DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter);
    return writer.create(writerSchema, out);
};

Monitoring

File Output Monitoring:

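// The bucket check interval (in milliseconds) controls how often the sink
// evaluates time-based rolling conditions; it does not report file sizes by itself
StreamingFileSink<User> monitoredSink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .withBucketCheckInterval(60000) // Check every minute
    .build();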

Metrics Integration:

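// Sketch of a builder that counts written records.
// MetricsDataFileWriter and createBaseWriter are hypothetical helpers, not part of flink-avro.
public class MetricsAvroBuilder<T> implements AvroBuilder<T> {
    // AvroBuilder is Serializable, so the counter instance must be serializable too
    private final Counter recordsWritten;

    public MetricsAvroBuilder(Counter recordsWritten) {
        this.recordsWritten = recordsWritten;
    }

    @Override
    public DataFileWriter<T> createWriter(OutputStream out) throws IOException {
        // Wrap writer with metrics collection
        return new MetricsDataFileWriter<>(createBaseWriter(out), recordsWritten);
    }
}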

