A service that enables multiple clients from the remote to execute SQL in concurrency, providing an easy way to submit Flink Jobs, look up metadata, and analyze data online.
—
Comprehensive configuration system for SQL Gateway service behavior, session management, worker threads, REST endpoints, and workflow scheduling with extensive customization options.
Core service configuration options for session management, threading, and performance tuning.
/**
* Configuration options for SQL Gateway service
*/
public class SqlGatewayServiceConfigOptions {
/** Session idle timeout before automatic cleanup (default: 10 minutes) */
public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
/** Interval for checking idle sessions (default: 1 minute) */
public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_CHECK_INTERVAL;
/** Maximum number of concurrent sessions (default: 1,000,000) */
public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM;
/** Enable plan caching for improved performance (default: false) */
public static final ConfigOption<Boolean> SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED;
/** Maximum entries in plan cache (default: 100) */
public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_PLAN_CACHE_SIZE;
/** Time-to-live for cached plans (default: 1 hour) */
public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_PLAN_CACHE_TTL;
/** Maximum worker threads for operation execution (default: 500) */
public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX;
/** Minimum worker threads maintained (default: 5) */
public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MIN;
/** Thread keepalive time when idle (default: 5 minutes) */
public static final ConfigOption<Duration> SQL_GATEWAY_WORKER_KEEPALIVE_TIME;
}REST endpoint configuration for network settings and server behavior.
/**
* Options to configure SqlGatewayRestEndpoint
* By default, the user must select a local address to set ADDRESS, then the server will bind the
* address to all the local IPV4 address (0.0.0.0) and bind the port to BIND_PORT(fallback to the
* default value of PORT).
*/
public class SqlGatewayRestOptions {
/** The address that should be used by clients to connect to the sql gateway server */
public static final ConfigOption<String> ADDRESS;
/** The address that the sql gateway server binds itself to */
public static final ConfigOption<String> BIND_ADDRESS;
/** The port range that the sql gateway server could bind itself to (default: "8083") */
public static final ConfigOption<String> BIND_PORT;
/** The port that the client connects to (default: 8083) */
public static final ConfigOption<Integer> PORT;
}import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
// Configure SQL Gateway service
Configuration config = new Configuration();
// Session management
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT,
Duration.ofMinutes(30)); // 30 minute timeout
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 10000);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL,
Duration.ofSeconds(30));
// Performance tuning
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED, true);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_SIZE, 500);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_TTL,
Duration.ofHours(2));
// Worker thread configuration
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX, 200);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN, 10);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME,
Duration.ofMinutes(10));
// Use configuration with SQL Gateway
DefaultContext context = DefaultContext.load(config, Collections.emptyList(), true);
SessionManager sessionManager = SessionManager.create(context);
SqlGateway gateway = new SqlGateway(config, sessionManager);import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;
// Configure REST endpoint
Configuration restConfig = new Configuration();
// Basic network settings
restConfig.set(SqlGatewayRestOptions.BIND_ADDRESS, "0.0.0.0"); // Bind to all interfaces
restConfig.set(SqlGatewayRestOptions.PORT, 9083); // Custom port
restConfig.set(SqlGatewayRestOptions.REQUEST_TIMEOUT, Duration.ofMinutes(5));
// Performance settings
restConfig.set(SqlGatewayRestOptions.MAX_REQUEST_SIZE, MemorySize.ofMebiBytes(64)); // 64MB max
// SSL configuration for production
restConfig.set(SqlGatewayRestOptions.SSL_ENABLED, true);
restConfig.set(SqlGatewayRestOptions.SSL_KEYSTORE_PATH, "/path/to/keystore.jks");
restConfig.set(SqlGatewayRestOptions.SSL_KEYSTORE_PASSWORD, "keystorePassword");
restConfig.set(SqlGatewayRestOptions.SSL_TRUSTSTORE_PATH, "/path/to/truststore.jks");
restConfig.set(SqlGatewayRestOptions.SSL_TRUSTSTORE_PASSWORD, "truststorePassword");
// Create SQL Gateway with REST configuration
SqlGateway gateway = new SqlGateway(restConfig, sessionManager);
gateway.start(); // Will start REST endpoint with custom configuration// Production-ready configuration
public class ProductionGatewayConfig {
public static Configuration createProductionConfig() {
Configuration config = new Configuration();
// Session management for high-load scenarios
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT,
Duration.ofHours(4)); // Longer timeout for batch jobs
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 50000);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL,
Duration.ofMinutes(2));
// Enable plan caching for performance
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED, true);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_SIZE, 2000);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_TTL,
Duration.ofHours(8));
// High-performance thread pool
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX, 1000);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN, 50);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME,
Duration.ofMinutes(15));
// REST endpoint with SSL
config.set(SqlGatewayRestOptions.BIND_ADDRESS, "0.0.0.0");
config.set(SqlGatewayRestOptions.PORT, 8083);
config.set(SqlGatewayRestOptions.REQUEST_TIMEOUT, Duration.ofMinutes(10));
config.set(SqlGatewayRestOptions.MAX_REQUEST_SIZE, MemorySize.ofMebiBytes(128));
config.set(SqlGatewayRestOptions.SSL_ENABLED, true);
// Additional Flink configuration
config.setString("execution.target", "yarn-per-job");
config.setString("execution.savepoint.path", "hdfs://cluster/savepoints");
config.setInteger("parallelism.default", 8);
config.setString("execution.checkpointing.interval", "60s");
return config;
}
}// Load configuration from properties file
public class ConfigurationLoader {
public static Configuration loadFromProperties(String propertiesFile) throws IOException {
Properties props = new Properties();
try (InputStream input = new FileInputStream(propertiesFile)) {
props.load(input);
}
Configuration config = Configuration.fromMap(
props.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey().toString(),
e -> e.getValue().toString()
))
);
return config;
}
}
// Example properties file (gateway.properties):
/*
sql-gateway.session.idle-timeout=30min
sql-gateway.session.check-interval=1min
sql-gateway.session.max-num=10000
sql-gateway.session.plan-cache.enabled=true
sql-gateway.session.plan-cache.size=500
sql-gateway.worker.threads.max=200
sql-gateway.worker.threads.min=10
rest.port=8083
rest.bind-address=0.0.0.0
rest.ssl.enabled=false
*/// Environment-specific configuration patterns
public class EnvironmentConfigurations {
public static Configuration forDevelopment() {
Configuration config = new Configuration();
// Development - minimal resources, fast feedback
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT, Duration.ofMinutes(5));
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 100);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX, 20);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED, false);
config.set(SqlGatewayRestOptions.PORT, 8083);
config.set(SqlGatewayRestOptions.SSL_ENABLED, false);
return config;
}
public static Configuration forTesting() {
Configuration config = new Configuration();
// Testing - isolated, predictable
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT, Duration.ofMinutes(1));
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 10);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX, 5);
config.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED, false);
config.set(SqlGatewayRestOptions.PORT, 0); // Random port
config.set(SqlGatewayRestOptions.SSL_ENABLED, false);
return config;
}
public static Configuration forProduction() {
// Use production config from previous example
return ProductionGatewayConfig.createProductionConfig();
}
}// Configuration monitoring and updates (conceptual)
public class DynamicConfigurationManager {
private final SqlGateway gateway;
private volatile Configuration currentConfig;
public DynamicConfigurationManager(SqlGateway gateway, Configuration initialConfig) {
this.gateway = gateway;
this.currentConfig = initialConfig;
}
public synchronized void updateConfiguration(Map<String, String> updates) {
Configuration newConfig = currentConfig.clone();
for (Map.Entry<String, String> entry : updates.entrySet()) {
newConfig.setString(entry.getKey(), entry.getValue());
}
// Apply configuration changes that don't require restart
applyRuntimeConfigChanges(newConfig);
this.currentConfig = newConfig;
}
private void applyRuntimeConfigChanges(Configuration newConfig) {
// Update session timeout
Duration newTimeout = newConfig.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT);
// Apply to session manager...
// Update thread pool sizes
int maxThreads = newConfig.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX);
// Apply to thread pool...
// Some changes may require restart - log warnings
if (configRequiresRestart(currentConfig, newConfig)) {
System.out.println("Warning: Some configuration changes require gateway restart");
}
}
private boolean configRequiresRestart(Configuration old, Configuration updated) {
// Check if port, SSL settings, or other restart-required settings changed
return !Objects.equals(old.get(SqlGatewayRestOptions.PORT), updated.get(SqlGatewayRestOptions.PORT)) ||
!Objects.equals(old.get(SqlGatewayRestOptions.SSL_ENABLED), updated.get(SqlGatewayRestOptions.SSL_ENABLED));
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-gateway