Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration
—
Comprehensive configuration options for Confluent Schema Registry connection, SSL settings, authentication, and schema management.
Essential configuration options for Schema Registry connectivity and schema management.
/**
* Schema Registry URL (Required)
* The URL of the Confluent Schema Registry to fetch/register schemas
*/
ConfigOption<String> URL = ConfigOptions.key("url")
.stringType()
.noDefaultValue()
.withFallbackKeys("schema-registry.url");
/**
* Schema Registry Subject
* Subject under which to register schemas during serialization
* Required for serialization, optional for deserialization
*/
ConfigOption<String> SUBJECT = ConfigOptions.key("subject")
.stringType()
.noDefaultValue()
.withFallbackKeys("schema-registry.subject");
/**
* Explicit Schema String
* Schema registered or to be registered in Schema Registry
* If not provided, Flink converts table schema to Avro schema
*/
ConfigOption<String> SCHEMA = ConfigOptions.key("schema")
.stringType()
.noDefaultValue()
.withFallbackKeys("schema-registry.schema");

SSL/TLS configuration for secure connections to Schema Registry.
/**
* SSL Keystore Location
* Path to SSL keystore file for client authentication
*/
ConfigOption<String> SSL_KEYSTORE_LOCATION = ConfigOptions.key("ssl.keystore.location")
.stringType()
.noDefaultValue();
/**
* SSL Keystore Password
* Password for SSL keystore
*/
ConfigOption<String> SSL_KEYSTORE_PASSWORD = ConfigOptions.key("ssl.keystore.password")
.stringType()
.noDefaultValue();
/**
* SSL Truststore Location
* Path to SSL truststore file for server certificate validation
*/
ConfigOption<String> SSL_TRUSTSTORE_LOCATION = ConfigOptions.key("ssl.truststore.location")
.stringType()
.noDefaultValue();
/**
* SSL Truststore Password
* Password for SSL truststore
*/
ConfigOption<String> SSL_TRUSTSTORE_PASSWORD = ConfigOptions.key("ssl.truststore.password")
.stringType()
.noDefaultValue();

HTTP Basic authentication configuration for Schema Registry access.
/**
* Basic Auth Credentials Source
* Source for basic authentication credentials
*/
ConfigOption<String> BASIC_AUTH_CREDENTIALS_SOURCE = ConfigOptions.key("basic-auth.credentials-source")
.stringType()
.noDefaultValue();
/**
* Basic Auth User Info
* User info for basic authentication (username:password format)
*/
ConfigOption<String> BASIC_AUTH_USER_INFO = ConfigOptions.key("basic-auth.user-info")
.stringType()
.noDefaultValue();

Bearer token authentication configuration for Schema Registry access.
/**
* Bearer Auth Credentials Source
* Source for bearer token credentials
*/
ConfigOption<String> BEARER_AUTH_CREDENTIALS_SOURCE = ConfigOptions.key("bearer-auth.credentials-source")
.stringType()
.noDefaultValue();
/**
* Bearer Auth Token
* Bearer token for authentication
*/
ConfigOption<String> BEARER_AUTH_TOKEN = ConfigOptions.key("bearer-auth.token")
.stringType()
.noDefaultValue();

Additional properties for fine-tuned Schema Registry client configuration.
/**
* Additional Properties Map
* Properties forwarded to underlying Schema Registry client
* Useful for options not officially exposed via Flink config
* Note: Flink options have higher precedence
*/
ConfigOption<Map<String, String>> PROPERTIES = ConfigOptions.key("properties")
.mapType()
.noDefaultValue();

CREATE TABLE user_events (
user_id BIGINT,
event_name STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://localhost:8081'
);

CREATE TABLE secure_events (
id BIGINT,
data STRING,
timestamp_col TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'secure-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro-confluent',
'avro-confluent.url' = 'https://schema-registry.example.com:8081',
'avro-confluent.ssl.keystore.location' = '/path/to/client.keystore.jks',
'avro-confluent.ssl.keystore.password' = 'keystorepass',
'avro-confluent.ssl.truststore.location' = '/path/to/client.truststore.jks',
'avro-confluent.ssl.truststore.password' = 'truststorepass'
);

CREATE TABLE authenticated_events (
id BIGINT,
message STRING,
created_at TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'auth-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry.example.com:8081',
'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
'avro-confluent.basic-auth.user-info' = 'username:password'
);

CREATE TABLE token_events (
id BIGINT,
payload STRING,
timestamp_col TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'token-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry.example.com:8081',
'avro-confluent.bearer-auth.credentials-source' = 'STATIC_TOKEN',
'avro-confluent.bearer-auth.token' = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...'
);

CREATE TABLE typed_events (
user_id BIGINT,
event_type STRING,
properties MAP<STRING, STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'typed-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://localhost:8081',
'avro-confluent.subject' = 'typed-events-value',
'avro-confluent.schema' = '{
"type": "record",
"name": "TypedEvent",
"fields": [
{"name": "user_id", "type": "long"},
{"name": "event_type", "type": "string"},
{"name": "properties", "type": {"type": "map", "values": "string"}}
]
}'
);

CREATE TABLE advanced_events (
id BIGINT,
data STRING,
timestamp_col TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'advanced-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://localhost:8081',
'avro-confluent.properties.schema.registry.request.timeout.ms' = '10000',
'avro-confluent.properties.schema.registry.connection.timeout.ms' = '5000',
'avro-confluent.properties.schema.registry.retry.backoff.ms' = '1000'
);

import org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory;
import org.apache.flink.configuration.Configuration;
// Build configuration programmatically
Configuration config = new Configuration();
config.setString(AvroConfluentFormatOptions.URL, "https://schema-registry.example.com:8081");
config.setString(AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION, "/path/to/keystore.jks");
config.setString(AvroConfluentFormatOptions.SSL_KEYSTORE_PASSWORD, "password");
// Convert to registry properties map
Map<String, String> registryConfigs = RegistryAvroFormatFactory.buildOptionalPropertiesMap(config);

The configuration options are translated to Schema Registry client properties:
// SSL Configuration Mapping
"schema.registry.ssl.keystore.location" -> SSL_KEYSTORE_LOCATION value
"schema.registry.ssl.keystore.password" -> SSL_KEYSTORE_PASSWORD value
"schema.registry.ssl.truststore.location" -> SSL_TRUSTSTORE_LOCATION value
"schema.registry.ssl.truststore.password" -> SSL_TRUSTSTORE_PASSWORD value
// Authentication Configuration Mapping
"basic.auth.credentials.source" -> BASIC_AUTH_CREDENTIALS_SOURCE value
"basic.auth.user.info" -> BASIC_AUTH_USER_INFO value
"bearer.auth.credentials.source" -> BEARER_AUTH_CREDENTIALS_SOURCE value
"bearer.auth.token" -> BEARER_AUTH_TOKEN value

Configuration options are resolved in the following priority order:
1. Direct Flink configuration options (e.g. ssl.keystore.location) — highest precedence
2. Entries supplied via the PROPERTIES option map

Example: If both avro-confluent.ssl.keystore.location and avro-confluent.properties.schema.registry.ssl.keystore.location are specified, the direct option takes precedence.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry