Apache Flink SQL Avro format library, providing bundled and shaded Apache Avro dependencies for Avro format usage in Flink SQL applications.
---
The library provides several configuration options to control Avro format behavior when used in Flink SQL tables.
```java
/**
 * Configuration options for Avro format processing.
 */
public class AvroFormatOptions {

    /**
     * The compression codec for Avro files.
     * Default: "snappy"
     */
    public static final ConfigOption<String> AVRO_OUTPUT_CODEC;

    /**
     * The encoding to use for serialization and deserialization.
     * Default: AvroEncoding.BINARY
     */
    public static final ConfigOption<AvroEncoding> AVRO_ENCODING;

    /**
     * Use the legacy timestamp mapping for compatibility.
     * Default: true
     */
    public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING;
}
```

```java
/**
 * Serialization types for Avro encoding.
 */
public enum AvroEncoding {

    /**
     * Binary encoding for serialization and deserialization.
     * A more compact and space-efficient representation.
     */
    BINARY,

    /**
     * JSON encoding for serialization and deserialization.
     * A more human-readable option.
     */
    JSON
}
```

A basic Avro format table:
```java
String sql = """
    CREATE TABLE my_table (
      id INT,
      name STRING,
      created_at TIMESTAMP(3)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'events',
      'format' = 'avro'
    )
    """;
```
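The WITH options in these examples are plain key/value string pairs, so when table definitions are assembled programmatically it can help to render an options map into a WITH clause. A minimal illustrative sketch (the class and method names here are hypothetical, not part of this library):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class WithClauseSketch {
    // Hypothetical helper: renders an options map into a Flink SQL WITH clause.
    static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n  ", "WITH (\n  ", "\n)"));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order in the output
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "events");
        options.put("format", "avro");
        System.out.println(withClause(options));
    }
}
```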
Avro format with a custom encoding:
```java
String sqlWithOptions = """
    CREATE TABLE my_table (
      id INT,
      data STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'events',
      'format' = 'avro',
      'avro.encoding' = 'json'
    )
    """;
```
A file-based table with compression:
```java
String fileSql = """
    CREATE TABLE avro_files (
      user_id BIGINT,
      event_time TIMESTAMP(3),
      event_data STRING
    ) WITH (
      'connector' = 'filesystem',
      'path' = '/path/to/avro/files',
      'format' = 'avro',
      'avro.codec' = 'gzip'
    )
    """;
```

Using configuration in format creation:
```java
ReadableConfig formatOptions = Configuration.fromMap(Map.of(
    "avro.encoding", "binary",
    "avro.timestamp_mapping.legacy", "false"
));

// Configuration values can be accessed programmatically
AvroEncoding encoding = formatOptions.get(AvroFormatOptions.AVRO_ENCODING);
boolean legacyMapping = formatOptions.get(AvroFormatOptions.AVRO_TIMESTAMP_LEGACY_MAPPING);
String codec = formatOptions.get(AvroFormatOptions.AVRO_OUTPUT_CODEC);
```

Supported compression codecs for Avro files:
- "snappy" (default) - Fast compression/decompression
- "gzip" - Better compression ratio
- "deflate" - Standard deflate compression
- "bzip2" - High compression ratio
- "xz" - Very high compression ratio
- "zstandard" - Modern compression algorithm
- null - No compression

The AVRO_TIMESTAMP_LEGACY_MAPPING option controls how Flink SQL timestamp types map to Avro timestamp types:
Legacy Mapping (default: true):

- TIMESTAMP → Avro TIMESTAMP
- TIMESTAMP_LTZ → Avro TIMESTAMP

Correct Mapping (when set to false):

- TIMESTAMP → Avro LOCAL_TIMESTAMP
- TIMESTAMP_LTZ → Avro TIMESTAMP

Binary Encoding: the default; a more compact and space-efficient representation.
JSON Encoding: a more human-readable option.
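Putting the options together, a sketch of a table DDL that opts into the corrected timestamp mapping (the table and column names here are hypothetical):

```java
public class TimestampMappingSketch {
    // Hypothetical table: with 'avro.timestamp_mapping.legacy' = 'false',
    // TIMESTAMP(3) maps to Avro LOCAL_TIMESTAMP rather than Avro TIMESTAMP.
    static final String DDL = """
            CREATE TABLE event_log (
              id BIGINT,
              local_time TIMESTAMP(3),
              instant_time TIMESTAMP_LTZ(3)
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'events',
              'format' = 'avro',
              'avro.timestamp_mapping.legacy' = 'false'
            )
            """;

    public static void main(String[] args) {
        System.out.println(DDL);
    }
}
```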
Install with Tessl CLI
```shell
npx tessl i tessl/maven-org-apache-flink--flink-sql-avro
```