or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

avro-format.md, configuration.md, debezium-format.md, index.md

docs/avro-format.md

0

# Standard Avro Format

1

2

Core Avro serialization and deserialization functionality with Confluent Schema Registry integration. Supports both generic records and generated specific record classes with comprehensive configuration options.

3

4

## Capabilities

5

6

### Generic Record Deserialization

7

8

Creates deserializers for generic Avro records. Records are produced using the provided reader schema, while the writer schema of each message is looked up in the Confluent Schema Registry.

9

10

```java { .api }

11

/**

12

* Creates deserializer for GenericRecord using provided reader schema

13

* @param schema Reader schema for produced records

14

* @param url Schema Registry URL

15

* @return Deserializer instance

16

*/

17

static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(

18

Schema schema,

19

String url

20

);

21

22

/**

23

* Creates deserializer with custom cache capacity

24

* @param schema Reader schema for produced records

25

* @param url Schema Registry URL

26

* @param identityMapCapacity Maximum cached schema versions (default: 1000)

27

* @return Deserializer instance

28

*/

29

static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(

30

Schema schema,

31

String url,

32

int identityMapCapacity

33

);

34

35

/**

36

* Creates deserializer with additional registry configurations

37

* @param schema Reader schema for produced records

38

* @param url Schema Registry URL

39

* @param registryConfigs Additional Schema Registry configs (SSL, auth)

40

* @return Deserializer instance

41

*/

42

static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(

43

Schema schema,

44

String url,

45

@Nullable Map<String, ?> registryConfigs

46

);

47

48

/**

49

* Creates deserializer with full configuration options

50

* @param schema Reader schema for produced records

51

* @param url Schema Registry URL

52

* @param identityMapCapacity Maximum cached schema versions

53

* @param registryConfigs Additional Schema Registry configs

54

* @return Deserializer instance

55

*/

56

static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(

57

Schema schema,

58

String url,

59

int identityMapCapacity,

60

@Nullable Map<String, ?> registryConfigs

61

);

62

```

63

64

**Usage Example:**

65

66

```java

67

import org.apache.avro.Schema;

68

import org.apache.avro.generic.GenericRecord;

69

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

70

71

// Parse reader schema

72

String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";

73

Schema readerSchema = new Schema.Parser().parse(schemaString);

74

75

// Create deserializer with SSL configuration

76

Map<String, String> registryConfigs = new HashMap<>();

77

registryConfigs.put("schema.registry.ssl.keystore.location", "/path/to/keystore.jks");

78

registryConfigs.put("schema.registry.ssl.keystore.password", "password");

79

80

ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =

81

ConfluentRegistryAvroDeserializationSchema.forGeneric(

82

readerSchema,

83

"https://schema-registry.example.com",

84

1000,

85

registryConfigs

86

);

87

```

88

89

### Specific Record Deserialization

90

91

Creates deserializers for Avro-generated specific record classes.

92

93

```java { .api }

94

/**

95

* Creates deserializer for specific record class

96

* @param tClass Generated Avro record class

97

* @param url Schema Registry URL

98

* @return Deserializer instance

99

*/

100

static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(

101

Class<T> tClass,

102

String url

103

);

104

105

/**

106

* Creates deserializer with custom cache capacity

107

* @param tClass Generated Avro record class

108

* @param url Schema Registry URL

109

* @param identityMapCapacity Maximum cached schema versions

110

* @return Deserializer instance

111

*/

112

static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(

113

Class<T> tClass,

114

String url,

115

int identityMapCapacity

116

);

117

118

/**

119

* Creates deserializer with additional registry configurations

120

* @param tClass Generated Avro record class

121

* @param url Schema Registry URL

122

* @param registryConfigs Additional Schema Registry configs

123

* @return Deserializer instance

124

*/

125

static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(

126

Class<T> tClass,

127

String url,

128

@Nullable Map<String, ?> registryConfigs

129

);

130

131

/**

132

* Creates deserializer with full configuration options

133

* @param tClass Generated Avro record class

134

* @param url Schema Registry URL

135

* @param identityMapCapacity Maximum cached schema versions

136

* @param registryConfigs Additional Schema Registry configs

137

* @return Deserializer instance

138

*/

139

static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(

140

Class<T> tClass,

141

String url,

142

int identityMapCapacity,

143

@Nullable Map<String, ?> registryConfigs

144

);

145

```

146

147

**Usage Example:**

148

149

```java

150

import com.example.avro.User; // Generated Avro class

151

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

152

153

// Create deserializer for specific record class

154

ConfluentRegistryAvroDeserializationSchema<User> deserializer =

155

ConfluentRegistryAvroDeserializationSchema.forSpecific(

156

User.class,

157

"https://schema-registry.example.com"

158

);

159

```

160

161

### Generic Record Serialization

162

163

Creates serializers for generic Avro records with schema registration to Confluent Schema Registry.

164

165

```java { .api }

166

/**

167

* Creates serializer for GenericRecord

168

* @param subject Schema Registry subject name

169

* @param schema Writer schema for serialization

170

* @param schemaRegistryUrl Schema Registry URL

171

* @return Serializer instance

172

*/

173

static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(

174

String subject,

175

Schema schema,

176

String schemaRegistryUrl

177

);

178

179

/**

180

* Creates serializer with additional registry configurations

181

* @param subject Schema Registry subject name

182

* @param schema Writer schema for serialization

183

* @param schemaRegistryUrl Schema Registry URL

184

* @param registryConfigs Additional Schema Registry configs

185

* @return Serializer instance

186

*/

187

static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(

188

String subject,

189

Schema schema,

190

String schemaRegistryUrl,

191

@Nullable Map<String, ?> registryConfigs

192

);

193

```

194

195

**Usage Example:**

196

197

```java

198

import org.apache.avro.Schema;

199

import org.apache.avro.generic.GenericRecord;

200

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

201

202

// Parse writer schema

203

String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";

204

Schema writerSchema = new Schema.Parser().parse(schemaString);

205

206

// Create serializer

207

ConfluentRegistryAvroSerializationSchema<GenericRecord> serializer =

208

ConfluentRegistryAvroSerializationSchema.forGeneric(

209

"user-topic-value",

210

writerSchema,

211

"https://schema-registry.example.com"

212

);

213

```

214

215

### Specific Record Serialization

216

217

Creates serializers for Avro-generated specific record classes.

218

219

```java { .api }

220

/**

221

* Creates serializer for specific record class

222

* @param tClass Generated Avro record class

223

* @param subject Schema Registry subject name

224

* @param schemaRegistryUrl Schema Registry URL

225

* @return Serializer instance

226

*/

227

static <T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific(

228

Class<T> tClass,

229

String subject,

230

String schemaRegistryUrl

231

);

232

233

/**

234

* Creates serializer with additional registry configurations

235

* @param tClass Generated Avro record class

236

* @param subject Schema Registry subject name

237

* @param schemaRegistryUrl Schema Registry URL

238

* @param registryConfigs Additional Schema Registry configs

239

* @return Serializer instance

240

*/

241

static <T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific(

242

Class<T> tClass,

243

String subject,

244

String schemaRegistryUrl,

245

@Nullable Map<String, ?> registryConfigs

246

);

247

```

248

249

**Usage Example:**

250

251

```java

252

import com.example.avro.User; // Generated Avro class

253

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

254

255

// Create serializer for specific record class

256

ConfluentRegistryAvroSerializationSchema<User> serializer =

257

ConfluentRegistryAvroSerializationSchema.forSpecific(

258

User.class,

259

"user-topic-value",

260

"https://schema-registry.example.com"

261

);

262

```

263

264

### Table Format Factory

265

266

Factory that creates the runtime decoding and encoding formats used by Flink's Table API and SQL.

267

268

```java { .api }

269

/**

270

* Format factory identifier for SQL DDL

271

*/

272

String IDENTIFIER = "avro-confluent";

273

274

/**

275

* Creates decoding format for table sources

276

* @param context Table factory context

277

* @param formatOptions Configuration options

278

* @return Decoding format instance

279

*/

280

DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(

281

DynamicTableFactory.Context context,

282

ReadableConfig formatOptions

283

);

284

285

/**

286

* Creates encoding format for table sinks

287

* @param context Table factory context

288

* @param formatOptions Configuration options

289

* @return Encoding format instance

290

*/

291

EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(

292

DynamicTableFactory.Context context,

293

ReadableConfig formatOptions

294

);

295

296

/**

297

* Returns set of required configuration options

298

* @return Required options (URL)

299

*/

300

Set<ConfigOption<?>> requiredOptions();

301

302

/**

303

* Returns set of optional configuration options

304

* @return Optional options (SUBJECT, SCHEMA, SSL, auth options)

305

*/

306

Set<ConfigOption<?>> optionalOptions();

307

```

308

309

## SQL Table Integration

310

311

The format can be used directly in Flink SQL DDL:

312

313

```sql

314

-- Source table with explicit schema

315

CREATE TABLE user_events (

316

user_id BIGINT,

317

event_name STRING,

318

event_time TIMESTAMP(3),

319

properties MAP<STRING, STRING>

320

) WITH (

321

'connector' = 'kafka',

322

'topic' = 'user-events',

323

'properties.bootstrap.servers' = 'localhost:9092',

324

'format' = 'avro-confluent',

325

'avro-confluent.url' = 'http://localhost:8081',

326

'avro-confluent.subject' = 'user-events-value',

327

'avro-confluent.schema' = '{"type":"record","name":"UserEvent","fields":[{"name":"user_id","type":"long"},{"name":"event_name","type":"string"},{"name":"event_time","type":"long"},{"name":"properties","type":{"type":"map","values":"string"}}]}'

328

);

329

330

-- Sink table requiring subject for schema registration

331

CREATE TABLE processed_events (

332

user_id BIGINT,

333

event_count BIGINT,

334

last_event_time TIMESTAMP(3)

335

) WITH (

336

'connector' = 'kafka',

337

'topic' = 'processed-events',

338

'properties.bootstrap.servers' = 'localhost:9092',

339

'format' = 'avro-confluent',

340

'avro-confluent.url' = 'http://localhost:8081',

341

'avro-confluent.subject' = 'processed-events-value'

342

);

343

```