# Table API Integration

Row-based serialization and deserialization for seamless integration with Flink's Table API and SQL layer. Converts between Flink's internal `RowData` representation and Avro format, enabling SQL operations on Avro data.

## AvroRowDataSerializationSchema

Serializes Flink `RowData` objects to Avro binary or JSON format for Table API integration.

```java { .api }
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {
    // Constructors
    public AvroRowDataSerializationSchema(RowType rowType);
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding);
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding, boolean legacyTimestampMapping);
    public AvroRowDataSerializationSchema(RowType rowType, SerializationSchema<GenericRecord> nestedSchema, RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter);

    // Instance methods
    public byte[] serialize(RowData row);
    public void open(InitializationContext context) throws Exception;
}
```

### Usage Examples

**Basic Table Integration:**

```java
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

// Define table schema
RowType rowType = RowType.of(
    new LogicalType[] {
        DataTypes.STRING().getLogicalType(),
        DataTypes.INT().getLogicalType(),
        DataTypes.BOOLEAN().getLogicalType()
    },
    new String[] {"name", "age", "active"}
);

// Create serializer
AvroRowDataSerializationSchema serializer = new AvroRowDataSerializationSchema(rowType);

// Use in table sink
TableSink<?> sink = new CustomTableSink(serializer);
```

**With Custom Encoding:**

```java
// Use JSON encoding for debugging
AvroRowDataSerializationSchema jsonSerializer = new AvroRowDataSerializationSchema(
    rowType,
    AvroEncoding.JSON
);

// Control timestamp mapping behavior
AvroRowDataSerializationSchema legacySerializer = new AvroRowDataSerializationSchema(
    rowType,
    AvroEncoding.BINARY,
    true // Use legacy timestamp mapping for compatibility
);
```
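
Putting the serializer together with the deserializer described below, a minimal round trip might look like the following sketch. The class name is illustrative; it assumes `open(null)` is acceptable because this format does not consult the initialization context (worth verifying for your Flink version).

```java
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class AvroRoundTrip {
    public static void main(String[] args) throws Exception {
        RowType rowType = RowType.of(
            new LogicalType[] {
                DataTypes.STRING().getLogicalType(),
                DataTypes.INT().getLogicalType(),
                DataTypes.BOOLEAN().getLogicalType()
            },
            new String[] {"name", "age", "active"});

        AvroRowDataSerializationSchema ser = new AvroRowDataSerializationSchema(rowType);
        AvroRowDataDeserializationSchema de = new AvroRowDataDeserializationSchema(
            rowType, InternalTypeInfo.of(rowType));

        // open(...) must run before the first serialize/deserialize call;
        // the context argument is unused here (assumption)
        ser.open(null);
        de.open(null);

        RowData row = GenericRowData.of(StringData.fromString("alice"), 42, true);
        byte[] bytes = ser.serialize(row);
        RowData back = de.deserialize(bytes);
        System.out.println(back.getString(0) + ", " + back.getInt(1) + ", " + back.getBoolean(2));
    }
}
```

In a pipeline Flink calls `open` for you; it only needs to be invoked manually in standalone tests like this one.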

## AvroRowDataDeserializationSchema

Deserializes Avro data to Flink `RowData` objects for Table API operations.

```java { .api }
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    // Constructors
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo);
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding);
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding, boolean legacyTimestampMapping);

    // Instance methods
    public RowData deserialize(byte[] message) throws IOException;
    public TypeInformation<RowData> getProducedType();
    public boolean isEndOfStream(RowData nextElement);
}
```

### Usage Examples

**Table Source Integration:**

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

// Create type information for RowData
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

// Create deserializer
AvroRowDataDeserializationSchema deserializer = new AvroRowDataDeserializationSchema(
    rowType,
    typeInfo
);

// Use in table source
TableSource<?> source = new CustomTableSource(deserializer);
```

**Kafka Table Integration:**

```java
import java.util.HashMap;
import java.util.Map;

// Create deserializer for Kafka table source
AvroRowDataDeserializationSchema kafkaDeserializer = new AvroRowDataDeserializationSchema(
    rowType,
    typeInfo,
    AvroEncoding.BINARY
);

// Configure Kafka table
Map<String, String> properties = new HashMap<>();
properties.put("connector", "kafka");
properties.put("topic", "avro-topic");
properties.put("format", "avro");
```
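
Outside the SQL layer, the same deserializer also plugs directly into the Kafka DataStream connector. The sketch below assumes `flink-connector-kafka` is on the classpath; the broker address and topic name are placeholders.

```java
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class AvroKafkaSourceExample {
    public static void main(String[] args) {
        RowType rowType = RowType.of(
            new LogicalType[] {DataTypes.STRING().getLogicalType(), DataTypes.INT().getLogicalType()},
            new String[] {"name", "age"});

        AvroRowDataDeserializationSchema deserializer = new AvroRowDataDeserializationSchema(
            rowType, InternalTypeInfo.of(rowType));

        // Broker address and topic are placeholders
        KafkaSource<RowData> source = KafkaSource.<RowData>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("avro-topic")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(deserializer)
            .build();

        System.out.println("source created: " + (source != null));
    }
}
```

`setValueOnlyDeserializer` wraps any `DeserializationSchema<T>`, so no extra glue code is needed between the format and the connector.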

## Format Factory Integration

The library provides automatic format discovery for the Table API through the format factory.

```java { .api }
public class AvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
    public static final String IDENTIFIER = "avro";

    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

### SQL DDL Usage

**Create Table with Avro Format:**

```sql
CREATE TABLE avro_table (
    name STRING,
    age INT,
    active BOOLEAN
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro-topic',
    'format' = 'avro',
    'avro.encoding' = 'binary'
);
```

**Configuration Options:**

```sql
CREATE TABLE avro_table_json (
    name STRING,
    age INT,
    created_at TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro-json-topic',
    'format' = 'avro',
    'avro.encoding' = 'json',
    'avro.timestamp_mapping.legacy' = 'false'
);
```

## Schema Conversion

The library automatically converts between Flink and Avro type systems:

```java { .api }
public class AvroSchemaConverter {
    public static Schema convertToSchema(RowType rowType);
    public static Schema convertToSchema(RowType rowType, boolean legacyTimestampMapping);
    public static LogicalType convertToLogicalType(Schema schema);
}
```
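
A quick way to inspect the conversion is to derive the Avro schema for the example row type and print it. This is a sketch that assumes `AvroSchemaConverter` lives in the `org.apache.flink.formats.avro.typeutils` package, as in stock Flink; the class name is illustrative.

```java
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class SchemaConversionExample {
    public static void main(String[] args) {
        RowType rowType = RowType.of(
            new LogicalType[] {
                DataTypes.STRING().getLogicalType(),
                DataTypes.INT().getLogicalType(),
                DataTypes.BOOLEAN().getLogicalType()
            },
            new String[] {"name", "age", "active"});

        // Derive the Avro schema from the Flink row type
        Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
        System.out.println(avroSchema.toString(true)); // pretty-printed JSON schema
    }
}
```

The printed schema is a handy artifact to check into version control when coordinating with non-Flink producers and consumers.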

### Type Mapping

**Flink to Avro Types:**

| Flink Type | Avro Type | Notes |
|------------|-----------|-------|
| BOOLEAN | boolean | Direct mapping |
| INT | int | 32-bit signed integer |
| BIGINT | long | 64-bit signed integer |
| FLOAT | float | 32-bit floating point |
| DOUBLE | double | 64-bit floating point |
| STRING | string | UTF-8 string |
| BINARY/VARBINARY | bytes | Byte array |
| TIMESTAMP | timestamp-millis | Logical type (legacy mapping; `local-timestamp-millis` with correct mapping) |
| TIMESTAMP_LTZ | timestamp-millis | Logical type (correct mapping) |
| DATE | date | Logical type |
| TIME | time-millis | Logical type |
| DECIMAL | decimal | Logical type with precision/scale |
| ARRAY | array | Element type converted recursively |
| ROW | record | Nested record |
| MAP | map | Avro map keys must be strings |

## Timestamp Mapping Options

The library supports two timestamp mapping modes:

### Legacy Mapping (Default)

- Both `TIMESTAMP` and `TIMESTAMP_LTZ` map to Avro `timestamp-millis`
- Maintains backward compatibility with Flink versions < 1.19
- Enable with `avro.timestamp_mapping.legacy = true`

### Correct Mapping

- `TIMESTAMP` maps to Avro `local-timestamp-millis`
- `TIMESTAMP_LTZ` maps to Avro `timestamp-millis`
- More semantically correct but may break existing pipelines
- Enable with `avro.timestamp_mapping.legacy = false`
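
In DDL the mode is selected per table via the format option. For example (topic and table names are placeholders):

```sql
CREATE TABLE events (
    created_at TIMESTAMP(3),       -- local-timestamp-millis under correct mapping
    ingested_at TIMESTAMP_LTZ(3)   -- timestamp-millis in both modes
) WITH (
    'connector' = 'kafka',
    'topic' = 'events-topic',
    'format' = 'avro',
    'avro.timestamp_mapping.legacy' = 'false'
);
```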

## Error Handling

**Schema Conversion Errors:**

- Throws `IllegalArgumentException` for unsupported type conversions
- Logs warnings for lossy type conversions

**Runtime Errors:**

- Serialization failures throw `RuntimeException`
- Deserialization failures throw `IOException`
- Schema mismatches result in detailed error messages
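
To see the deserialization contract in action, the sketch below feeds deliberately truncated bytes to the deserializer and catches the resulting `IOException`. The class name is illustrative, and it assumes (as in the round-trip example above) that `open(null)` is acceptable for these schemas.

```java
import java.io.IOException;
import java.util.Arrays;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class AvroErrorHandling {
    public static void main(String[] args) throws Exception {
        RowType rowType = RowType.of(
            new LogicalType[] {DataTypes.STRING().getLogicalType(), DataTypes.INT().getLogicalType()},
            new String[] {"name", "age"});
        AvroRowDataSerializationSchema ser = new AvroRowDataSerializationSchema(rowType);
        AvroRowDataDeserializationSchema de = new AvroRowDataDeserializationSchema(
            rowType, InternalTypeInfo.of(rowType));
        ser.open(null);
        de.open(null);

        byte[] valid = ser.serialize(GenericRowData.of(StringData.fromString("bob"), 7));
        byte[] truncated = Arrays.copyOf(valid, Math.max(1, valid.length / 2));

        boolean failed = false;
        try {
            de.deserialize(truncated);
        } catch (IOException e) {
            failed = true; // corrupt input surfaces as IOException, per the contract above
        }
        System.out.println("truncated record rejected: " + failed);
    }
}
```

In a real pipeline the catch block would route the bad record to a dead-letter sink or a metric instead of swallowing it.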

## Performance Optimization

- **Schema Caching**: Converted schemas are cached to avoid repeated conversion
- **Converter Reuse**: Row-to-Avro converters are created once and reused
- **Memory Efficiency**: Direct field access without object allocation where possible
- **Lazy Initialization**: Schema parsing occurs only when needed