or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

avro-format.mdconfiguration.mddebezium-format.mdindex.md

configuration.mddocs/

0

# Configuration Options

1

2

Comprehensive configuration options for Confluent Schema Registry connection, SSL settings, authentication, and schema management.

3

4

## Capabilities

5

6

### Core Configuration Options

7

8

Essential configuration options for Schema Registry connectivity and schema management.

9

10

```java { .api }

11

/**

12

* Schema Registry URL (Required)

13

* The URL of the Confluent Schema Registry to fetch/register schemas

14

*/

15

ConfigOption<String> URL = ConfigOptions.key("url")

16

.stringType()

17

.noDefaultValue()

18

.withFallbackKeys("schema-registry.url");

19

20

/**

21

* Schema Registry Subject

22

* Subject under which to register schemas during serialization

23

* Required for serialization, optional for deserialization

24

*/

25

ConfigOption<String> SUBJECT = ConfigOptions.key("subject")

26

.stringType()

27

.noDefaultValue()

28

.withFallbackKeys("schema-registry.subject");

29

30

/**

31

* Explicit Schema String

32

* Schema registered or to be registered in Schema Registry

33

* If not provided, Flink converts table schema to Avro schema

34

*/

35

ConfigOption<String> SCHEMA = ConfigOptions.key("schema")

36

.stringType()

37

.noDefaultValue()

38

.withFallbackKeys("schema-registry.schema");

39

```

40

41

### SSL Configuration Options

42

43

SSL/TLS configuration for secure connections to Schema Registry.

44

45

```java { .api }

46

/**

47

* SSL Keystore Location

48

* Path to SSL keystore file for client authentication

49

*/

50

ConfigOption<String> SSL_KEYSTORE_LOCATION = ConfigOptions.key("ssl.keystore.location")

51

.stringType()

52

.noDefaultValue();

53

54

/**

55

* SSL Keystore Password

56

* Password for SSL keystore

57

*/

58

ConfigOption<String> SSL_KEYSTORE_PASSWORD = ConfigOptions.key("ssl.keystore.password")

59

.stringType()

60

.noDefaultValue();

61

62

/**

63

* SSL Truststore Location

64

* Path to SSL truststore file for server certificate validation

65

*/

66

ConfigOption<String> SSL_TRUSTSTORE_LOCATION = ConfigOptions.key("ssl.truststore.location")

67

.stringType()

68

.noDefaultValue();

69

70

/**

71

* SSL Truststore Password

72

* Password for SSL truststore

73

*/

74

ConfigOption<String> SSL_TRUSTSTORE_PASSWORD = ConfigOptions.key("ssl.truststore.password")

75

.stringType()

76

.noDefaultValue();

77

```

78

79

### Basic Authentication Options

80

81

HTTP Basic authentication configuration for Schema Registry access.

82

83

```java { .api }

84

/**

85

* Basic Auth Credentials Source

86

* Source for basic authentication credentials

87

*/

88

ConfigOption<String> BASIC_AUTH_CREDENTIALS_SOURCE = ConfigOptions.key("basic-auth.credentials-source")

89

.stringType()

90

.noDefaultValue();

91

92

/**

93

* Basic Auth User Info

94

* User info for basic authentication (username:password format)

95

*/

96

ConfigOption<String> BASIC_AUTH_USER_INFO = ConfigOptions.key("basic-auth.user-info")

97

.stringType()

98

.noDefaultValue();

99

```

100

101

### Bearer Token Authentication Options

102

103

Bearer token authentication configuration for Schema Registry access.

104

105

```java { .api }

106

/**

107

* Bearer Auth Credentials Source

108

* Source for bearer token credentials

109

*/

110

ConfigOption<String> BEARER_AUTH_CREDENTIALS_SOURCE = ConfigOptions.key("bearer-auth.credentials-source")

111

.stringType()

112

.noDefaultValue();

113

114

/**

115

* Bearer Auth Token

116

* Bearer token for authentication

117

*/

118

ConfigOption<String> BEARER_AUTH_TOKEN = ConfigOptions.key("bearer-auth.token")

119

.stringType()

120

.noDefaultValue();

121

```

122

123

### Advanced Configuration Options

124

125

Additional properties for fine-tuned Schema Registry client configuration.

126

127

```java { .api }

128

/**

129

* Additional Properties Map

130

* Properties forwarded to underlying Schema Registry client

131

* Useful for options not officially exposed via Flink config

132

* Note: Flink options have higher precedence

133

*/

134

ConfigOption<Map<String, String>> PROPERTIES = ConfigOptions.key("properties")

135

.mapType()

136

.noDefaultValue();

137

```

138

139

## SQL Configuration Examples

140

141

### Basic Configuration

142

143

```sql

144

CREATE TABLE user_events (

145

user_id BIGINT,

146

event_name STRING,

147

event_time TIMESTAMP(3)

148

) WITH (

149

'connector' = 'kafka',

150

'topic' = 'user-events',

151

'properties.bootstrap.servers' = 'localhost:9092',

152

'format' = 'avro-confluent',

153

'avro-confluent.url' = 'http://localhost:8081'

154

);

155

```

156

157

### SSL Configuration

158

159

```sql

160

CREATE TABLE secure_events (

161

id BIGINT,

162

data STRING,

163

timestamp_col TIMESTAMP(3)

164

) WITH (

165

'connector' = 'kafka',

166

'topic' = 'secure-events',

167

'properties.bootstrap.servers' = 'localhost:9092',

168

'format' = 'avro-confluent',

169

'avro-confluent.url' = 'https://schema-registry.example.com:8081',

170

'avro-confluent.ssl.keystore.location' = '/path/to/client.keystore.jks',

171

'avro-confluent.ssl.keystore.password' = 'keystorepass',

172

'avro-confluent.ssl.truststore.location' = '/path/to/client.truststore.jks',

173

'avro-confluent.ssl.truststore.password' = 'truststorepass'

174

);

175

```

176

177

### Basic Authentication

178

179

```sql

180

CREATE TABLE authenticated_events (

181

id BIGINT,

182

message STRING,

183

created_at TIMESTAMP(3)

184

) WITH (

185

'connector' = 'kafka',

186

'topic' = 'auth-events',

187

'properties.bootstrap.servers' = 'localhost:9092',

188

'format' = 'avro-confluent',

189

'avro-confluent.url' = 'http://schema-registry.example.com:8081',

190

'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',

191

'avro-confluent.basic-auth.user-info' = 'username:password'

192

);

193

```

194

195

### Bearer Token Authentication

196

197

```sql

198

CREATE TABLE token_events (

199

id BIGINT,

200

payload STRING,

201

timestamp_col TIMESTAMP(3)

202

) WITH (

203

'connector' = 'kafka',

204

'topic' = 'token-events',

205

'properties.bootstrap.servers' = 'localhost:9092',

206

'format' = 'avro-confluent',

207

'avro-confluent.url' = 'http://schema-registry.example.com:8081',

208

'avro-confluent.bearer-auth.credentials-source' = 'STATIC_TOKEN',

209

'avro-confluent.bearer-auth.token' = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...'

210

);

211

```

212

213

### Explicit Schema Configuration

214

215

```sql

216

CREATE TABLE typed_events (

217

user_id BIGINT,

218

event_type STRING,

219

properties MAP<STRING, STRING>

220

) WITH (

221

'connector' = 'kafka',

222

'topic' = 'typed-events',

223

'properties.bootstrap.servers' = 'localhost:9092',

224

'format' = 'avro-confluent',

225

'avro-confluent.url' = 'http://localhost:8081',

226

'avro-confluent.subject' = 'typed-events-value',

227

'avro-confluent.schema' = '{

228

"type": "record",

229

"name": "TypedEvent",

230

"fields": [

231

{"name": "user_id", "type": "long"},

232

{"name": "event_type", "type": "string"},

233

{"name": "properties", "type": {"type": "map", "values": "string"}}

234

]

235

}'

236

);

237

```

238

239

### Advanced Properties Configuration

240

241

```sql

242

CREATE TABLE advanced_events (

243

id BIGINT,

244

data STRING,

245

timestamp_col TIMESTAMP(3)

246

) WITH (

247

'connector' = 'kafka',

248

'topic' = 'advanced-events',

249

'properties.bootstrap.servers' = 'localhost:9092',

250

'format' = 'avro-confluent',

251

'avro-confluent.url' = 'http://localhost:8081',

252

'avro-confluent.properties.schema.registry.request.timeout.ms' = '10000',

253

'avro-confluent.properties.schema.registry.connection.timeout.ms' = '5000',

254

'avro-confluent.properties.schema.registry.retry.backoff.ms' = '1000'

255

);

256

```

257

258

## Programmatic Configuration

259

260

### Registry Configuration Map Building

261

262

```java

263

import org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory;

264

import org.apache.flink.configuration.Configuration;

265

266

// Build configuration programmatically

267

Configuration config = new Configuration();

268

config.setString(AvroConfluentFormatOptions.URL, "https://schema-registry.example.com:8081");

269

config.setString(AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION, "/path/to/keystore.jks");

270

config.setString(AvroConfluentFormatOptions.SSL_KEYSTORE_PASSWORD, "password");

271

272

// Convert to registry properties map

273

Map<String, String> registryConfigs = RegistryAvroFormatFactory.buildOptionalPropertiesMap(config);

274

```

275

276

### Schema Registry Client Configuration

277

278

The configuration options are translated to Schema Registry client properties:

279

280

```java

281

// SSL Configuration Mapping

282

"schema.registry.ssl.keystore.location" -> SSL_KEYSTORE_LOCATION value

283

"schema.registry.ssl.keystore.password" -> SSL_KEYSTORE_PASSWORD value

284

"schema.registry.ssl.truststore.location" -> SSL_TRUSTSTORE_LOCATION value

285

"schema.registry.ssl.truststore.password" -> SSL_TRUSTSTORE_PASSWORD value

286

287

// Authentication Configuration Mapping

288

"basic.auth.credentials.source" -> BASIC_AUTH_CREDENTIALS_SOURCE value

289

"basic.auth.user.info" -> BASIC_AUTH_USER_INFO value

290

"bearer.auth.credentials.source" -> BEARER_AUTH_CREDENTIALS_SOURCE value

291

"bearer.auth.token" -> BEARER_AUTH_TOKEN value

292

```

293

294

## Configuration Priority

295

296

Configuration options are resolved in the following priority order:

297

298

1. **Direct Flink config options** (highest priority)

299

2. **Properties map entries** via `PROPERTIES` option

300

3. **Default values** (if specified)

301

302

Example: If both `avro-confluent.ssl.keystore.location` and `avro-confluent.properties.schema.registry.ssl.keystore.location` are specified, the direct option takes precedence.