tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry

Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration


Configuration Options

Configuration options for the Confluent Schema Registry format: registry connection, SSL settings, authentication, and schema management.

Capabilities

Core Configuration Options

Essential configuration options for Schema Registry connectivity and schema management.

/**
 * Schema Registry URL (Required)
 * The URL of the Confluent Schema Registry to fetch/register schemas
 */
ConfigOption<String> URL = ConfigOptions.key("url")
    .stringType()
    .noDefaultValue()
    .withFallbackKeys("schema-registry.url");

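/**
 * Schema Registry Subject
 * Subject under which to register schemas during serialization.
 * Required when the format is used in a sink; not needed for deserialization.
 * The 'kafka' and 'upsert-kafka' connectors default it to
 * '<topic_name>-value' or '<topic_name>-key'.
 */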
ConfigOption<String> SUBJECT = ConfigOptions.key("subject")
    .stringType()
    .noDefaultValue()
    .withFallbackKeys("schema-registry.subject");

/**
 * Explicit Schema String
 * Schema registered or to be registered in Schema Registry.
 * If not provided, Flink converts the table schema to an Avro schema.
 * A provided schema must match the table schema.
 */
ConfigOption<String> SCHEMA = ConfigOptions.key("schema")
    .stringType()
    .noDefaultValue()
    .withFallbackKeys("schema-registry.schema");
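
The fallback keys declared above mean that a value set under the older `schema-registry.*` name is still picked up when the primary key is absent. The sketch below illustrates that lookup order with plain maps. `FallbackLookup` is a hypothetical helper written for this page, not Flink's implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration; not part of Flink.
final class FallbackLookup {
    // Returns the value for the primary key if present, otherwise the value of
    // the first fallback key that is present, checked in the order given.
    static Optional<String> resolve(
            Map<String, String> options, String key, List<String> fallbackKeys) {
        if (options.containsKey(key)) {
            return Optional.ofNullable(options.get(key));
        }
        for (String fallback : fallbackKeys) {
            if (options.containsKey(fallback)) {
                return Optional.ofNullable(options.get(fallback));
            }
        }
        return Optional.empty();
    }
}
```

For example, `FallbackLookup.resolve(options, "url", List.of("schema-registry.url"))` prefers `url` and only then consults `schema-registry.url`.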

SSL Configuration Options

SSL/TLS configuration for secure connections to Schema Registry.

/**
 * SSL Keystore Location
 * Path to SSL keystore file for client authentication
 */
ConfigOption<String> SSL_KEYSTORE_LOCATION = ConfigOptions.key("ssl.keystore.location")
    .stringType()
    .noDefaultValue();

/**
 * SSL Keystore Password
 * Password for SSL keystore
 */
ConfigOption<String> SSL_KEYSTORE_PASSWORD = ConfigOptions.key("ssl.keystore.password")
    .stringType()
    .noDefaultValue();

/**
 * SSL Truststore Location  
 * Path to SSL truststore file for server certificate validation
 */
ConfigOption<String> SSL_TRUSTSTORE_LOCATION = ConfigOptions.key("ssl.truststore.location")
    .stringType()
    .noDefaultValue();

/**
 * SSL Truststore Password
 * Password for SSL truststore
 */
ConfigOption<String> SSL_TRUSTSTORE_PASSWORD = ConfigOptions.key("ssl.truststore.password")
    .stringType()
    .noDefaultValue();

Basic Authentication Options

HTTP Basic authentication configuration for Schema Registry access.

/**
 * Basic Auth Credentials Source
 * Source for basic authentication credentials
 */
ConfigOption<String> BASIC_AUTH_CREDENTIALS_SOURCE = ConfigOptions.key("basic-auth.credentials-source")
    .stringType()
    .noDefaultValue();

/**
 * Basic Auth User Info
 * User info for basic authentication (username:password format)
 */
ConfigOption<String> BASIC_AUTH_USER_INFO = ConfigOptions.key("basic-auth.user-info")
    .stringType()
    .noDefaultValue();
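
The `username:password` format of the user-info value is the same shape that HTTP Basic authentication (RFC 7617) base64-encodes into an `Authorization` header. The sketch below shows that encoding, assuming the registry client builds the header this way internally. `BasicAuthHeader` is a hypothetical name, not a Flink or Confluent class.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration; not part of Flink or the Schema Registry client.
final class BasicAuthHeader {
    // Encodes "username:password" as the value of an HTTP Authorization header.
    static String of(String userInfo) {
        if (userInfo.indexOf(':') < 0) {
            throw new IllegalArgumentException("expected username:password");
        }
        String encoded =
                Base64.getEncoder().encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }
}
```

Because base64 is an encoding and not encryption, this value is only protected in transit when the registry URL uses `https`.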

Bearer Token Authentication Options

Bearer token authentication configuration for Schema Registry access.

/**
 * Bearer Auth Credentials Source
 * Source for bearer token credentials
 */
ConfigOption<String> BEARER_AUTH_CREDENTIALS_SOURCE = ConfigOptions.key("bearer-auth.credentials-source")
    .stringType()
    .noDefaultValue();

/**
 * Bearer Auth Token
 * Bearer token for authentication
 */
ConfigOption<String> BEARER_AUTH_TOKEN = ConfigOptions.key("bearer-auth.token")
    .stringType()
    .noDefaultValue();

Advanced Configuration Options

Additional properties for fine-tuned Schema Registry client configuration.

/**
 * Additional Properties Map
 * Properties forwarded to underlying Schema Registry client
 * Useful for options not officially exposed via Flink config
 * Note: Flink options have higher precedence
 */
ConfigOption<Map<String, String>> PROPERTIES = ConfigOptions.key("properties")
    .mapType()
    .noDefaultValue();

SQL Configuration Examples

Basic Configuration

CREATE TABLE user_events (
  user_id BIGINT,
  event_name STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081'
);
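
In SQL, each format option key listed earlier (`url`, `subject`, `ssl.keystore.location`, and so on) is written with the format identifier as a prefix, which is why the example uses `avro-confluent.url`. The sketch below renders a `WITH` clause from unprefixed option keys to make that convention explicit. `FormatOptionsSql` and its methods are hypothetical helpers, not a Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of Flink.
final class FormatOptionsSql {
    static final String FORMAT_ID = "avro-confluent";

    // Adds the 'format' entry and prefixes every format option key with the identifier.
    static Map<String, String> prefixed(Map<String, String> formatOptions) {
        Map<String, String> out = new LinkedHashMap<>();
        out.put("format", FORMAT_ID);
        formatOptions.forEach((key, value) -> out.put(FORMAT_ID + "." + key, value));
        return out;
    }

    // Renders options as a SQL WITH clause, doubling single quotes inside literals.
    static String toWithClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "  '" + escape(e.getKey()) + "' = '" + escape(e.getValue()) + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }

    private static String escape(String s) {
        return s.replace("'", "''");
    }
}
```

Connector options such as `connector` and `topic` are not format options and would be added to the map without the prefix.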

SSL Configuration

CREATE TABLE secure_events (
  id BIGINT,
  data STRING,
  timestamp_col TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'secure-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'https://schema-registry.example.com:8081',
  'avro-confluent.ssl.keystore.location' = '/path/to/client.keystore.jks',
  'avro-confluent.ssl.keystore.password' = 'keystorepass',
  'avro-confluent.ssl.truststore.location' = '/path/to/client.truststore.jks',
  'avro-confluent.ssl.truststore.password' = 'truststorepass'
);

Basic Authentication

CREATE TABLE authenticated_events (
  id BIGINT,
  message STRING,
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'auth-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://schema-registry.example.com:8081',
  'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'avro-confluent.basic-auth.user-info' = 'username:password'
);

Bearer Token Authentication

CREATE TABLE token_events (
  id BIGINT,
  payload STRING,
  timestamp_col TIMESTAMP(3)  
) WITH (
  'connector' = 'kafka',
  'topic' = 'token-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://schema-registry.example.com:8081',
  'avro-confluent.bearer-auth.credentials-source' = 'STATIC_TOKEN',
  'avro-confluent.bearer-auth.token' = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...'
);

Explicit Schema Configuration

CREATE TABLE typed_events (
  user_id BIGINT,
  event_type STRING,
  properties MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'typed-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081',
  'avro-confluent.subject' = 'typed-events-value',
  'avro-confluent.schema' = '{
    "type": "record",
    "name": "TypedEvent", 
    "fields": [
      {"name": "user_id", "type": "long"},
      {"name": "event_type", "type": "string"},
      {"name": "properties", "type": {"type": "map", "values": "string"}}
    ]
  }'
);

Advanced Properties Configuration

CREATE TABLE advanced_events (
  id BIGINT,
  data STRING,
  timestamp_col TIMESTAMP(3)
) WITH (
  'connector' = 'kafka', 
  'topic' = 'advanced-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081',
  'avro-confluent.properties.schema.registry.request.timeout.ms' = '10000',
  'avro-confluent.properties.schema.registry.connection.timeout.ms' = '5000',
  'avro-confluent.properties.schema.registry.retry.backoff.ms' = '1000'
);

Programmatic Configuration

Registry Configuration Map Building

import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions;
import org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory;

// Build the format configuration programmatically
Configuration config = new Configuration();
config.set(AvroConfluentFormatOptions.URL, "https://schema-registry.example.com:8081");
config.set(AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION, "/path/to/keystore.jks");
config.set(AvroConfluentFormatOptions.SSL_KEYSTORE_PASSWORD, "password");

// Convert to the Schema Registry client properties map
// (check for null before use: the result may be null when no optional properties are set)
Map<String, String> registryConfigs = RegistryAvroFormatFactory.buildOptionalPropertiesMap(config);

Schema Registry Client Configuration

The configuration options are translated to Schema Registry client properties:

// SSL Configuration Mapping
"schema.registry.ssl.keystore.location" -> SSL_KEYSTORE_LOCATION value
"schema.registry.ssl.keystore.password" -> SSL_KEYSTORE_PASSWORD value  
"schema.registry.ssl.truststore.location" -> SSL_TRUSTSTORE_LOCATION value
"schema.registry.ssl.truststore.password" -> SSL_TRUSTSTORE_PASSWORD value

// Authentication Configuration Mapping  
"basic.auth.credentials.source" -> BASIC_AUTH_CREDENTIALS_SOURCE value
"basic.auth.user.info" -> BASIC_AUTH_USER_INFO value
"bearer.auth.credentials.source" -> BEARER_AUTH_CREDENTIALS_SOURCE value
"bearer.auth.token" -> BEARER_AUTH_TOKEN value

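The mapping above can be read as a lookup table from format option keys to client property names. The sketch below expresses it that way with plain maps. `RegistryKeyMapping` is a hypothetical class that mirrors the listed keys and is not the factory's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; mirrors the mapping listed above.
final class RegistryKeyMapping {
    // Format option key -> Schema Registry client property name.
    static final Map<String, String> CLIENT_KEYS = Map.of(
            "ssl.keystore.location", "schema.registry.ssl.keystore.location",
            "ssl.keystore.password", "schema.registry.ssl.keystore.password",
            "ssl.truststore.location", "schema.registry.ssl.truststore.location",
            "ssl.truststore.password", "schema.registry.ssl.truststore.password",
            "basic-auth.credentials-source", "basic.auth.credentials.source",
            "basic-auth.user-info", "basic.auth.user.info",
            "bearer-auth.credentials-source", "bearer.auth.credentials.source",
            "bearer-auth.token", "bearer.auth.token");

    // Translates the options that have a client equivalent; other keys are skipped.
    static Map<String, String> translate(Map<String, String> formatOptions) {
        Map<String, String> out = new LinkedHashMap<>();
        formatOptions.forEach((key, value) -> {
            String clientKey = CLIENT_KEYS.get(key);
            if (clientKey != null) {
                out.put(clientKey, value);
            }
        });
        return out;
    }
}
```

Options such as `url`, `subject`, and `schema` have no entry in this table, so the sketch leaves them out of the result.
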
Configuration Priority

Configuration options are resolved in the following priority order:

  1. Direct Flink config options (highest priority)
  2. Properties map entries via PROPERTIES option
  3. Default values (if specified)

Example: If both avro-confluent.ssl.keystore.location and avro-confluent.properties.schema.registry.ssl.keystore.location are specified, the direct option takes precedence.
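
The priority order can be sketched as a two-step merge: start from the forwarded properties, then let the direct options overwrite any key they share. Both maps here are assumed to be keyed by Schema Registry client property names. `RegistryConfigPrecedence` is a hypothetical helper, not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Flink.
final class RegistryConfigPrecedence {
    // Entries from directOptions win over entries from forwardedProperties.
    static Map<String, String> merge(
            Map<String, String> forwardedProperties, Map<String, String> directOptions) {
        Map<String, String> merged = new HashMap<>(forwardedProperties);
        merged.putAll(directOptions);
        return merged;
    }
}
```

With the keystore example above, the forwarded `schema.registry.ssl.keystore.location` entry would be replaced by the value from `avro-confluent.ssl.keystore.location`.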

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry
