Apache Flink SQL Avro format library that provides bundled and shaded Apache Avro dependencies for SQL usage in Flink applications.
File input format for reading Avro files in batch processing scenarios.
/**
* Input format for reading Avro files
* @param <T> Type of records to read
*/
public class AvroInputFormat<T> extends FileInputFormat<T> {
/**
* Creates AvroInputFormat for SpecificRecord types
* @param filePath Path to Avro file(s)
* @param recordClazz SpecificRecord class to deserialize to
*/
public AvroInputFormat(Path filePath, Class<T> recordClazz);
/**
* Creates AvroInputFormat for GenericRecord types
* @param filePath Path to Avro file(s)
* @param schema Avro schema for GenericRecord deserialization
*/
public AvroInputFormat(Path filePath, Schema schema);
/**
* Reads the next record from the input
* @param reuse Object to reuse for the next record (may be null)
* @return Next record, or null if end of input
* @throws IOException If reading fails
*/
public T nextRecord(T reuse) throws IOException;
/**
* Checks if the input has been exhausted
* @return true if no more records available
* @throws IOException If check fails
*/
public boolean reachedEnd() throws IOException;
}

File output format for writing Avro files in batch processing scenarios.
/**
* Output format for writing Avro files
* @param <T> Type of records to write
*/
public class AvroOutputFormat<T> extends FileOutputFormat<T> {
/**
* Creates AvroOutputFormat for SpecificRecord types
* @param outputFilePath Path where to write Avro file
* @param recordClazz SpecificRecord class to serialize from
*/
public AvroOutputFormat(Path outputFilePath, Class<T> recordClazz);
/**
* Creates AvroOutputFormat for GenericRecord types
* @param outputFilePath Path where to write Avro file
* @param schema Avro schema for GenericRecord serialization
*/
public AvroOutputFormat(Path outputFilePath, Schema schema);
/**
* Writes a record to the output
* @param record Record to write
* @throws IOException If writing fails
*/
public void writeRecord(T record) throws IOException;
/**
* Sets the compression codec for the output file
* @param codecName Name of Avro compression codec (snappy, deflate, bzip2, xz)
*/
public void setCodec(String codecName);
}

High-performance bulk writing for streaming scenarios.
/**
* Bulk writer for Avro files that wraps an Avro DataFileWriter
* @param <T> Type of records to write
*/
public class AvroBulkWriter<T> implements BulkWriter<T> {
/**
* Creates a new AvroBulkWriter wrapping the given Avro DataFileWriter
* @param dataFileWriter The underlying Avro DataFileWriter
*/
public AvroBulkWriter(DataFileWriter<T> dataFileWriter);
/**
* Adds an element to the writer
* @param element Element to write
* @throws IOException If writing fails
*/
public void addElement(T element) throws IOException;
/**
* Flushes pending writes
* @throws IOException If flush fails
*/
public void flush() throws IOException;
/**
* Finishes writing and closes the writer
* @throws IOException If finish fails
*/
public void finish() throws IOException;
}
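In a Flink job the writer is normally created for you by an AvroWriterFactory (below), but it can also be driven directly. A minimal sketch, assuming a generated SpecificRecord class User, an open OutputStream out, and a User instance user:
DatumWriter<User> datumWriter = new SpecificDatumWriter<>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(User.getClassSchema(), out);
BulkWriter<User> writer = new AvroBulkWriter<>(dataFileWriter);
writer.addElement(user);
writer.flush();
writer.finish(); // no further elements may be added after finish()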
/**
* Factory for creating Avro bulk writers
* @param <T> Type of records to write
*/
public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
/**
* Creates AvroWriterFactory with AvroBuilder
* @param avroBuilder Builder for creating DataFileWriter instances
*/
public AvroWriterFactory(AvroBuilder<T> avroBuilder);
/**
* Creates a bulk writer for the given output stream
* @param out Output stream to write to
* @return Bulk writer instance
* @throws IOException If creation fails
*/
public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
/**
* Builder interface for creating Avro DataFileWriter instances
* This is a functional interface that extends Serializable
* @param <T> Type of records the writer will handle
*/
@FunctionalInterface
public interface AvroBuilder<T> extends Serializable {
/**
* Creates and configures an Avro writer to the given output stream
* @param outputStream Output stream to write to
* @return Configured DataFileWriter
* @throws IOException If creation fails
*/
DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
}

// Reading SpecificRecord files
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// For generated SpecificRecord classes
AvroInputFormat<User> inputFormat =
new AvroInputFormat<>(new Path("/path/to/user/files/*.avro"), User.class);
DataSet<User> users = env.createInput(inputFormat);
// Process the data
DataSet<String> usernames = users
.filter(user -> user.getAge() > 18)
.map(user -> user.getUsername().toString());
// Reading GenericRecord files
Schema schema = new Schema.Parser().parse(new File("user-schema.avsc"));
AvroInputFormat<GenericRecord> genericInputFormat =
new AvroInputFormat<>(new Path("/path/to/generic/files/*.avro"), schema);
DataSet<GenericRecord> records = env.createInput(genericInputFormat);
// Process generic records
DataSet<Long> userIds = records
.map(record -> (Long) record.get("user_id"));

// Writing SpecificRecord files
DataSet<User> users = // ... your user dataset
AvroOutputFormat<User> outputFormat =
new AvroOutputFormat<>(new Path("/output/path/users.avro"), User.class);
// Set compression codec
outputFormat.setCodec("snappy");
users.output(outputFormat);
// Writing GenericRecord files with custom schema
Schema schema = SchemaBuilder.record("ProcessedUser")
.fields()
.name("id").type().longType().noDefault()
.name("processed_name").type().stringType().noDefault()
.name("score").type().doubleType().noDefault()
.endRecord();
DataSet<GenericRecord> processedUsers = users
.map(new MapFunction<User, GenericRecord>() {
@Override
public GenericRecord map(User user) throws Exception {
GenericRecord record = new GenericData.Record(schema);
record.put("id", user.getId());
record.put("processed_name", user.getUsername().toString().toUpperCase());
record.put("score", calculateScore(user));
return record;
}
});
AvroOutputFormat<GenericRecord> genericOutputFormat =
new AvroOutputFormat<>(new Path("/output/path/processed.avro"), schema);
processedUsers.output(genericOutputFormat);
env.execute("Process Avro Files");// Setup for streaming to Avro files
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create Avro builder for SpecificRecord
AvroBuilder<User> avroBuilder = new AvroBuilder<User>() {
@Override
public DataFileWriter<User> createWriter(OutputStream out) throws IOException {
DatumWriter<User> datumWriter = new SpecificDatumWriter<>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);
// Set codec
dataFileWriter.setCodec(CodecFactory.snappyCodec());
// Create with schema from SpecificRecord
dataFileWriter.create(User.getClassSchema(), out);
return dataFileWriter;
}
};
// Create writer factory
AvroWriterFactory<User> writerFactory = new AvroWriterFactory<>(avroBuilder);
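// Because AvroBuilder is a functional interface, the builder above can also
// be written as a lambda (an equivalent sketch; the factory name is illustrative)
AvroWriterFactory<User> lambdaWriterFactory = new AvroWriterFactory<>(out -> {
DataFileWriter<User> writer = new DataFileWriter<>(new SpecificDatumWriter<>(User.class));
writer.setCodec(CodecFactory.snappyCodec());
writer.create(User.getClassSchema(), out);
return writer;
});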
// Stream to files
DataStream<User> userStream = // ... your user stream
userStream
.addSink(StreamingFileSink
.forBulkFormat(new Path("/streaming/output/path"), writerFactory)
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build());
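// Note: newer Flink releases deprecate StreamingFileSink in favor of the
// unified FileSink; an equivalent alternative to the addSink call above
// (a sketch, assuming the flink-connector-files dependency is on the classpath):
FileSink<User> fileSink = FileSink
.forBulkFormat(new Path("/streaming/output/path"), writerFactory)
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
userStream.sinkTo(fileSink);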
env.execute("Stream to Avro Files");// Custom builder for GenericRecord with custom configuration
public class CustomAvroBuilder implements AvroBuilder<GenericRecord> {
private final Schema schema;
public CustomAvroBuilder(Schema schema) {
this.schema = schema;
}
@Override
public DataFileWriter<GenericRecord> createWriter(OutputStream out) throws IOException {
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
// Configure compression
dataFileWriter.setCodec(CodecFactory.deflateCodec(6)); // Compression level 6
// Set custom metadata
dataFileWriter.setMeta("created_by", "flink-sql-avro");
dataFileWriter.setMeta("created_at", System.currentTimeMillis());
dataFileWriter.create(schema, out);
return dataFileWriter;
}
}
// Use custom builder
AvroWriterFactory<GenericRecord> customWriterFactory =
new AvroWriterFactory<>(new CustomAvroBuilder(schema));

// Create file system table with Avro format
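// The SQL examples below assume a StreamTableEnvironment named tableEnv;
// a minimal setup sketch:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);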
String createTableSQL = """
CREATE TABLE user_files (
user_id BIGINT,
username STRING,
email STRING,
registration_date DATE,
last_login TIMESTAMP(3)
) PARTITIONED BY (registration_date)
WITH (
'connector' = 'filesystem',
'path' = '/data/warehouse/users',
'format' = 'avro',
'avro.codec' = 'snappy',
'sink.partition-commit.policy.kind' = 'success-file'
)
""";
tableEnv.executeSql(createTableSQL);
// Insert data into partitioned Avro files
String insertSQL = """
INSERT INTO user_files
SELECT
user_id,
username,
email,
CAST(created_at AS DATE) as registration_date,
last_login
FROM user_stream
""";
tableEnv.executeSql(insertSQL);
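// executeSql() on an INSERT submits the job asynchronously and returns a
// TableResult; to block until the job finishes (e.g. in tests):
// tableEnv.executeSql(insertSQL).await();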
// Read from partitioned Avro files
Table result = tableEnv.sqlQuery("""
SELECT
COUNT(*) as daily_registrations,
registration_date
FROM user_files
WHERE registration_date >= CURRENT_DATE - INTERVAL '30' DAY
GROUP BY registration_date
ORDER BY registration_date
""");// Optimized settings for large file processing
AvroInputFormat<GenericRecord> optimizedInput =
new AvroInputFormat<>(new Path("/large/files/*.avro"), schema);
// Configure parallel reading
env.getConfig().setParallelism(16);
// For output, use appropriate compression
AvroOutputFormat<GenericRecord> optimizedOutput =
new AvroOutputFormat<>(new Path("/output/compressed.avro"), schema);
// Balance between compression ratio and speed
optimizedOutput.setCodec("snappy"); // Fast compression
// optimizedOutput.setCodec("gzip"); // Better compression ratio
// optimizedOutput.setCodec("zstd"); // Good balance
// For streaming, configure checkpointing for fault tolerance
env.enableCheckpointing(60000); // Checkpoint every minute
// Bulk-encoded formats must roll on every checkpoint, so use the checkpoint
// rolling policy (size- and time-based policies such as DefaultRollingPolicy
// apply only to row-encoded formats)
StreamingFileSink<GenericRecord> sink = StreamingFileSink
.forBulkFormat(new Path("/streaming/output"), writerFactory)
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
.build();
// Part file size and rollover frequency are therefore governed by the
// checkpoint interval configured above

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-avro