# Configuration and Options

## Capabilities

### Avro Format Configuration Options

The library provides several configuration options that control Avro format behavior in Flink SQL tables.

```java { .api }
/**
 * Configuration options for Avro format processing
 */
public class AvroFormatOptions {

    /**
     * The compression codec for Avro files
     * Default: "snappy"
     */
    public static final ConfigOption<String> AVRO_OUTPUT_CODEC;

    /**
     * The encoding to use for serialization and deserialization
     * Default: AvroEncoding.BINARY
     */
    public static final ConfigOption<AvroEncoding> AVRO_ENCODING;

    /**
     * Use legacy timestamp mapping for compatibility
     * Default: true
     */
    public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING;
}
```

### Encoding Types

```java { .api }
/**
 * Serialization types for Avro encoding
 */
public enum AvroEncoding {
    /**
     * Use binary encoding for serialization and deserialization
     * More compact and space-efficient representation
     */
    BINARY,

    /**
     * Use JSON encoding for serialization and deserialization
     * More human-readable option
     */
    JSON
}
```

## Usage Examples

### SQL Table Configuration

```java
// Basic Avro format table
String sql = """
    CREATE TABLE my_table (
        id INT,
        name STRING,
        created_at TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'events',
        'format' = 'avro'
    )
    """;

// Avro format with custom encoding
String sqlWithOptions = """
    CREATE TABLE my_table (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'events',
        'format' = 'avro',
        'avro.encoding' = 'json'
    )
    """;

// File-based table with compression
String fileSql = """
    CREATE TABLE avro_files (
        user_id BIGINT,
        event_time TIMESTAMP(3),
        event_data STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/path/to/avro/files',
        'format' = 'avro',
        'avro.codec' = 'gzip'
    )
    """;
```

### Programmatic Configuration

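```java
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.avro.AvroFormatOptions;
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;

// Format options as a format factory receives them. The option keys carry no
// "avro." prefix here; that prefix only appears in the SQL WITH clause.
ReadableConfig formatOptions = Configuration.fromMap(Map.of(
    "encoding", "binary",
    "timestamp_mapping.legacy", "false"
));

// Typed access through the ConfigOption constants; unset options return their defaults
AvroEncoding encoding = formatOptions.get(AvroFormatOptions.AVRO_ENCODING);
boolean legacyMapping = formatOptions.get(AvroFormatOptions.AVRO_TIMESTAMP_LEGACY_MAPPING);
String codec = formatOptions.get(AvroFormatOptions.AVRO_OUTPUT_CODEC); // not set above, so "snappy"
```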

## Configuration Details

### Compression Codecs

Supported compression codecs for Avro files:

- `"snappy"` (default) - Fast compression/decompression
- `"gzip"` - Better compression ratio
- `"deflate"` - Standard deflate compression
- `"bzip2"` - High compression ratio
- `"xz"` - Very high compression ratio
- `"zstandard"` - Modern compression algorithm
- `null` - No compression

### Timestamp Mapping

The `AVRO_TIMESTAMP_LEGACY_MAPPING` option controls how Flink SQL timestamp types map to Avro timestamp types:

**Legacy Mapping (default: true)**:
- Flink SQL `TIMESTAMP` → Avro `TIMESTAMP`
- Flink SQL `TIMESTAMP_LTZ` → Avro `TIMESTAMP`

**Correct Mapping (when set to false)**:
- Flink SQL `TIMESTAMP` → Avro `LOCAL_TIMESTAMP`
- Flink SQL `TIMESTAMP_LTZ` → Avro `TIMESTAMP`
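
The two mappings above can be written out as a small lookup. This is only an illustrative sketch: `TimestampMappingSketch`, `SqlType`, `AvroType`, and `toAvro` are hypothetical names invented for the example and are not part of the library's API; the real conversion happens inside the format's schema converter.

```java
// Illustrative stand-in for the mapping table above; not the library's converter.
final class TimestampMappingSketch {

    /** Flink SQL timestamp types named in this section. */
    enum SqlType { TIMESTAMP, TIMESTAMP_LTZ }

    /** Avro timestamp types named in this section. */
    enum AvroType { TIMESTAMP, LOCAL_TIMESTAMP }

    /**
     * Returns the Avro type a Flink SQL timestamp type maps to.
     *
     * @param legacyMapping value of the legacy timestamp mapping option
     */
    static AvroType toAvro(SqlType sqlType, boolean legacyMapping) {
        if (legacyMapping) {
            // Legacy behavior: both SQL types collapse onto Avro TIMESTAMP.
            return AvroType.TIMESTAMP;
        }
        // Correct behavior: only TIMESTAMP_LTZ maps to Avro TIMESTAMP.
        return sqlType == SqlType.TIMESTAMP ? AvroType.LOCAL_TIMESTAMP : AvroType.TIMESTAMP;
    }

    public static void main(String[] args) {
        for (boolean legacy : new boolean[] {true, false}) {
            for (SqlType type : SqlType.values()) {
                System.out.println("legacy=" + legacy + ": " + type + " -> " + toAvro(type, legacy));
            }
        }
    }
}
```

The only row that changes between the two modes is Flink SQL `TIMESTAMP`, so switching the option affects tables that use `TIMESTAMP` columns and leaves `TIMESTAMP_LTZ` columns as they were.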
140
141
### Encoding Performance
142
143
**Binary Encoding**:
- More compact serialized format
- Faster serialization/deserialization
- Not human-readable
- Recommended for production use

**JSON Encoding**:
- Human-readable format
- Larger serialized size
- Slower processing
- Useful for debugging and development
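
To give a rough sense of where the size difference comes from, the sketch below encodes a single `long` the way Avro's binary encoding does (zig-zag followed by a variable-length base-128 integer) and compares it with the same value written as decimal text, which is how it would appear in JSON. The class `EncodingSizeSketch` and its methods are hypothetical helpers written for this illustration, not library code, and the comparison covers one value only; a real JSON-encoded record also carries field names and punctuation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustration only: hand-rolled encoders for a single long value.
final class EncodingSizeSketch {

    /** Zig-zag plus base-128 varint, the scheme Avro's binary encoding uses for int and long. */
    static byte[] zigZagVarint(long value) {
        // Zig-zag moves the sign into the lowest bit so small negative numbers stay short.
        long n = (value << 1) ^ (value >> 63);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Emit 7 bits per byte, setting the high bit while more bytes follow.
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
        return out.toByteArray();
    }

    /** The same value as UTF-8 decimal text, as a JSON number would be written. */
    static byte[] jsonText(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        long epochMillis = 1_700_000_000_000L; // an arbitrary timestamp-sized value
        System.out.println("binary bytes: " + zigZagVarint(epochMillis).length);
        System.out.println("text bytes:   " + jsonText(epochMillis).length);
    }
}
```

For timestamp-sized values the binary form takes roughly half the bytes of the text form, which is in line with the compactness claim above; the speed claims are not measured by this sketch.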