SQL Client for exploring and submitting SQL programs to Flink
—
Configuration and context management for SQL execution environments, covering property handling, dependency management, and session isolation. The context system underpins SQL execution with resource management and configuration inheritance between the default and per-session levels.
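Taken together, the classes documented below form a short pipeline from CLI options to an executable TableEnvironment. A minimal sketch (assuming the Flink SQL Client jars are on the classpath and `args` holds the CLI arguments):

```java
// Parse CLI options and build the shared default context
CliOptions options = CliOptionsParser.parseEmbeddedModeClient(args);
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
// Derive an isolated session context, then an execution context
SessionContext sessionContext = LocalContextUtils.buildSessionContext("demo-session", defaultContext);
ExecutionContext executionContext = ExecutionContext.create(sessionContext, defaultContext);
// The TableEnvironment is now configured for SQL execution
TableEnvironment tableEnv = executionContext.getTableEnvironment();
```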
Default context configuration container providing base execution environment setup.
public class DefaultContext {
/**
* Create default context with dependencies, configuration and command lines
* @param dependencies List of dependency URLs for classpath
* @param flinkConfig Flink configuration properties
* @param commandLines Custom command line processors
*/
public DefaultContext(List<URL> dependencies, Configuration flinkConfig, List<CustomCommandLine> commandLines);
/**
* Get underlying Flink configuration
* @return Configuration instance with Flink properties
*/
public Configuration getFlinkConfig();
/**
* Get dependency URLs for classpath construction
* @return List of URLs for JARs and libraries
*/
public List<URL> getDependencies();
}
Usage Example:
import org.apache.flink.table.client.gateway.local.LocalContextUtils;
// Build default context from CLI options
CliOptions options = CliOptionsParser.parseEmbeddedModeClient(args);
DefaultContext context = LocalContextUtils.buildDefaultContext(options);
// Access configuration
Configuration flinkConfig = context.getFlinkConfig();
List<URL> dependencies = context.getDependencies();
// Use context with executor
LocalExecutor executor = new LocalExecutor(context);
Session-specific context and state management with isolated configuration and dependencies.
public class SessionContext {
/**
* Get unique session identifier
* @return Session ID string
*/
public String getSessionId();
/**
* Get session configuration as mutable Map
* @return Map of configuration key-value pairs
*/
public Map<String, String> getConfigMap();
/**
* Get session configuration as ReadableConfig for type-safe access
* @return ReadableConfig instance
*/
public ReadableConfig getReadableConfig();
}
Manage session-level configuration properties with default value handling.
/**
* Set session property to specific value
* @param key Configuration property key
* @param value Configuration property value
*/
public void set(String key, String value);
/**
* Reset all session properties to default values
* Clears all session-specific overrides
*/
public void reset();
/**
* Reset specific property to default value
* @param key Property key to reset
*/
public void reset(String key);
Usage Example:
SessionContext session = LocalContextUtils.buildSessionContext("my-session", defaultContext);
// Set session properties
session.set("table.exec.resource.default-parallelism", "4");
session.set("sql-client.execution.result-mode", "tableau");
session.set("table.exec.sink.not-null-enforcer", "error");
// Access configuration
ReadableConfig config = session.getReadableConfig();
int parallelism = config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
// Reset specific property
session.reset("table.exec.resource.default-parallelism");
// Reset all session properties
session.reset();
Manage session-level JAR dependencies for custom functions, connectors, and formats.
/**
* Add JAR to session classpath
* @param jarUrl URL or path to JAR file
*/
public void addJar(String jarUrl);
/**
* Remove JAR from session classpath
* @param jarUrl URL or path to JAR file to remove
*/
public void removeJar(String jarUrl);
/**
* List all loaded JARs in session
* @return List of JAR URLs/paths
*/
public List<String> listJars();
Usage Example:
// Add custom connector
session.addJar("/path/to/flink-connector-kafka-1.14.6.jar");
// Add format JAR
session.addJar("file:///opt/flink/lib/flink-json-1.14.6.jar");
// Add from Maven repository URL
session.addJar("https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.14.6/flink-csv-1.14.6.jar");
// List loaded JARs
List<String> jars = session.listJars();
jars.forEach(jar -> System.out.println("Loaded: " + jar));
// Remove JAR when no longer needed
session.removeJar("/path/to/flink-connector-kafka-1.14.6.jar");
Proper resource cleanup and session management.
/**
* Close session and release all associated resources
* Cleans up classloader, temp files, and execution contexts
*/
public void close();
Usage Example:
SessionContext session = LocalContextUtils.buildSessionContext("session-1", defaultContext);
try {
// Use session for SQL operations
session.set("key", "value");
session.addJar("path/to/connector.jar");
// Session operations...
} finally {
// Always close session to prevent resource leaks
session.close();
}
Execution environment context providing Table API integration.
public class ExecutionContext {
/**
* Create execution context from session context
* @param sessionContext Session-specific context
* @param defaultContext Default context for base configuration
*/
public static ExecutionContext create(SessionContext sessionContext, DefaultContext defaultContext);
/**
* Get TableEnvironment for SQL execution
* @return TableEnvironment instance configured for this context
*/
public TableEnvironment getTableEnvironment();
/**
* Get session identifier
* @return Session ID
*/
public String getSessionId();
}
Utility class providing factory methods for context creation and management.
public class LocalContextUtils {
/**
* Build default context from CLI options
* @param options Parsed command line options
* @return DefaultContext configured with options
*/
public static DefaultContext buildDefaultContext(CliOptions options);
/**
* Build session context with specified ID and default context
* @param sessionId Desired session identifier (null for auto-generation)
* @param defaultContext Base context for configuration inheritance
* @return SessionContext ready for use
*/
public static SessionContext buildSessionContext(String sessionId, DefaultContext defaultContext);
}
Usage Example:
// Create contexts
CliOptions options = CliOptionsParser.parseEmbeddedModeClient(args);
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
SessionContext sessionContext = LocalContextUtils.buildSessionContext("my-session", defaultContext);
// Use contexts
LocalExecutor executor = new LocalExecutor(defaultContext);
executor.start();
String actualSessionId = executor.openSession(sessionContext.getSessionId());
The context system integrates with Flink's configuration system:
// Common configuration keys
session.set("table.exec.resource.default-parallelism", "8");
session.set("table.optimizer.join-reorder-enabled", "true");
session.set("pipeline.name", "My SQL Job");
session.set("execution.checkpointing.interval", "30s");
session.set("state.backend", "rocksdb");
SQL Client specific configuration options:
// Result display options
session.set("sql-client.execution.result-mode", "tableau");
session.set("sql-client.execution.max-table-result.rows", "10000");
session.set("sql-client.display.max-column-width", "50");
// Execution options
session.set("sql-client.verbose", "true");
Context creation respects environment variables:
FLINK_CONF_DIR: Configuration directory path
FLINK_LIB_DIR: Library directory for dependencies
HADOOP_CONF_DIR: Hadoop configuration for HDFS access
Each session provides complete isolation:
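The isolation and default-value fallback semantics can be sketched with plain maps. This is a hypothetical analogy to illustrate the behavior of set/reset, not the actual SessionContext implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: session-level overrides fall back to shared defaults;
// reset(key) removes only that override, restoring the default.
class SessionConfigSketch {
    private final Map<String, String> defaults;
    private final Map<String, String> overrides = new HashMap<>();

    SessionConfigSketch(Map<String, String> defaults) {
        this.defaults = defaults;
    }

    void set(String key, String value) { overrides.put(key, value); }

    void reset(String key) { overrides.remove(key); }

    void reset() { overrides.clear(); }

    String get(String key) {
        return overrides.getOrDefault(key, defaults.get(key));
    }
}

public class Main {
    public static void main(String[] args) {
        Map<String, String> defaults = new HashMap<>();
        defaults.put("table.exec.resource.default-parallelism", "1");

        // Two sessions sharing the same defaults but isolated overrides
        SessionConfigSketch a = new SessionConfigSketch(defaults);
        SessionConfigSketch b = new SessionConfigSketch(defaults);

        a.set("table.exec.resource.default-parallelism", "16");
        System.out.println(a.get("table.exec.resource.default-parallelism")); // 16
        System.out.println(b.get("table.exec.resource.default-parallelism")); // 1 (unaffected)

        a.reset("table.exec.resource.default-parallelism");
        System.out.println(a.get("table.exec.resource.default-parallelism")); // 1 (back to default)
    }
}
```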
Example Multi-Session Usage:
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
LocalExecutor executor = new LocalExecutor(defaultContext);
executor.start();
// Create multiple isolated sessions
String session1 = executor.openSession("analytics-session");
String session2 = executor.openSession("etl-session");
try {
// Configure session 1 for analytics
executor.setSessionProperty(session1, "table.exec.resource.default-parallelism", "16");
executor.addJar(session1, "analytics-connector.jar");
// Configure session 2 for ETL
executor.setSessionProperty(session2, "table.exec.resource.default-parallelism", "4");
executor.addJar(session2, "etl-connector.jar");
// Sessions operate independently
executor.executeOperation(session1, analyticsQuery);
executor.executeOperation(session2, etlQuery);
} finally {
executor.closeSession(session1);
executor.closeSession(session2);
}
Context and session management handle various error conditions:
Example Error Handling:
try {
session.addJar("/invalid/path/connector.jar");
} catch (Exception e) {
System.err.println("Failed to load JAR: " + e.getMessage());
// Handle JAR loading error
}
try {
session.set("invalid.config.key", "value");
} catch (IllegalArgumentException e) {
System.err.println("Invalid configuration: " + e.getMessage());
// Handle configuration error
}
The context and session management system provides a robust foundation for SQL execution with proper resource management, configuration isolation, and dependency handling.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-client-2-12