
# Schema-Encoded Message Support

## Capabilities

### Schema-Encoded Deserialization

Support for deserializing Avro data where schema information is embedded within the message using a configurable schema coder.

```java { .api }
/**
 * Deserialization schema that reads schema from input stream using SchemaCoder
 * Extends AvroDeserializationSchema with schema-encoded message support
 * @param <T> Type of record it produces
 */
public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {

    /**
     * Creates schema-encoded deserialization schema
     * @param recordClazz Class to deserialize (SpecificRecord or GenericRecord)
     * @param reader Reader's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that reads schema from stream
     */
    public RegistryAvroDeserializationSchema(
            Class<T> recordClazz,
            Schema reader,
            SchemaCoder.SchemaCoderProvider schemaCoderProvider);

    /**
     * Creates schema-encoded deserialization schema with custom encoding
     * @param recordClazz Class to deserialize (SpecificRecord or GenericRecord)
     * @param reader Reader's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that reads schema from stream
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public RegistryAvroDeserializationSchema(
            Class<T> recordClazz,
            Schema reader,
            SchemaCoder.SchemaCoderProvider schemaCoderProvider,
            AvroEncoding encoding);

    /**
     * Deserializes message with embedded schema
     * @param message Serialized message bytes with embedded schema
     * @return Deserialized object
     * @throws IOException If deserialization or schema reading fails
     */
    public T deserialize(byte[] message) throws IOException;

    /**
     * Checks if element signals end of stream
     * @param nextElement The element to check
     * @return Always false for Avro records
     */
    public boolean isEndOfStream(T nextElement);

    /**
     * Gets the type information for produced type
     * @return TypeInformation for the produced type
     */
    public TypeInformation<T> getProducedType();
}
```

### Schema-Encoded Serialization

Support for serializing Avro data with embedded schema information.

```java { .api }
/**
 * Serialization schema that embeds schema information in messages using SchemaCoder
 * Extends AvroSerializationSchema with schema-encoded message support
 * @param <T> Type of record it consumes
 */
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T> {

    /**
     * Creates schema-encoded serialization schema
     * @param subject Subject name for schema identification
     * @param recordClazz Class to serialize (SpecificRecord or GenericRecord)
     * @param writer Writer's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that writes schema to stream
     */
    public RegistryAvroSerializationSchema(
            String subject,
            Class<T> recordClazz,
            Schema writer,
            SchemaCoder.SchemaCoderProvider schemaCoderProvider);

    /**
     * Creates schema-encoded serialization schema with custom encoding
     * @param subject Subject name for schema identification
     * @param recordClazz Class to serialize (SpecificRecord or GenericRecord)
     * @param writer Writer's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that writes schema to stream
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public RegistryAvroSerializationSchema(
            String subject,
            Class<T> recordClazz,
            Schema writer,
            SchemaCoder.SchemaCoderProvider schemaCoderProvider,
            AvroEncoding encoding);

    /**
     * Serializes object with embedded schema
     * Message includes schema information followed by Avro data
     * @param object Object to serialize
     * @return Serialized byte array with embedded schema
     */
    public byte[] serialize(T object);
}
```

### Schema Coder Interface

Interface for reading and writing schema information in message streams.

```java { .api }
/**
 * Schema coder that allows reading schema embedded in serialized records
 * Used by RegistryAvroDeserializationSchema and RegistryAvroSerializationSchema
 */
public interface SchemaCoder {

    /**
     * Reads schema from input stream
     * @param in Input stream containing schema information
     * @return Parsed Avro schema
     * @throws IOException If schema reading fails
     */
    Schema readSchema(InputStream in) throws IOException;

    /**
     * Writes schema to output stream
     * @param schema Avro schema to write
     * @param out Output stream to write schema to
     * @throws IOException If schema writing fails
     */
    void writeSchema(Schema schema, OutputStream out) throws IOException;

    /**
     * Provider interface for creating SchemaCoder instances
     * Allows creating multiple instances in parallel operators without serialization issues
     */
    interface SchemaCoderProvider extends Serializable {

        /**
         * Creates a new instance of SchemaCoder
         * Each call should create a new instance for use in different nodes
         * @return New SchemaCoder instance
         */
        SchemaCoder get();
    }
}
```

## Usage Examples

### Basic Schema Registry Setup

```java
// Configuration for schema registry
Map<String, String> registryConfig = new HashMap<>();
registryConfig.put("schema.registry.url", "http://localhost:8081");
registryConfig.put("schema.registry.subject.strategy", "topic-value");

// For authentication (if required)
registryConfig.put("schema.registry.username", "registry-user");
registryConfig.put("schema.registry.password", "registry-password");

// Create registry-aware deserializer
RegistryAvroDeserializationSchema<GenericRecord> deserializer =
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("user-events-value")
        .build();

// Create registry-aware serializer
RegistryAvroSerializationSchema<GenericRecord> serializer =
    RegistryAvroSerializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("processed-events-value")
        .setSchema(outputSchema)
        .build();
```

### Kafka Integration with Schema Registry

```java
// Kafka properties with schema registry configuration
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "avro-consumer-group");

// Schema registry properties
Properties schemaRegistryProps = new Properties();
schemaRegistryProps.setProperty("schema.registry.url", "http://localhost:8081");

// Consumer with registry-aware deserialization
KafkaSource<GenericRecord> kafkaSource = KafkaSource.<GenericRecord>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("user-events")
    .setGroupId("avro-consumer-group")
    .setValueDeserializer(KafkaRecordDeserializationSchema.valueOnly(
        RegistryAvroDeserializationSchema.<GenericRecord>builder()
            .setRegistryConfig(schemaRegistryProps)
            .setSubject("user-events-value")
            .build()
    ))
    .build();

DataStream<GenericRecord> stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.noWatermarks(),
    "Kafka Avro Source"
);

// Producer with registry-aware serialization
KafkaSink<GenericRecord> kafkaSink = KafkaSink.<GenericRecord>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.<GenericRecord>builder()
        .setTopic("processed-events")
        .setValueSerializer(
            RegistryAvroSerializationSchema.<GenericRecord>builder()
                .setRegistryConfig(schemaRegistryProps)
                .setSubject("processed-events-value")
                .build()
        )
        .build())
    .build();

processedStream.sinkTo(kafkaSink);
```

### Schema Evolution Handling

```java
// Consumer that handles schema evolution automatically
RegistryAvroDeserializationSchema<GenericRecord> evolutionDeserializer =
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("user-events-value")
        .setReaderSchema(readerSchema) // Optional: specify expected schema
        .setSchemaEvolutionEnabled(true)
        .build();

DataStream<GenericRecord> stream = env
    .addSource(new FlinkKafkaConsumer<>("user-events", evolutionDeserializer, kafkaProps));

// Handle records that may have different schemas
DataStream<ProcessedEvent> processed = stream
    .map(new MapFunction<GenericRecord, ProcessedEvent>() {
        @Override
        public ProcessedEvent map(GenericRecord record) throws Exception {
            ProcessedEvent event = new ProcessedEvent();

            // Required fields (present in all schema versions)
            event.setUserId((Long) record.get("user_id"));
            event.setEventType(record.get("event_type").toString());

            // Optional fields (may not exist in older schema versions)
            Object sessionId = record.get("session_id");
            if (sessionId != null) {
                event.setSessionId(sessionId.toString());
            }

            // New fields (may not exist in older records)
            Object deviceInfo = record.get("device_info");
            if (deviceInfo != null) {
                event.setDeviceInfo(deviceInfo.toString());
            }

            return event;
        }
    });
```

### Multi-Subject Registry Usage

```java
// Different schemas for different types of events
Map<String, RegistryAvroDeserializationSchema<GenericRecord>> deserializers = new HashMap<>();

// User events deserializer
deserializers.put("user-events",
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("user-events-value")
        .build());

// System events deserializer
deserializers.put("system-events",
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("system-events-value")
        .build());

// Transaction events deserializer
deserializers.put("transaction-events",
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("transaction-events-value")
        .build());

// Process different event types
DataStream<String> allEvents = env
    .addSource(new FlinkKafkaConsumer<>(
        Arrays.asList("user-events", "system-events", "transaction-events"),
        new KafkaDeserializationSchema<String>() {
            @Override
            public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                String topic = record.topic();
                RegistryAvroDeserializationSchema<GenericRecord> deserializer =
                    deserializers.get(topic);

                if (deserializer != null) {
                    GenericRecord avroRecord = deserializer.deserialize(record.value());
                    return processEventByType(topic, avroRecord);
                }
                return null;
            }

            @Override
            public boolean isEndOfStream(String nextElement) {
                return false;
            }

            @Override
            public TypeInformation<String> getProducedType() {
                return BasicTypeInfo.STRING_TYPE_INFO;
            }
        },
        kafkaProps));
```

### Schema Validation and Error Handling

```java
// Registry deserializer with validation and error handling
RegistryAvroDeserializationSchema<GenericRecord> validatingDeserializer =
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("user-events-value")
        .setValidationEnabled(true)
        .setErrorMode(ErrorMode.IGNORE) // or FAIL, LOG_AND_CONTINUE
        .build();

DataStream<GenericRecord> validatedStream = stream
    .map(new MapFunction<byte[], GenericRecord>() {
        @Override
        public GenericRecord map(byte[] value) throws Exception {
            try {
                return validatingDeserializer.deserialize(value);
            } catch (SchemaNotFoundException e) {
                // Handle unknown schema ID
                logger.warn("Unknown schema ID in message: " + e.getSchemaId());
                return null;
            } catch (IncompatibleSchemaException e) {
                // Handle schema compatibility issues
                logger.error("Schema compatibility error: " + e.getMessage());
                return null;
            } catch (IOException e) {
                // Handle general deserialization errors
                logger.error("Deserialization error: " + e.getMessage());
                return null;
            }
        }
    })
    .filter(Objects::nonNull); // Remove failed deserializations
```

### Table API with Schema Registry

```java
// Create table with schema registry integration
String createTableSQL = """
    CREATE TABLE user_events (
        user_id BIGINT,
        event_type STRING,
        event_data ROW<
            action STRING,
            target STRING
        >,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro',
        'avro.schema-registry.url' = 'http://localhost:8081',
        'avro.schema-registry.subject' = 'user-events-value'
    )
    """;

tableEnv.executeSql(createTableSQL);

// Output table with schema registry
String createOutputTableSQL = """
    CREATE TABLE processed_events (
        user_id BIGINT,
        event_count BIGINT,
        last_event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'processed-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro',
        'avro.schema-registry.url' = 'http://localhost:8081',
        'avro.schema-registry.subject' = 'processed-events-value'
    )
    """;

tableEnv.executeSql(createOutputTableSQL);

// Process with SQL
String processingSQL = """
    INSERT INTO processed_events
    SELECT
        user_id,
        COUNT(*) as event_count,
        MAX(event_time) as last_event_time
    FROM user_events
    WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
    GROUP BY user_id
    """;

tableEnv.executeSql(processingSQL);
```

### Schema Registry Administration

```java
// Utility methods for schema registry management
public class SchemaRegistryUtils {

    /**
     * Register a new schema version
     */
    public static int registerSchema(String registryUrl, String subject, Schema schema)
            throws IOException {
        // Implementation for registering schemas
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.register(subject, schema);
    }

    /**
     * Get latest schema for subject
     */
    public static Schema getLatestSchema(String registryUrl, String subject)
            throws IOException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.getLatestSchemaMetadata(subject).getSchema();
    }

    /**
     * Check schema compatibility
     */
    public static boolean isCompatible(String registryUrl, String subject, Schema newSchema)
            throws IOException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.testCompatibility(subject, newSchema);
    }
}

// Example usage
Schema newSchema = // ... your new schema
boolean compatible = SchemaRegistryUtils.isCompatible(
    "http://localhost:8081",
    "user-events-value",
    newSchema
);

if (compatible) {
    int schemaId = SchemaRegistryUtils.registerSchema(
        "http://localhost:8081",
        "user-events-value",
        newSchema
    );
    System.out.println("Registered new schema with ID: " + schemaId);
}
```