Apache Flink Avro format support library providing serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications
—
Factory and writer classes for efficient bulk writing of Avro files with support for various record types, compression options, and streaming file operations. Designed for high-throughput scenarios with automatic file rolling and management.
Utility class providing static factory methods for creating Avro writer factories for different record types.
public class AvroWriters {
// For specific records (generated from Avro schema)
public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type);
// For generic records with explicit schema
public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema);
// For POJO records using reflection
public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type);
}

Writing Specific Records:
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
// Create writer factory for specific record type
AvroWriterFactory<User> writerFactory = AvroWriters.forSpecificRecord(User.class);
// Create streaming file sink
StreamingFileSink<User> sink = StreamingFileSink
.forBulkFormat(new Path("output/path"), writerFactory)
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.withMaxPartSize(MemorySize.ofMebiBytes(128))
.build())
.build();
// Use in streaming job
DataStream<User> userStream = ...;
userStream.addSink(sink);

Writing Generic Records:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
// Define schema
Schema schema = new Schema.Parser().parse(schemaString);
// Create writer factory for generic records
AvroWriterFactory<GenericRecord> genericWriterFactory = AvroWriters.forGenericRecord(schema);
// Create file sink
StreamingFileSink<GenericRecord> genericSink = StreamingFileSink
.forBulkFormat(new Path("output/generic"), genericWriterFactory)
.build();
// Use with generic records
DataStream<GenericRecord> recordStream = ...;
recordStream.addSink(genericSink);

Writing Reflection-based Records:
// For POJOs without generated Avro classes
AvroWriterFactory<Person> reflectWriterFactory = AvroWriters.forReflectRecord(Person.class);
// Create sink for POJO records
StreamingFileSink<Person> pojoSink = StreamingFileSink
.forBulkFormat(new Path("output/pojos"), reflectWriterFactory)
    .build();

Factory class that implements Flink's BulkWriter.Factory interface for creating Avro bulk writers.
public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
public AvroWriterFactory(AvroBuilder<T> builder);
// Factory interface methods
public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

Custom Builder Usage:
import org.apache.flink.formats.avro.AvroBuilder;
// Create custom builder with specific configuration
AvroBuilder<User> customBuilder = (outputStream) -> {
Schema schema = User.getClassSchema();
DatumWriter<User> datumWriter = new SpecificDatumWriter<>(schema);
DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);
// Configure compression
dataFileWriter.setCodec(CodecFactory.snappyCodec());
// Set metadata
dataFileWriter.setMeta("created_by", "flink-job");
dataFileWriter.setMeta("version", "1.0");
return dataFileWriter.create(schema, outputStream);
};
// Create factory with custom builder
AvroWriterFactory<User> customFactory = new AvroWriterFactory<>(customBuilder);

Functional interface for creating DataFileWriter instances with custom configuration.
public interface AvroBuilder<T> extends Serializable {
DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
}

// Lambda implementation for specific records
AvroBuilder<User> specificBuilder = (out) -> {
String schemaString = SpecificData.get().getSchema(User.class).toString();
Schema schema = new Schema.Parser().parse(schemaString);
SpecificDatumWriter<User> datumWriter = new SpecificDatumWriter<>(schema);
DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);
return dataFileWriter.create(schema, out);
};
// Anonymous class implementation for generic records
AvroBuilder<GenericRecord> genericBuilder = new AvroBuilder<GenericRecord>() {
@Override
public DataFileWriter<GenericRecord> createWriter(OutputStream outputStream) throws IOException {
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
return dataFileWriter.create(schema, outputStream);
}
};

The actual bulk writer implementation that handles writing records to Avro files.
public class AvroBulkWriter<T> implements BulkWriter<T> {
public void addElement(T element) throws IOException;
public void flush() throws IOException;
public void finish() throws IOException;
}

Records are appended to the output file via addElement().

Size-based Rolling:
StreamingFileSink<User> sink = StreamingFileSink
.forBulkFormat(outputPath, writerFactory)
.withRollingPolicy(DefaultRollingPolicy.builder()
.withMaxPartSize(MemorySize.ofMebiBytes(256)) // Roll at 256MB
.build())
    .build();

Time-based Rolling:
StreamingFileSink<User> sink = StreamingFileSink
.forBulkFormat(outputPath, writerFactory)
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15)) // Roll every 15 minutes
.withInactivityInterval(Duration.ofMinutes(5)) // Roll after 5 minutes of inactivity
.build())
    .build();

Combined Rolling Policy:
RollingPolicy<User, String> policy = DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofHours(1)) // Max 1 hour per file
.withInactivityInterval(Duration.ofMinutes(15)) // Roll after 15 min inactivity
.withMaxPartSize(MemorySize.of(512, MemoryUnit.MEGA_BYTES)) // Max 512MB per file
    .build();

Default Bucketing:
// Files organized by processing time
StreamingFileSink<User> sink = StreamingFileSink
.forBulkFormat(outputPath, writerFactory)
.build();
// Results in: output/2023-12-01--18/part-0-0.avro

Custom Bucketing:
// Custom bucket assignment based on record fields
BucketAssigner<User, String> bucketAssigner = new BucketAssigner<User, String>() {
@Override
public String getBucketId(User user, Context context) {
return "department=" + user.getDepartment() + "/year=" + user.getCreatedYear();
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
};
StreamingFileSink<User> partitionedSink = StreamingFileSink
.forBulkFormat(outputPath, writerFactory)
.withBucketAssigner(bucketAssigner)
.build();
// Results in: output/department=engineering/year=2023/part-0-0.avro

Buffer Size Tuning:
// Configure larger buffers for better I/O performance
AvroBuilder<User> optimizedBuilder = (out) -> {
DataFileWriter<User> writer = createBasicWriter(out);
writer.setSyncInterval(16 * 1024); // 16KB sync intervals
return writer;
};

Compression Selection:
// Choose compression based on use case
AvroBuilder<User> compressedBuilder = (out) -> {
DataFileWriter<User> writer = createBasicWriter(out);
writer.setCodec(CodecFactory.snappyCodec()); // Fast compression
// writer.setCodec(CodecFactory.deflateCodec(6)); // Better compression
return writer;
};

Parallelism Tuning:
// Adjust parallelism based on throughput requirements
userStream.addSink(sink).setParallelism(4);

Checkpointing Configuration:
// Configure checkpointing for exactly-once guarantees
env.enableCheckpointing(30000); // 30 second intervals
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

Writer Failure Handling:
// Writers automatically handle I/O failures through Flink's fault tolerance
// Failed writers are recreated on recovery
StreamingFileSink<User> resilientSink = StreamingFileSink
.forBulkFormat(outputPath, writerFactory)
.withRollingPolicy(policy)
    .build();

Schema Evolution Support:
// Handle schema evolution in writers
AvroBuilder<GenericRecord> evolvingBuilder = (out) -> {
// Use writer schema that's compatible with multiple reader schemas
Schema writerSchema = SchemaUtils.getLatestSchema();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(writerSchema);
DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter);
return writer.create(writerSchema, out);
};

File Output Monitoring:
// Monitor file creation and sizes
StreamingFileSink<User> monitoredSink = StreamingFileSink
.forBulkFormat(outputPath, writerFactory)
.withBucketCheckInterval(60000) // Check every minute
    .build();

Metrics Integration:
// Custom metrics for monitoring throughput
public class MetricsAvroBuilder<T> implements AvroBuilder<T> {
private final Counter recordsWritten;
@Override
public DataFileWriter<T> createWriter(OutputStream out) throws IOException {
// Wrap writer with metrics collection
return new MetricsDataFileWriter<>(createBaseWriter(out), recordsWritten);
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-avro