# Apache Flink SQL Avro Confluent Registry

Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration. The library serializes and deserializes Kafka messages against schemas managed centrally in the registry, and supports both the standard Avro format and the Debezium change-data-capture format.

## Package Information

- **Package Name**: org.apache.flink:flink-sql-avro-confluent-registry
- **Package Type**: Maven JAR
- **Language**: Java
- **Version**: 2.1.0
- **Installation**: Add the Maven dependency to `pom.xml`

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-avro-confluent-registry</artifactId>
  <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
```

For Debezium support:

```java
import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroFormatFactory;
```

## Basic Usage

### SQL Table Definition with Confluent Schema Registry

```sql
CREATE TABLE kafka_source (
  user_id BIGINT,
  user_name STRING,
  email STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081',
  'avro-confluent.subject' = 'user-topic-value'
);
```
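
The same format handles the write path. As an illustrative sketch (the sink topic and table names are hypothetical), a sink table can be declared with the same `avro-confluent` format and populated from the source:

```sql
CREATE TABLE kafka_sink (
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-output',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081'
);

INSERT INTO kafka_sink SELECT user_id, user_name FROM kafka_source;
```

On write, the format registers (or looks up) the schema in the registry and prefixes each message with its schema ID.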

### Programmatic Deserialization

```java
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Parse the Avro reader schema
String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";
Schema schema = new Schema.Parser().parse(schemaString);

// Create a deserializer that fetches writer schemas from the registry
ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forGeneric(
        schema,
        "http://localhost:8081"
    );
```

72

73

### Programmatic Serialization

74

75

```java
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.avro.generic.GenericRecord;

// Create a serializer that registers the schema under the given subject
ConfluentRegistryAvroSerializationSchema<GenericRecord> serializer =
    ConfluentRegistryAvroSerializationSchema.forGeneric(
        "user-topic-value",
        schema,
        "http://localhost:8081"
    );
```

## Architecture

The library is built around several key components:

- **Format Factories**: SPI-based factories (`RegistryAvroFormatFactory`, `DebeziumAvroFormatFactory`) that plug into Flink's Table API
- **Serialization Schemas**: type-safe serialization and deserialization classes supporting both generic and specific Avro records
- **Schema Registry Integration**: Confluent Schema Registry client integration with authentication and SSL support
- **Shaded Dependencies**: all external dependencies (Kafka client, Confluent client, Avro) are shaded to prevent classpath conflicts

## Capabilities

### Standard Avro Format

Core Avro serialization and deserialization with Confluent Schema Registry integration. Supports both generic records and generated specific record classes.

```java { .api }
// Format identifier for SQL DDL
String IDENTIFIER = "avro-confluent";

// Generic record deserialization
ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    Schema schema,
    String url
);

// Specific record deserialization
<T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
    Class<T> tClass,
    String url
);

// Generic record serialization
ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
    String subject,
    Schema schema,
    String schemaRegistryUrl
);

// Specific record serialization
<T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific(
    Class<T> tClass,
    String subject,
    String schemaRegistryUrl
);
```

[Standard Avro Format](./avro-format.md)

### Debezium Change Data Capture

Debezium Avro format support for change data capture scenarios, handling INSERT, UPDATE, and DELETE operations with before/after record states.

```java { .api }
// Format identifier for SQL DDL
String IDENTIFIER = "debezium-avro-confluent";

// Format factory for Debezium CDC support
DebeziumAvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory;
```
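
For example, a table reading a Debezium-produced topic can be declared with this format; the sketch below is illustrative (the topic name and columns are hypothetical), with the registry URL passed under the `debezium-avro-confluent.` option prefix:

```sql
CREATE TABLE user_changes (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver.inventory.users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```

Flink interprets the Debezium envelope's before/after states as retractions and insertions, so the table behaves as a changelog rather than an append-only stream.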

[Debezium Format](./debezium-format.md)

### Configuration Options

Comprehensive configuration options for Schema Registry connection, SSL, authentication, and schema management.

```java { .api }
ConfigOption<String> URL;                     // Required: Schema Registry URL
ConfigOption<String> SUBJECT;                 // Schema Registry subject name
ConfigOption<String> SCHEMA;                  // Optional: explicit schema string
ConfigOption<Map<String, String>> PROPERTIES; // Additional client properties
```
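
In SQL DDL these surface as format options under the `avro-confluent.` prefix. A sketch of a fragment for a registry secured with basic auth (the URL and credentials are placeholders, and the exact option keys should be checked against the Flink docs for your version):

```sql
'format' = 'avro-confluent',
'avro-confluent.url' = 'https://registry.example.com:8081',
'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
'avro-confluent.basic-auth.user-info' = 'user:password'
```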

[Configuration](./configuration.md)

## Types

```java { .api }
// Core configuration options class
@PublicEvolving
class AvroConfluentFormatOptions {
    ConfigOption<String> URL;
    ConfigOption<String> SUBJECT;
    ConfigOption<String> SCHEMA;
    ConfigOption<String> SSL_KEYSTORE_LOCATION;
    ConfigOption<String> SSL_KEYSTORE_PASSWORD;
    ConfigOption<String> SSL_TRUSTSTORE_LOCATION;
    ConfigOption<String> SSL_TRUSTSTORE_PASSWORD;
    ConfigOption<String> BASIC_AUTH_CREDENTIALS_SOURCE;
    ConfigOption<String> BASIC_AUTH_USER_INFO;
    ConfigOption<String> BEARER_AUTH_CREDENTIALS_SOURCE;
    ConfigOption<String> BEARER_AUTH_TOKEN;
    ConfigOption<Map<String, String>> PROPERTIES;
}

// Schema coder provider with caching support
class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
    CachedSchemaCoderProvider(String subject, String url, int identityMapCapacity, Map<String, ?> configs);
    SchemaCoder get();
}

// Schema coder for the Confluent wire protocol
class ConfluentSchemaRegistryCoder implements SchemaCoder {
    ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient client);
    ConfluentSchemaRegistryCoder(SchemaRegistryClient client);
    Schema readSchema(InputStream in) throws IOException;
    void writeSchema(Schema schema, OutputStream out) throws IOException;
}
```
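
The coder above speaks the Confluent wire format, in which every message is prefixed with a magic byte (`0`) and a 4-byte big-endian schema ID, followed by the Avro payload. A minimal, dependency-free sketch of that framing (the class and method names here are illustrative, not part of the library):

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {
    static final byte MAGIC_BYTE = 0;

    // Prefix an Avro payload with the Confluent wire-format header:
    // 1 magic byte + 4-byte big-endian schema ID + Avro-encoded bytes.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put(MAGIC_BYTE);
        buf.putInt(schemaId);  // ByteBuffer writes big-endian by default
        buf.put(avroPayload);
        return buf.array();
    }

    // Recover the schema ID from a framed message, validating the magic byte.
    static int readSchemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(framed.length);        // 1 + 4 + 3 = 8
        System.out.println(readSchemaId(framed)); // 42
    }
}
```

The schema ID is what `readSchema` resolves against the registry (with `CachedSchemaCoderProvider` avoiding repeated lookups), which is why consumers need registry access but never need the writer schema shipped in-band.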