# Configuration Options

Comprehensive configuration options for Confluent Schema Registry connection, SSL settings, authentication, and schema management.

## Capabilities

### Core Configuration Options

Essential configuration options for Schema Registry connectivity and schema management.

```java { .api }
/**
 * Schema Registry URL (Required)
 * The URL of the Confluent Schema Registry to fetch/register schemas
 */
ConfigOption<String> URL = ConfigOptions.key("url")
    .stringType()
    .noDefaultValue()
    .withFallbackKeys("schema-registry.url");

/**
 * Schema Registry Subject
 * Subject under which to register schemas during serialization
 * Required for serialization, optional for deserialization
 */
ConfigOption<String> SUBJECT = ConfigOptions.key("subject")
    .stringType()
    .noDefaultValue()
    .withFallbackKeys("schema-registry.subject");

/**
 * Explicit Schema String
 * Schema registered or to be registered in Schema Registry
 * If not provided, Flink converts table schema to Avro schema
 */
ConfigOption<String> SCHEMA = ConfigOptions.key("schema")
    .stringType()
    .noDefaultValue()
    .withFallbackKeys("schema-registry.schema");
```
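
The `withFallbackKeys` declarations above mean each option can be supplied under its primary key or, failing that, under the older `schema-registry.*` key. The lookup order can be sketched in plain Java as follows. This is a minimal model of the behaviour, not Flink's implementation: `FallbackLookup` and `resolve` are hypothetical names, and the sketch assumes the primary key wins when both keys are set.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper modelling primary-key-then-fallback lookup. */
public class FallbackLookup {

    /** Returns the value under the primary key, else under the first fallback key that is set. */
    public static Optional<String> resolve(
            Map<String, String> options, String key, String... fallbackKeys) {
        if (options.containsKey(key)) {
            return Optional.ofNullable(options.get(key));
        }
        for (String fallback : fallbackKeys) {
            if (options.containsKey(fallback)) {
                return Optional.ofNullable(options.get(fallback));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("schema-registry.url", "http://localhost:8081");

        // Only the fallback key is set, so its value is returned.
        System.out.println(resolve(options, "url", "schema-registry.url").orElse("<unset>"));

        // Neither key is set for the subject, so the result is empty.
        System.out.println(resolve(options, "subject", "schema-registry.subject").isPresent());
    }
}
```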

### SSL Configuration Options

SSL/TLS configuration for secure connections to Schema Registry.

```java { .api }
/**
 * SSL Keystore Location
 * Path to SSL keystore file for client authentication
 */
ConfigOption<String> SSL_KEYSTORE_LOCATION = ConfigOptions.key("ssl.keystore.location")
    .stringType()
    .noDefaultValue();

/**
 * SSL Keystore Password
 * Password for SSL keystore
 */
ConfigOption<String> SSL_KEYSTORE_PASSWORD = ConfigOptions.key("ssl.keystore.password")
    .stringType()
    .noDefaultValue();

/**
 * SSL Truststore Location
 * Path to SSL truststore file for server certificate validation
 */
ConfigOption<String> SSL_TRUSTSTORE_LOCATION = ConfigOptions.key("ssl.truststore.location")
    .stringType()
    .noDefaultValue();

/**
 * SSL Truststore Password
 * Password for SSL truststore
 */
ConfigOption<String> SSL_TRUSTSTORE_PASSWORD = ConfigOptions.key("ssl.truststore.password")
    .stringType()
    .noDefaultValue();
```

### Basic Authentication Options

HTTP Basic authentication configuration for Schema Registry access.

```java { .api }
/**
 * Basic Auth Credentials Source
 * Source for basic authentication credentials
 */
ConfigOption<String> BASIC_AUTH_CREDENTIALS_SOURCE = ConfigOptions.key("basic-auth.credentials-source")
    .stringType()
    .noDefaultValue();

/**
 * Basic Auth User Info
 * User info for basic authentication (username:password format)
 */
ConfigOption<String> BASIC_AUTH_USER_INFO = ConfigOptions.key("basic-auth.user-info")
    .stringType()
    .noDefaultValue();
```

### Bearer Token Authentication Options

Bearer token authentication configuration for Schema Registry access.

```java { .api }
/**
 * Bearer Auth Credentials Source
 * Source for bearer token credentials
 */
ConfigOption<String> BEARER_AUTH_CREDENTIALS_SOURCE = ConfigOptions.key("bearer-auth.credentials-source")
    .stringType()
    .noDefaultValue();

/**
 * Bearer Auth Token
 * Bearer token for authentication
 */
ConfigOption<String> BEARER_AUTH_TOKEN = ConfigOptions.key("bearer-auth.token")
    .stringType()
    .noDefaultValue();
```

### Advanced Configuration Options

Additional properties for fine-tuned Schema Registry client configuration.

```java { .api }
/**
 * Additional Properties Map
 * Properties forwarded to underlying Schema Registry client
 * Useful for options not officially exposed via Flink config
 * Note: Flink options have higher precedence
 */
ConfigOption<Map<String, String>> PROPERTIES = ConfigOptions.key("properties")
    .mapType()
    .noDefaultValue();
```

## SQL Configuration Examples

### Basic Configuration

```sql
CREATE TABLE user_events (
  user_id BIGINT,
  event_name STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081'
);
```

### SSL Configuration

```sql
CREATE TABLE secure_events (
  id BIGINT,
  data STRING,
  timestamp_col TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'secure-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'https://schema-registry.example.com:8081',
  'avro-confluent.ssl.keystore.location' = '/path/to/client.keystore.jks',
  'avro-confluent.ssl.keystore.password' = 'keystorepass',
  'avro-confluent.ssl.truststore.location' = '/path/to/client.truststore.jks',
  'avro-confluent.ssl.truststore.password' = 'truststorepass'
);
```

### Basic Authentication

```sql
CREATE TABLE authenticated_events (
  id BIGINT,
  message STRING,
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'auth-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://schema-registry.example.com:8081',
  'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'avro-confluent.basic-auth.user-info' = 'username:password'
);
```
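
For orientation, the `username:password` value in the example is the credential pair that HTTP Basic authentication transmits. The sketch below shows the standard Basic encoding (Base64 of the user info, prefixed with `Basic `) in plain Java. It illustrates the wire format only and is not the Schema Registry client's own code; `BasicAuthHeader` and `headerValue` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper showing standard HTTP Basic encoding of a user-info string. */
public class BasicAuthHeader {

    /** Builds the Authorization header value for a "username:password" string. */
    public static String headerValue(String userInfo) {
        if (userInfo == null || userInfo.indexOf(':') < 0) {
            throw new IllegalArgumentException("user info must use the username:password format");
        }
        String encoded =
                Base64.getEncoder().encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // prints "Basic dXNlcm5hbWU6cGFzc3dvcmQ="
        System.out.println(headerValue("username:password"));
    }
}
```

Because Base64 is an encoding rather than encryption, pairing Basic authentication with an `https://` registry URL is usually advisable.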

### Bearer Token Authentication

```sql
CREATE TABLE token_events (
  id BIGINT,
  payload STRING,
  timestamp_col TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'token-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://schema-registry.example.com:8081',
  'avro-confluent.bearer-auth.credentials-source' = 'STATIC_TOKEN',
  'avro-confluent.bearer-auth.token' = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...'
);
```

### Explicit Schema Configuration

```sql
CREATE TABLE typed_events (
  user_id BIGINT,
  event_type STRING,
  properties MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'typed-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081',
  'avro-confluent.subject' = 'typed-events-value',
  'avro-confluent.schema' = '{
    "type": "record",
    "name": "TypedEvent",
    "fields": [
      {"name": "user_id", "type": "long"},
      {"name": "event_type", "type": "string"},
      {"name": "properties", "type": {"type": "map", "values": "string"}}
    ]
  }'
);
```

### Advanced Properties Configuration

```sql
CREATE TABLE advanced_events (
  id BIGINT,
  data STRING,
  timestamp_col TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'advanced-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081',
  'avro-confluent.properties.schema.registry.request.timeout.ms' = '10000',
  'avro-confluent.properties.schema.registry.connection.timeout.ms' = '5000',
  'avro-confluent.properties.schema.registry.retry.backoff.ms' = '1000'
);
```
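
Each `avro-confluent.properties.*` entry in the example becomes one entry of the `PROPERTIES` map, keyed by whatever follows the `properties.` prefix. The prefix-stripping step can be sketched in plain Java as below. The sketch assumes the `avro-confluent.` format prefix has already been removed before the format sees its options; `PrefixedProperties` and `extract` are hypothetical names, and this is not Flink's own parsing code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that collects prefixed options into a map of client properties. */
public class PrefixedProperties {

    /** Returns every entry whose key starts with the prefix, with the prefix removed. */
    public static Map<String, String> extract(Map<String, String> formatOptions, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : formatOptions.entrySet()) {
            String key = entry.getKey();
            // Skip unrelated options and a bare prefix with nothing after it.
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> formatOptions = new LinkedHashMap<>();
        formatOptions.put("url", "http://localhost:8081");
        formatOptions.put("properties.schema.registry.request.timeout.ms", "10000");
        formatOptions.put("properties.schema.registry.retry.backoff.ms", "1000");

        // Only the two prefixed entries survive, keyed without "properties.".
        System.out.println(extract(formatOptions, "properties."));
    }
}
```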

## Programmatic Configuration

### Registry Configuration Map Building

```java
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions;
import org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory;

// Build configuration programmatically
Configuration config = new Configuration();
config.set(AvroConfluentFormatOptions.URL, "https://schema-registry.example.com:8081");
config.set(AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION, "/path/to/keystore.jks");
config.set(AvroConfluentFormatOptions.SSL_KEYSTORE_PASSWORD, "password");

// Convert to registry properties map
Map<String, String> registryConfigs = RegistryAvroFormatFactory.buildOptionalPropertiesMap(config);
```

### Schema Registry Client Configuration

The configuration options are translated to Schema Registry client properties:

```java
// SSL Configuration Mapping
"schema.registry.ssl.keystore.location" -> SSL_KEYSTORE_LOCATION value
"schema.registry.ssl.keystore.password" -> SSL_KEYSTORE_PASSWORD value
"schema.registry.ssl.truststore.location" -> SSL_TRUSTSTORE_LOCATION value
"schema.registry.ssl.truststore.password" -> SSL_TRUSTSTORE_PASSWORD value

// Authentication Configuration Mapping
"basic.auth.credentials.source" -> BASIC_AUTH_CREDENTIALS_SOURCE value
"basic.auth.user.info" -> BASIC_AUTH_USER_INFO value
"bearer.auth.credentials.source" -> BEARER_AUTH_CREDENTIALS_SOURCE value
"bearer.auth.token" -> BEARER_AUTH_TOKEN value
```

## Configuration Priority

Configuration options are resolved in the following priority order:

1. **Direct Flink config options** (highest priority)
2. **Properties map entries** via `PROPERTIES` option
3. **Default values** (if specified)

Example: If both `avro-confluent.ssl.keystore.location` and `avro-confluent.properties.schema.registry.ssl.keystore.location` are specified, the direct option takes precedence.
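
One way to picture this precedence is as a two-step merge: copy the `properties` map first, then write the dedicated options on top so that they overwrite any clashing key. The plain-Java sketch below models that order under this assumption. `RegistryPropertiesMerge` and `merge` are hypothetical names, and only the keystore location is wired up for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of "direct option beats properties map" precedence. */
public class RegistryPropertiesMerge {

    /** Merges the free-form properties with the dedicated keystore-location option. */
    public static Map<String, String> merge(
            Map<String, String> properties, String sslKeystoreLocation) {
        Map<String, String> merged = new HashMap<>();
        // Step 1: lowest priority, entries from the properties map.
        if (properties != null) {
            merged.putAll(properties);
        }
        // Step 2: the dedicated option overwrites a clashing properties entry.
        if (sslKeystoreLocation != null) {
            merged.put("schema.registry.ssl.keystore.location", sslKeystoreLocation);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("schema.registry.ssl.keystore.location", "/from/properties.jks");

        Map<String, String> merged = merge(properties, "/from/direct-option.jks");
        // The direct option wins: prints "/from/direct-option.jks"
        System.out.println(merged.get("schema.registry.ssl.keystore.location"));
    }
}
```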