or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-writers.md · file-io-operations.md · index.md · schema-registry-integration.md · serialization-deserialization.md · table-api-integration.md · type-system-integration.md

docs/schema-registry-integration.md

# Schema Registry Integration

Extended serialization and deserialization schemas with Confluent Schema Registry support for centralized schema management, evolution, and compatibility checking in distributed Avro data processing.

## RegistryAvroSerializationSchema

Extended serialization schema that integrates with Confluent Schema Registry for centralized schema management.

```java { .api }
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T> {
    // Inherits all methods from AvroSerializationSchema
    // Additional schema registry specific functionality
}
```

### Key Features

- **Schema Registration**: Automatically registers schemas with the registry
- **Schema Evolution**: Supports forward, backward, and full compatibility
- **Schema Caching**: Caches schemas locally for performance
- **Subject Management**: Manages schema subjects and versions
- **Compatibility Checking**: Validates schema changes against compatibility rules

### Usage Examples

**Basic Registry Integration:**

```java
import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;

// Create registry-aware serializer
// Configuration typically done through Flink configuration or environment
RegistryAvroSerializationSchema<User> registrySerializer =
    new RegistryAvroSerializationSchema<>(User.class, registryConfig);

// Use in streaming pipeline
DataStream<User> userStream = ...;
DataStream<byte[]> serializedStream = userStream.map(registrySerializer::serialize);
```

**With Schema Evolution:**

```java
// Register new schema version
String subject = "user-value";
Schema newUserSchema = parseNewUserSchema();

// Serializer automatically handles schema versioning
RegistryAvroSerializationSchema<User> evolvingSerializer =
    createRegistrySerializer(User.class, subject, newUserSchema);

// Messages include schema ID for proper deserialization
DataStream<byte[]> versionedStream = userStream.map(evolvingSerializer::serialize);
```

## RegistryAvroDeserializationSchema

Extended deserialization schema that uses Confluent Schema Registry for schema resolution and evolution handling.

```java { .api }
public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
    // Inherits all methods from AvroDeserializationSchema
    // Additional schema registry specific functionality
}
```

### Key Features

- **Schema Resolution**: Automatically resolves schemas by ID from registry
- **Multi-Version Support**: Handles messages with different schema versions
- **Reader Schema Evolution**: Supports schema evolution patterns
- **Lazy Loading**: Loads schemas on-demand for better performance
- **Error Recovery**: Graceful handling of registry connectivity issues

### Usage Examples

**Registry-based Deserialization:**

```java
import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;

// Create registry-aware deserializer
RegistryAvroDeserializationSchema<User> registryDeserializer =
    new RegistryAvroDeserializationSchema<>(User.class, registryConfig);

// Use in Kafka source
FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
    "user-topic",
    registryDeserializer,
    kafkaProperties
);

DataStream<User> userStream = env.addSource(consumer);
```

**Multi-Version Message Processing:**

```java
// Deserializer handles multiple schema versions automatically
RegistryAvroDeserializationSchema<User> multiVersionDeserializer =
    createRegistryDeserializer(User.class);

// Messages with different schema versions are properly deserialized
DataStream<User> unifiedStream = rawMessageStream
    .map(bytes -> {
        try {
            return multiVersionDeserializer.deserialize(bytes);
        } catch (IOException e) {
            // Handle deserialization errors
            return null;
        }
    })
    .filter(Objects::nonNull);
```

## Schema Registry Configuration

### Connection Configuration

```java
// Schema Registry client configuration
Map<String, Object> registryConfig = new HashMap<>();
registryConfig.put("schema.registry.url", "http://schema-registry:8081");
registryConfig.put("basic.auth.credentials.source", "USER_INFO");
registryConfig.put("basic.auth.user.info", "username:password");

// SSL configuration for secure connections
registryConfig.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks");
registryConfig.put("schema.registry.ssl.truststore.password", "truststore-password");
registryConfig.put("schema.registry.ssl.keystore.location", "/path/to/keystore.jks");
registryConfig.put("schema.registry.ssl.keystore.password", "keystore-password");
```

### Subject and Compatibility Configuration

```java
// Subject naming strategy
registryConfig.put("subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy");

// Compatibility level
registryConfig.put("schema.registry.compatibility.level", "BACKWARD");

// Schema caching
registryConfig.put("schema.registry.cache.capacity", "1000");
registryConfig.put("schema.registry.cache.expiry.secs", "300");
```

## Schema Evolution Patterns

### Backward Compatibility

**Adding Optional Fields:**

```avro
// Original schema
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}

// Evolved schema (backward compatible)
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
```

**Usage with Evolution:**

```java
// Old producers can write to new schema
RegistryAvroSerializationSchema<UserV1> oldSerializer =
    createRegistrySerializer(UserV1.class, "user-value");

// New consumers can read old messages
RegistryAvroDeserializationSchema<UserV2> newDeserializer =
    createRegistryDeserializer(UserV2.class, "user-value");
```

### Forward Compatibility

**Removing Optional Fields:**

```java
// New producers write simplified schema
RegistryAvroSerializationSchema<UserV2> simplifiedSerializer =
    createRegistrySerializer(UserV2.class, "user-value");

// Old consumers can still process messages
RegistryAvroDeserializationSchema<UserV1> oldDeserializer =
    createRegistryDeserializer(UserV1.class, "user-value");
```

### Full Compatibility

```java
// Schema evolution that supports both directions
public class SchemaEvolutionHandler {
    public void handleEvolution(Schema writerSchema, Schema readerSchema) {
        // Validate compatibility
        SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);

        // Apply evolution rules
        applyEvolutionRules(writerSchema, readerSchema);
    }
}
```

## Error Handling and Monitoring

### Registry Connectivity

```java
// Robust registry client with retry logic
public class ResilientRegistryClient {
    private final SchemaRegistryClient client;
    private final RetryPolicy retryPolicy;

    public Schema getSchemaById(int id) throws IOException {
        return retryPolicy.execute(() -> {
            try {
                return client.getSchemaById(id);
            } catch (RestClientException e) {
                if (e.getStatus() == 404) {
                    throw new SchemaNotFoundException("Schema not found: " + id);
                }
                throw new RegistryException("Registry error", e);
            }
        });
    }
}
```

### Schema Validation Errors

```java
// Handle schema validation failures
try {
    byte[] serialized = registrySerializer.serialize(userRecord);
} catch (SerializationException e) {
    if (e.getCause() instanceof RestClientException) {
        RestClientException rce = (RestClientException) e.getCause();
        if (rce.getStatus() == 409) {
            logger.error("Schema compatibility violation", e);
            // Handle compatibility error
        }
    }
    throw new ProcessingException("Serialization failed", e);
}
```

### Monitoring and Metrics

```java
// Registry metrics collection
public class RegistryMetrics {
    private final Counter schemaLookups = Counter.build()
        .name("schema_registry_lookups_total")
        .help("Total schema lookups")
        .register();

    private final Histogram lookupLatency = Histogram.build()
        .name("schema_registry_lookup_duration_seconds")
        .help("Schema lookup latency")
        .register();

    public Schema getSchemaWithMetrics(int id) throws IOException {
        Timer.Sample sample = Timer.start();
        try {
            schemaLookups.inc();
            return client.getSchemaById(id);
        } finally {
            sample.stop(lookupLatency);
        }
    }
}
```

## Best Practices

### Schema Design

**Evolving Schemas:**

```avro
{
  "type": "record",
  "name": "User",
  "namespace": "com.example.avro",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    // Use unions for optional fields
    {"name": "email", "type": ["null", "string"], "default": null},
    // Use logical types for better semantics
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    // Provide defaults for new fields
    {"name": "version", "type": "int", "default": 1}
  ]
}
```

### Performance Optimization

**Schema Caching:**

```java
// Configure appropriate cache sizes
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.cache.capacity", "10000");
config.put("schema.registry.cache.expiry.secs", "3600");

// Use schema caching in high-throughput scenarios
CachedSchemaRegistryClient cachedClient = new CachedSchemaRegistryClient(
    "http://schema-registry:8081",
    10000, // cache capacity
    config
);
```

**Connection Pooling:**

```java
// Reuse registry clients across serializers
public class RegistryClientFactory {
    private static final SchemaRegistryClient SHARED_CLIENT =
        new CachedSchemaRegistryClient(registryUrl, cacheCapacity);

    public static SchemaRegistryClient getClient() {
        return SHARED_CLIENT;
    }
}
```

### Security Configuration

**Authentication:**

```java
// SASL authentication
registryConfig.put("basic.auth.credentials.source", "SASL_INHERIT");
registryConfig.put("sasl.mechanism", "PLAIN");
registryConfig.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"schema-registry-user\" password=\"password\";");
```

**SSL/TLS:**

```java
// SSL configuration
registryConfig.put("schema.registry.ssl.endpoint.identification.algorithm", "https");
registryConfig.put("schema.registry.ssl.protocol", "TLSv1.2");
registryConfig.put("schema.registry.ssl.enabled.protocols", "TLSv1.2");
```

## Deployment Considerations

### High Availability

```java
// Multiple registry URLs for failover
String registryUrls = "http://registry1:8081,http://registry2:8081,http://registry3:8081";
registryConfig.put("schema.registry.url", registryUrls);

// Connection timeout configuration
registryConfig.put("schema.registry.request.timeout.ms", "30000");
registryConfig.put("schema.registry.connect.timeout.ms", "10000");
```

### Environment-specific Configuration

```java
// Environment-based configuration
public class RegistryConfigFactory {
    public static Map<String, Object> createConfig(Environment env) {
        Map<String, Object> config = new HashMap<>();

        switch (env) {
            case DEVELOPMENT:
                config.put("schema.registry.url", "http://localhost:8081");
                config.put("schema.registry.compatibility.level", "NONE");
                break;
            case PRODUCTION:
                config.put("schema.registry.url", "https://prod-registry:8081");
                config.put("schema.registry.compatibility.level", "BACKWARD");
                // Add authentication and SSL configuration
                break;
        }

        return config;
    }
}
```