# Row Data Integration

## Capabilities

### RowData Deserialization

Integration with Flink's internal RowData format for table API and SQL processing.

```java { .api }
/**
 * Deserialization schema that converts Avro records to Flink RowData.
 * Marked as @PublicEvolving API.
 */
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {

    /**
     * Creates an AvroRowDataDeserializationSchema with default settings
     * @param rowType The Flink RowType describing the target schema
     * @param typeInfo Type information for the RowData
     */
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> typeInfo);

    /**
     * Creates an AvroRowDataDeserializationSchema with custom encoding
     * @param rowType The Flink RowType describing the target schema
     * @param typeInfo Type information for the RowData
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public AvroRowDataDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> typeInfo,
            AvroEncoding encoding);

    /**
     * Creates an AvroRowDataDeserializationSchema with full configuration
     * @param rowType The Flink RowType describing the target schema
     * @param typeInfo Type information for the RowData
     * @param encoding Avro encoding type (BINARY or JSON)
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     */
    public AvroRowDataDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> typeInfo,
            AvroEncoding encoding,
            boolean legacyTimestampMapping);

    /**
     * Creates an AvroRowDataDeserializationSchema with custom nested schema
     * @param nestedSchema The nested Avro deserialization schema
     * @param runtimeConverter Runtime converter for Avro to RowData conversion
     * @param typeInfo Type information for the RowData
     */
    public AvroRowDataDeserializationSchema(
            DeserializationSchema<GenericRecord> nestedSchema,
            AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter,
            TypeInformation<RowData> typeInfo);

    /**
     * Deserializes byte array to RowData
     * @param message Serialized Avro message bytes
     * @return RowData instance, or null if message is null
     * @throws IOException If deserialization fails
     */
    public RowData deserialize(byte[] message) throws IOException;

    /**
     * Checks if element signals end of stream
     * @param nextElement The RowData element to check
     * @return Always false for Avro records
     */
    public boolean isEndOfStream(RowData nextElement);

    /**
     * Gets the type information for produced RowData
     * @return TypeInformation for RowData
     */
    public TypeInformation<RowData> getProducedType();
}
```

### RowData Serialization

```java { .api }
/**
 * Serialization schema that converts Flink RowData to Avro format
 */
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {

    /**
     * Creates an AvroRowDataSerializationSchema with default settings
     * @param rowType The Flink RowType describing the source schema
     */
    public AvroRowDataSerializationSchema(RowType rowType);

    /**
     * Creates an AvroRowDataSerializationSchema with custom encoding
     * @param rowType The Flink RowType describing the source schema
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding);

    /**
     * Creates an AvroRowDataSerializationSchema with full configuration
     * @param rowType The Flink RowType describing the source schema
     * @param encoding Avro encoding type (BINARY or JSON)
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     */
    public AvroRowDataSerializationSchema(
            RowType rowType,
            AvroEncoding encoding,
            boolean legacyTimestampMapping);

    /**
     * Creates an AvroRowDataSerializationSchema with custom nested schema
     * @param rowType The Flink RowType describing the source schema
     * @param nestedSchema The nested Avro serialization schema
     * @param runtimeConverter Runtime converter for RowData to Avro conversion
     */
    public AvroRowDataSerializationSchema(
            RowType rowType,
            SerializationSchema<GenericRecord> nestedSchema,
            RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter);

    /**
     * Serializes RowData to byte array
     * @param rowData The RowData to serialize
     * @return Serialized byte array
     */
    public byte[] serialize(RowData rowData);

    /**
     * Opens the serializer for initialization
     * @param context Initialization context
     * @throws Exception If initialization fails
     */
    public void open(InitializationContext context) throws Exception;
}
```

### Type Conversion Utilities

```java { .api }
/**
 * Utilities for converting Avro records to Flink RowData
 */
public class AvroToRowDataConverters {

    /**
     * Interface for converting Avro records to RowData
     */
    public interface AvroToRowDataConverter {
        /**
         * Converts an Avro record to RowData
         * @param avroObject The Avro record to convert
         * @return Converted RowData
         */
        Object convert(Object avroObject);
    }

    /**
     * Creates a converter for the given RowType
     * @param rowType The target Flink RowType
     * @return Converter instance
     */
    public static AvroToRowDataConverter createRowConverter(RowType rowType);
}

/**
 * Utilities for converting Flink RowData to Avro records
 */
public class RowDataToAvroConverters {

    /**
     * Interface for converting RowData to Avro records
     */
    public interface RowDataToAvroConverter {
        /**
         * Converts RowData to an Avro record
         * @param schema Target Avro schema
         * @param rowData The RowData to convert
         * @return Converted Avro record
         */
        Object convert(Schema schema, RowData rowData);
    }

    /**
     * Creates a converter for the given RowType
     * @param rowType The source Flink RowType
     * @return Converter instance
     */
    public static RowDataToAvroConverter createConverter(RowType rowType);
}
```

## Usage Examples

### Table API Integration

```java
// Define Flink table schema
RowType rowType = RowType.of(
    new DataType[] {
        DataTypes.BIGINT(),
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.TIMESTAMP(3)
    },
    new String[] {"user_id", "username", "email", "created_at"}
);

// Create type information
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

// Create deserializer for table processing
AvroRowDataDeserializationSchema deserializer =
    new AvroRowDataDeserializationSchema(
        rowType,
        typeInfo,
        AvroEncoding.BINARY,
        false // Use correct timestamp mapping
    );

// Create serializer for output
AvroRowDataSerializationSchema serializer =
    new AvroRowDataSerializationSchema(
        rowType,
        AvroEncoding.BINARY,
        false
    );
```

### DataStream to Table Conversion

```java
// Convert byte stream to RowData
DataStream<byte[]> avroStream = // ... your Avro byte stream

// Use RichMapFunction (plain MapFunction has no open() lifecycle method)
DataStream<RowData> rowDataStream = avroStream
    .map(new RichMapFunction<byte[], RowData>() {
        private AvroRowDataDeserializationSchema deserializer;

        @Override
        public void open(Configuration parameters) throws Exception {
            RowType rowType = RowType.of(
                new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
                new String[] {"id", "name"}
            );
            TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

            deserializer = new AvroRowDataDeserializationSchema(
                rowType, typeInfo, AvroEncoding.BINARY, false
            );
        }

        @Override
        public RowData map(byte[] value) throws Exception {
            return deserializer.deserialize(value);
        }
    });

// Convert to Table for SQL processing
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv.fromDataStream(rowDataStream);

// Register as temporary table
tableEnv.createTemporaryView("my_table", table);

// Process with SQL
Table result = tableEnv.sqlQuery("SELECT id, UPPER(name) as name FROM my_table WHERE id > 100");
```

### Custom Type Conversions

```java
// Create custom converter for complex types
RowType complexRowType = RowType.of(
    new DataType[] {
        DataTypes.ROW(
            DataTypes.FIELD("street", DataTypes.STRING()),
            DataTypes.FIELD("city", DataTypes.STRING()),
            DataTypes.FIELD("zipcode", DataTypes.STRING())
        ),
        DataTypes.ARRAY(DataTypes.STRING()),
        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())
    },
    new String[] {"address", "tags", "metadata"}
);

// Create converter
AvroToRowDataConverters.AvroToRowDataConverter converter =
    AvroToRowDataConverters.createRowConverter(complexRowType);

// Use converter manually
GenericRecord avroRecord = // ... your Avro record
RowData rowData = (RowData) converter.convert(avroRecord);

// For serialization direction
RowDataToAvroConverters.RowDataToAvroConverter toAvroConverter =
    RowDataToAvroConverters.createConverter(complexRowType);

Schema avroSchema = AvroSchemaConverter.convertToSchema(complexRowType);
GenericRecord converted = (GenericRecord) toAvroConverter.convert(avroSchema, rowData);
```

### SQL Table with Custom Schema

```java
// Create table with Avro format and specific configuration
String createTableSQL = """
    CREATE TABLE user_events (
        user_id BIGINT,
        event_type STRING,
        event_data ROW<
            action STRING,
            target STRING,
            metadata MAP<STRING, STRING>
        >,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro',
        'avro.encoding' = 'binary',
        'avro.timestamp_mapping.legacy' = 'false'
    )
    """;

tableEnv.executeSql(createTableSQL);

// Query the table
Table result = tableEnv.sqlQuery("""
    SELECT
        user_id,
        event_type,
        event_data.action,
        COUNT(*) as event_count,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
    FROM user_events
    WHERE event_data.action = 'click'
    GROUP BY
        user_id,
        event_type,
        event_data.action,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
    """);
```

### Handling Nullable Fields

```java
// Schema with nullable fields
RowType nullableRowType = RowType.of(
    new DataType[] {
        DataTypes.BIGINT().notNull(), // Required field
        DataTypes.STRING(),           // Nullable string
        DataTypes.INT(),              // Nullable int
        DataTypes.TIMESTAMP(3)        // Nullable timestamp
    },
    new String[] {"id", "name", "age", "last_login"}
);

// The deserializer automatically handles null values according to Avro schema
AvroRowDataDeserializationSchema deserializer =
    new AvroRowDataDeserializationSchema(
        nullableRowType,
        InternalTypeInfo.of(nullableRowType),
        AvroEncoding.BINARY,
        false
    );

// Access nullable fields safely
DataStream<RowData> processed = rowDataStream
    .map(new MapFunction<RowData, RowData>() {
        @Override
        public RowData map(RowData row) throws Exception {
            // Check if name field is null (field index 1)
            if (!row.isNullAt(1)) {
                String name = row.getString(1).toString();
                // Process non-null name
            }
            return row;
        }
    });
```