CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-gateway

A service that lets multiple remote clients execute SQL concurrently, providing an easy way to submit Flink jobs, look up metadata, and analyze data online.

Pending
Overview
Eval results
Files

configuration-options.mddocs/

Configuration Options

Comprehensive configuration system for SQL Gateway service behavior, session management, worker threads, REST endpoints, and workflow scheduling with extensive customization options.

Capabilities

SqlGatewayServiceConfigOptions

Core service configuration options for session management, threading, and performance tuning.

/**
 * Configuration options for SQL Gateway service.
 *
 * <p>The options fall into three groups: session lifecycle (idle timeout, idle-check
 * interval, session limit), per-session plan caching, and the worker threads used for
 * operation execution. Every option is exposed as a typed {@code ConfigOption}; the defaults
 * quoted on each field are the ones stated by this reference.
 */
public class SqlGatewayServiceConfigOptions {
    
    /**
     * Session idle timeout before automatic cleanup (default: 10 minutes).
     * Appears as {@code sql-gateway.session.idle-timeout} in the properties example.
     */
    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
    
    /**
     * Interval for checking idle sessions (default: 1 minute).
     * Appears as {@code sql-gateway.session.check-interval} in the properties example.
     */
    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_CHECK_INTERVAL;
    
    /**
     * Maximum number of concurrent sessions (default: 1,000,000).
     * Appears as {@code sql-gateway.session.max-num} in the properties example.
     */
    public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM;
    
    /**
     * Enable plan caching for improved performance (default: false).
     * Appears as {@code sql-gateway.session.plan-cache.enabled} in the properties example.
     */
    public static final ConfigOption<Boolean> SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED;
    
    /**
     * Maximum entries in plan cache (default: 100).
     * Appears as {@code sql-gateway.session.plan-cache.size} in the properties example.
     */
    public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_PLAN_CACHE_SIZE;
    
    /** Time-to-live for cached plans (default: 1 hour). */
    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_PLAN_CACHE_TTL;
    
    /**
     * Maximum worker threads for operation execution (default: 500).
     * Appears as {@code sql-gateway.worker.threads.max} in the properties example.
     */
    public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX;
    
    /**
     * Minimum worker threads maintained (default: 5).
     * Appears as {@code sql-gateway.worker.threads.min} in the properties example.
     */
    public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MIN;
    
    /** Thread keepalive time when idle (default: 5 minutes). */
    public static final ConfigOption<Duration> SQL_GATEWAY_WORKER_KEEPALIVE_TIME;
}

SqlGatewayRestOptions

REST endpoint configuration for network settings and server behavior.

/**
 * Options to configure SqlGatewayRestEndpoint.
 *
 * <p>The user must set ADDRESS to a local address. By default the server then binds to all
 * local IPv4 addresses (0.0.0.0) and binds its port to BIND_PORT, which falls back to the
 * default value of PORT.
 *
 * <p>These four options are the only ones this class declares.
 */
public class SqlGatewayRestOptions {
    
    /** The address that should be used by clients to connect to the sql gateway server. */
    public static final ConfigOption<String> ADDRESS;
    
    /** The address that the sql gateway server binds itself to. */
    public static final ConfigOption<String> BIND_ADDRESS;
    
    /**
     * The port range that the sql gateway server could bind itself to (default: "8083").
     * Note the String type, as opposed to the Integer type of PORT.
     */
    public static final ConfigOption<String> BIND_PORT;
    
    /** The port that the client connects to (default: 8083). */
    public static final ConfigOption<Integer> PORT;
}

Usage Examples

Basic Service Configuration

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;

// Build the configuration that the SQL Gateway service will run with
Configuration config = new Configuration();

// Session management: 30 minute idle timeout, at most 10000 sessions,
// idle sessions checked every 30 seconds
Duration sessionIdleTimeout = Duration.ofMinutes(30);
Duration sessionCheckInterval = Duration.ofSeconds(30);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT, sessionIdleTimeout);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 10000);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL, sessionCheckInterval);

// Performance tuning: turn on the plan cache, 500 entries kept for 2 hours
Duration planCacheTtl = Duration.ofHours(2);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED, true);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_SIZE, 500);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_TTL, planCacheTtl);

// Worker threads: between 10 and 200, idle threads kept alive for 10 minutes
Duration workerKeepAlive = Duration.ofMinutes(10);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX, 200);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN, 10);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME, workerKeepAlive);

// Hand the configuration to the SQL Gateway
DefaultContext context = DefaultContext.load(config, Collections.emptyList(), true);
SessionManager sessionManager = SessionManager.create(context);
SqlGateway gateway = new SqlGateway(config, sessionManager);

REST Endpoint Configuration

import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;

// Configure REST endpoint
Configuration restConfig = new Configuration();

// Basic network settings.
// The SqlGatewayRestOptions description states that the user must set ADDRESS,
// so it is set explicitly here ("localhost" is a placeholder for the real host).
restConfig.set(SqlGatewayRestOptions.ADDRESS, "localhost"); // Address clients connect to
restConfig.set(SqlGatewayRestOptions.BIND_ADDRESS, "0.0.0.0"); // Bind to all interfaces
restConfig.set(SqlGatewayRestOptions.PORT, 9083); // Custom port

// SqlGatewayRestOptions declares only ADDRESS, BIND_ADDRESS, BIND_PORT and PORT.
// Request timeouts, maximum request size and SSL key/trust stores are not options of
// this class and therefore cannot be set through it.
// NOTE(review): TLS for the REST endpoint is presumably driven by Flink's general
// security.ssl.rest.* options - confirm against the Flink version in use.

// Create SQL Gateway with REST configuration
SqlGateway gateway = new SqlGateway(restConfig, sessionManager);
gateway.start(); // Will start REST endpoint with custom configuration

Production Configuration Example

// Production-ready configuration
/**
 * Example factory for a configuration sized for a high-load deployment.
 *
 * <p>Only options declared by {@code SqlGatewayServiceConfigOptions} and
 * {@code SqlGatewayRestOptions} are set through those classes; everything else is
 * passed as a plain Flink configuration key.
 */
public class ProductionGatewayConfig {
    
    /**
     * Builds the production configuration.
     *
     * @return a new {@code Configuration} with session, plan-cache, worker-thread,
     *     REST endpoint and general Flink settings applied
     */
    public static Configuration createProductionConfig() {
        Configuration config = new Configuration();
        
        // Session management for high-load scenarios
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT, 
            Duration.ofHours(4)); // Longer timeout for batch jobs
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 50000);
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL, 
            Duration.ofMinutes(2));
        
        // Enable plan caching for performance
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED, true);
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_SIZE, 2000);
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_TTL, 
            Duration.ofHours(8));
        
        // High-performance thread pool
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX, 1000);
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN, 50);
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME, 
            Duration.ofMinutes(15));
        
        // REST endpoint. SqlGatewayRestOptions declares only ADDRESS, BIND_ADDRESS,
        // BIND_PORT and PORT, so request timeout, request size and SSL switches are
        // not set here. ADDRESS is deployment-specific and must be supplied by the caller.
        // NOTE(review): enable TLS through Flink's security.ssl.rest.* options if
        // required - confirm against the Flink version in use.
        config.set(SqlGatewayRestOptions.BIND_ADDRESS, "0.0.0.0");
        config.set(SqlGatewayRestOptions.PORT, 8083);
        
        // Additional Flink configuration
        // NOTE(review): "yarn-per-job" is deprecated in recent Flink releases -
        // confirm the execution target against the Flink version in use.
        config.setString("execution.target", "yarn-per-job");
        config.setString("execution.savepoint.path", "hdfs://cluster/savepoints");
        config.setInteger("parallelism.default", 8);
        config.setString("execution.checkpointing.interval", "60s");
        
        return config;
    }
}

Configuration via Properties File

// Load configuration from properties file
/** Helper that turns a {@code java.util.Properties} file into a Flink {@code Configuration}. */
public class ConfigurationLoader {
    
    /**
     * Reads the given properties file and copies every entry into a new configuration.
     *
     * @param propertiesFile path of the properties file to read
     * @return a {@code Configuration} holding one string entry per property
     * @throws IOException if the file cannot be opened or read
     */
    public static Configuration loadFromProperties(String propertiesFile) throws IOException {
        Properties props = new Properties();
        // try-with-resources closes the stream even when load() fails
        try (InputStream input = new FileInputStream(propertiesFile)) {
            props.load(input);
        }
        
        return Configuration.fromMap(
            props.entrySet().stream()
                .collect(Collectors.toMap(
                    e -> e.getKey().toString(),
                    // Properties.load() keeps trailing whitespace in values; strip it so
                    // that a stray space after a number or boolean cannot break parsing
                    // when the option is later read with its declared type.
                    e -> e.getValue().toString().trim()
                ))
        );
    }
}

// Example properties file (gateway.properties):
/*
sql-gateway.session.idle-timeout=30min
sql-gateway.session.check-interval=1min
sql-gateway.session.max-num=10000
sql-gateway.session.plan-cache.enabled=true
sql-gateway.session.plan-cache.size=500
sql-gateway.worker.threads.max=200
sql-gateway.worker.threads.min=10
sql-gateway.endpoint.rest.port=8083
sql-gateway.endpoint.rest.bind-address=0.0.0.0
security.ssl.rest.enabled=false
*/

Environment-Specific Configuration

// Environment-specific configuration patterns
public class EnvironmentConfigurations {
    
    public static Configuration forDevelopment() {
        Configuration config = new Configuration();
        
        // Development - minimal resources, fast feedback
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT, Duration.ofMinutes(5));
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 100);
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX, 20);
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED, false);
        
        config.set(SqlGatewayRestOptions.PORT, 8083);
        config.set(SqlGatewayRestOptions.SSL_ENABLED, false);
        
        return config;
    }
    
    public static Configuration forTesting() {
        Configuration config = new Configuration();
        
        // Testing - isolated, predictable
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT, Duration.ofMinutes(1));
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 10);
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX, 5);
        config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED, false);
        
        config.set(SqlGatewayRestOptions.PORT, 0); // Random port
        config.set(SqlGatewayRestOptions.SSL_ENABLED, false);
        
        return config;
    }
    
    public static Configuration forProduction() {
        // Use production config from previous example
        return ProductionGatewayConfig.createProductionConfig();
    }
}

Dynamic Configuration Updates

// Configuration monitoring and updates (conceptual)
public class DynamicConfigurationManager {
    private final SqlGateway gateway;
    private volatile Configuration currentConfig;
    
    public DynamicConfigurationManager(SqlGateway gateway, Configuration initialConfig) {
        this.gateway = gateway;
        this.currentConfig = initialConfig;
    }
    
    public synchronized void updateConfiguration(Map<String, String> updates) {
        Configuration newConfig = currentConfig.clone();
        
        for (Map.Entry<String, String> entry : updates.entrySet()) {
            newConfig.setString(entry.getKey(), entry.getValue());
        }
        
        // Apply configuration changes that don't require restart
        applyRuntimeConfigChanges(newConfig);
        
        this.currentConfig = newConfig;
    }
    
    private void applyRuntimeConfigChanges(Configuration newConfig) {
        // Update session timeout
        Duration newTimeout = newConfig.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT);
        // Apply to session manager...
        
        // Update thread pool sizes
        int maxThreads = newConfig.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX);
        // Apply to thread pool...
        
        // Some changes may require restart - log warnings
        if (configRequiresRestart(currentConfig, newConfig)) {
            System.out.println("Warning: Some configuration changes require gateway restart");
        }
    }
    
    private boolean configRequiresRestart(Configuration old, Configuration updated) {
        // Check if port, SSL settings, or other restart-required settings changed
        return !Objects.equals(old.get(SqlGatewayRestOptions.PORT), updated.get(SqlGatewayRestOptions.PORT)) ||
               !Objects.equals(old.get(SqlGatewayRestOptions.SSL_ENABLED), updated.get(SqlGatewayRestOptions.SSL_ENABLED));
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-gateway

docs

configuration-options.md

core-service-interface.md

endpoint-framework.md

index.md

operation-management.md

rest-implementation.md

result-data-models.md

session-management.md

workflow-management.md

tile.json