
# Table API Integration

Row-based serialization and deserialization for seamless integration with Flink's Table API and SQL layer. These schemas convert between Flink's internal `RowData` representation and Avro format, enabling SQL operations on Avro data.

## AvroRowDataSerializationSchema

Serializes Flink `RowData` objects to Avro binary or JSON format for Table API integration.

```java { .api }
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {
    // Constructors
    public AvroRowDataSerializationSchema(RowType rowType);
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding);
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding, boolean legacyTimestampMapping);
    public AvroRowDataSerializationSchema(RowType rowType, SerializationSchema<GenericRecord> nestedSchema, RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter);

    // Instance methods
    public byte[] serialize(RowData row);
    public void open(InitializationContext context) throws Exception;
}
```

### Usage Examples

**Basic Table Integration:**

```java
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

// Define the table schema
RowType rowType = RowType.of(
    new LogicalType[] {
        DataTypes.STRING().getLogicalType(),
        DataTypes.INT().getLogicalType(),
        DataTypes.BOOLEAN().getLogicalType()
    },
    new String[] {"name", "age", "active"}
);

// Create the serializer
AvroRowDataSerializationSchema serializer = new AvroRowDataSerializationSchema(rowType);

// Pass the serializer to a sink (CustomTableSink is a user-defined example)
TableSink<?> sink = new CustomTableSink(serializer);
```

**With Custom Encoding:**

```java
import org.apache.flink.formats.avro.AvroEncoding;

// Use JSON encoding for debugging
AvroRowDataSerializationSchema jsonSerializer = new AvroRowDataSerializationSchema(
    rowType,
    AvroEncoding.JSON
);

// Control timestamp mapping behavior
AvroRowDataSerializationSchema legacySerializer = new AvroRowDataSerializationSchema(
    rowType,
    AvroEncoding.BINARY,
    true // use legacy timestamp mapping for compatibility
);
```

## AvroRowDataDeserializationSchema

Deserializes Avro data to Flink `RowData` objects for Table API operations.

```java { .api }
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    // Constructors
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo);
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding);
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding, boolean legacyTimestampMapping);

    // Instance methods
    public RowData deserialize(byte[] message) throws IOException;
    public TypeInformation<RowData> getProducedType();
    public boolean isEndOfStream(RowData nextElement);
}
```

### Usage Examples

**Table Source Integration:**

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

// Create type information for RowData
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

// Create the deserializer
AvroRowDataDeserializationSchema deserializer = new AvroRowDataDeserializationSchema(
    rowType,
    typeInfo
);

// Pass the deserializer to a source (CustomTableSource is a user-defined example)
TableSource<?> source = new CustomTableSource(deserializer);
```

**Kafka Table Integration:**

```java
import java.util.HashMap;
import java.util.Map;

// Create a deserializer for a Kafka table source using binary Avro encoding
AvroRowDataDeserializationSchema kafkaDeserializer = new AvroRowDataDeserializationSchema(
    rowType,
    typeInfo,
    AvroEncoding.BINARY
);

// Equivalent connector configuration when the table is defined via SQL DDL
Map<String, String> properties = new HashMap<>();
properties.put("connector", "kafka");
properties.put("topic", "avro-topic");
properties.put("format", "avro");
```

## Format Factory Integration

The library provides automatic format discovery for the Table API through the format factory.

```java { .api }
public class AvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
    public static final String IDENTIFIER = "avro";

    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

### SQL DDL Usage

**Create Table with Avro Format:**

```sql
CREATE TABLE avro_table (
    name STRING,
    age INT,
    active BOOLEAN
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro-topic',
    'format' = 'avro',
    'avro.encoding' = 'binary'
);
```

**Configuration Options:**

```sql
CREATE TABLE avro_table_json (
    name STRING,
    age INT,
    created_at TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro-json-topic',
    'format' = 'avro',
    'avro.encoding' = 'json',
    'avro.timestamp_mapping.legacy' = 'false'
);
```
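A table defined with this DDL can also be registered and queried programmatically. The following is a minimal sketch, assuming a Kafka broker at `localhost:9092` (the `properties.bootstrap.servers` value is an assumption for illustration) and the Kafka connector and Avro format jars on the classpath:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register the Avro-formatted Kafka table from the DDL above
        tEnv.executeSql(
            "CREATE TABLE avro_table (" +
            "  name STRING," +
            "  age INT," +
            "  active BOOLEAN" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'avro-topic'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +  // assumed broker address
            "  'format' = 'avro'," +
            "  'avro.encoding' = 'binary'" +
            ")");

        // Run SQL directly against the Avro data
        tEnv.executeSql("SELECT name, age FROM avro_table WHERE active").print();
    }
}
```

Because the format factory registers itself under the `avro` identifier, no explicit serializer or deserializer construction is needed here.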

## Schema Conversion

The library automatically converts between the Flink and Avro type systems:

```java { .api }
public class AvroSchemaConverter {
    public static Schema convertToSchema(RowType rowType);
    public static Schema convertToSchema(RowType rowType, boolean legacyTimestampMapping);
    public static LogicalType convertToLogicalType(Schema schema);
}
```
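As a sketch of how the converter might be used, the snippet below derives an Avro record schema from a Flink row type built with the `DataTypes` helpers shown earlier (`.notNull()` is used so the result is a plain record rather than a nullable union; exact package paths may vary by Flink version):

```java
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;

public class SchemaConversionExample {
    public static void main(String[] args) {
        // Build a Flink row type: (name STRING, age INT)
        RowType rowType = (RowType) DataTypes.ROW(
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT())
        ).notNull().getLogicalType();

        // Convert to the equivalent Avro record schema
        Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
        System.out.println(avroSchema.toString(true)); // pretty-printed JSON schema
    }
}
```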

### Type Mapping

**Flink to Avro Types:**

| Flink Type | Avro Type | Notes |
|------------|-----------|-------|
| BOOLEAN | boolean | Direct mapping |
| INT | int | 32-bit signed integer |
| BIGINT | long | 64-bit signed integer |
| FLOAT | float | 32-bit floating point |
| DOUBLE | double | 64-bit floating point |
| STRING | string | UTF-8 string |
| BINARY/VARBINARY | bytes | Byte array |
| TIMESTAMP | timestamp-millis | Logical type (legacy mapping) |
| TIMESTAMP_LTZ | timestamp-millis | Logical type (correct mapping) |
| DATE | date | Logical type |
| TIME | time-millis | Logical type |
| DECIMAL | decimal | Logical type with precision/scale |
| ARRAY | array | Avro array |
| ROW | record | Nested record |
| MAP | map | Key-value mapping |

## Timestamp Mapping Options

The library supports two timestamp mapping modes:

### Legacy Mapping (Default)

- Both `TIMESTAMP` and `TIMESTAMP_LTZ` map to Avro `timestamp-millis`
- Maintains backward compatibility with Flink versions < 1.19
- Enable with `avro.timestamp_mapping.legacy = true`

### Correct Mapping

- `TIMESTAMP` maps to Avro `local-timestamp-millis`
- `TIMESTAMP_LTZ` maps to Avro `timestamp-millis`
- More semantically correct, but may break existing pipelines
- Enable with `avro.timestamp_mapping.legacy = false`
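The difference between the two modes is visible in the generated Avro schema. A sketch using the `convertToSchema(RowType, boolean)` overload declared above (the field name is illustrative, and the overload assumes a Flink version that exposes the legacy-mapping flag):

```java
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;

public class TimestampMappingExample {
    public static void main(String[] args) {
        // Row with a single non-null TIMESTAMP(3) column
        RowType rowType = (RowType) DataTypes.ROW(
                DataTypes.FIELD("created_at", DataTypes.TIMESTAMP(3).notNull())
        ).notNull().getLogicalType();

        // Legacy mapping: TIMESTAMP(3) becomes timestamp-millis
        Schema legacy = AvroSchemaConverter.convertToSchema(rowType, true);

        // Correct mapping: TIMESTAMP(3) becomes local-timestamp-millis
        Schema correct = AvroSchemaConverter.convertToSchema(rowType, false);

        System.out.println(legacy.getField("created_at").schema());
        System.out.println(correct.getField("created_at").schema());
    }
}
```

Pipelines that persist Avro with the legacy mapping should keep `avro.timestamp_mapping.legacy = true` until readers are migrated, since the two schemas are not interchangeable.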

## Error Handling

**Schema Conversion Errors:**

- Throws `IllegalArgumentException` for unsupported type conversions
- Logs warnings for lossy type conversions

**Runtime Errors:**

- Serialization failures throw `RuntimeException`
- Deserialization failures throw `IOException`
- Schema mismatches produce detailed error messages
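These behaviors suggest a small defensive wrapper around deserialization when malformed records should be skipped rather than fail the job. A sketch (the helper name and skip-on-error policy are illustrative, not part of the library):

```java
import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.data.RowData;

public class SafeDeserialize {
    // Returns null instead of propagating malformed-input failures
    static RowData tryDeserialize(DeserializationSchema<RowData> schema, byte[] message) {
        try {
            return schema.deserialize(message);
        } catch (IOException e) {
            // Deserialization failures surface as IOException with a descriptive message
            System.err.println("Skipping malformed Avro record: " + e.getMessage());
            return null;
        }
    }
}
```

Note that schema-conversion errors (`IllegalArgumentException`) occur at setup time and should not be caught this way; they indicate a table definition that cannot be expressed in Avro.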

## Performance Optimization

- **Schema Caching**: Converted schemas are cached to avoid repeated conversion
- **Converter Reuse**: Row-to-Avro converters are created once and reused
- **Memory Efficiency**: Direct field access without object allocation where possible
- **Lazy Initialization**: Schema parsing occurs only when needed