
tessl/maven-org-apache-flink--flink-sql-client-2-12

SQL Client for exploring and submitting SQL programs to Flink


Session and Context Management

The context classes configure and manage SQL execution environments: they handle configuration properties, JAR dependencies, and per-session isolation. A DefaultContext holds the base configuration; each SessionContext inherits from it, layers its own overrides on top, and releases its resources when closed.

Capabilities

DefaultContext Class

Container for the base execution environment setup: the Flink configuration, the classpath dependencies, and the custom command lines.

public class DefaultContext {
    /**
     * Create default context with dependencies, configuration and command lines
     * @param dependencies List of dependency URLs for classpath
     * @param flinkConfig Flink configuration properties
     * @param commandLines Custom command line processors
     */
    public DefaultContext(List<URL> dependencies, Configuration flinkConfig, List<CustomCommandLine> commandLines);
    
    /**
     * Get underlying Flink configuration
     * @return Configuration instance with Flink properties
     */
    public Configuration getFlinkConfig();
    
    /**
     * Get dependency URLs for classpath construction
     * @return List of URLs for JARs and libraries
     */
    public List<URL> getDependencies();
}

Usage Example:

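import java.net.URL;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.client.cli.CliOptions;
import org.apache.flink.table.client.cli.CliOptionsParser;
import org.apache.flink.table.client.gateway.context.DefaultContext;
import org.apache.flink.table.client.gateway.local.LocalContextUtils;
import org.apache.flink.table.client.gateway.local.LocalExecutor;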

// Build default context from CLI options
CliOptions options = CliOptionsParser.parseEmbeddedModeClient(args);
DefaultContext context = LocalContextUtils.buildDefaultContext(options);

// Access configuration
Configuration flinkConfig = context.getFlinkConfig();
List<URL> dependencies = context.getDependencies();

// Use context with executor
LocalExecutor executor = new LocalExecutor(context);

SessionContext Class

Session-specific context and state management with isolated configuration and dependencies.

public class SessionContext {
    /**
     * Get unique session identifier
     * @return Session ID string
     */
    public String getSessionId();
    
    /**
     * Get session configuration as mutable Map
     * @return Map of configuration key-value pairs
     */
    public Map<String, String> getConfigMap();
    
    /**
     * Get session configuration as ReadableConfig for type-safe access
     * @return ReadableConfig instance
     */
    public ReadableConfig getReadableConfig();
}

Session Property Management

Manage session-level configuration properties with default value handling.

/**
 * Set session property to specific value
 * @param key Configuration property key
 * @param value Configuration property value
 */
public void set(String key, String value);

/**
 * Reset all session properties to default values
 * Clears all session-specific overrides
 */
public void reset();

/**
 * Reset specific property to default value
 * @param key Property key to reset
 */
public void reset(String key);

Usage Example:

SessionContext session = LocalContextUtils.buildSessionContext("my-session", defaultContext);

// Set session properties
session.set("table.exec.resource.default-parallelism", "4");
session.set("sql-client.execution.result-mode", "tableau");
session.set("table.exec.sink.not-null-enforcer", "error");

// Access configuration
ReadableConfig config = session.getReadableConfig();
int parallelism = config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);

// Reset specific property
session.reset("table.exec.resource.default-parallelism");

// Reset all session properties
session.reset();
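
One way to picture the set/reset semantics above is a layer of session overrides on top of fixed defaults. The following is a minimal plain-JDK sketch of that idea, not the SQL Client's implementation; the class LayeredProperties and its methods are hypothetical names chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: session overrides layered over fixed defaults.
final class LayeredProperties {
    private final Map<String, String> defaults;
    private final Map<String, String> overrides = new HashMap<>();

    LayeredProperties(Map<String, String> defaults) {
        // Copy so later changes to the caller's map cannot leak in.
        this.defaults = Collections.unmodifiableMap(new HashMap<>(defaults));
    }

    // Record a session-level override for one key.
    void set(String key, String value) {
        overrides.put(key, value);
    }

    // Drop one override so the default (if any) shows through again.
    void reset(String key) {
        overrides.remove(key);
    }

    // Drop every override.
    void reset() {
        overrides.clear();
    }

    // Effective view: defaults first, then overrides on top.
    Map<String, String> effective() {
        Map<String, String> merged = new HashMap<>(defaults);
        merged.putAll(overrides);
        return merged;
    }
}
```

In this model reset(key) never deletes a default; it only removes the override, which matches the "reset to default value" wording of the method documentation.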

JAR and Dependency Management

Manage session-level JAR dependencies for custom functions, connectors, and formats.

/**
 * Add JAR to session classpath
 * @param jarUrl URL or path to JAR file
 */
public void addJar(String jarUrl);

/**
 * Remove JAR from session classpath  
 * @param jarUrl URL or path to JAR file to remove
 */
public void removeJar(String jarUrl);

/**
 * List all loaded JARs in session
 * @return List of JAR URLs/paths
 */
public List<String> listJars();

Usage Example:

// Add custom connector
session.addJar("/path/to/flink-connector-kafka-1.14.6.jar");

// Add format JAR
session.addJar("file:///opt/flink/lib/flink-json-1.14.6.jar");

// Add from Maven repository URL (may be rejected: the 1.14 SQL Client is understood to accept only local JAR paths)
session.addJar("https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.14.6/flink-csv-1.14.6.jar");

// List loaded JARs
List<String> jars = session.listJars();
jars.forEach(jar -> System.out.println("Loaded: " + jar));

// Remove JAR when no longer needed
session.removeJar("/path/to/flink-connector-kafka-1.14.6.jar");
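
The example above passes JAR locations in three shapes: a plain path, a file:// URL, and an https:// URL. The sketch below shows one way such strings could be normalized into java.net.URL values before they reach a classloader. It uses only the JDK, and JarLocations, toUrl, and isLocal are hypothetical helpers, not part of the SQL Client API. Whether a given SQL Client version accepts non-local URLs is not something this sketch settles.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.file.Paths;

// Hypothetical helper for normalizing JAR location strings.
final class JarLocations {
    private JarLocations() {}

    // Strings with a URI scheme are used as-is; anything else is treated as a file path.
    static URL toUrl(String jar) {
        URI uri = null;
        try {
            uri = URI.create(jar);
        } catch (IllegalArgumentException e) {
            // Not a valid URI (for example a path containing spaces): fall through to the path branch.
        }
        try {
            // A one-letter "scheme" is most likely a Windows drive letter, so treat it as a path.
            if (uri != null && uri.getScheme() != null && uri.getScheme().length() > 1) {
                return uri.toURL();
            }
            return Paths.get(jar).toAbsolutePath().normalize().toUri().toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Unsupported JAR location: " + jar, e);
        }
    }

    // True when the URL points at the local file system.
    static boolean isLocal(URL url) {
        return "file".equals(url.getProtocol());
    }
}
```

A caller could check isLocal(toUrl(jar)) before calling session.addJar(jar) to fail early with a clear message when only local JARs are supported.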

Session Lifecycle Management

Proper resource cleanup and session management.

/**
 * Close session and release all associated resources
 * Cleans up classloader, temp files, and execution contexts
 */
public void close();

Usage Example:

SessionContext session = LocalContextUtils.buildSessionContext("session-1", defaultContext);
try {
    // Use session for SQL operations
    session.set("key", "value");
    session.addJar("path/to/connector.jar");
    
    // Session operations...
    
} finally {
    // Always close session to prevent resource leaks
    session.close();
}
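
The try/finally pattern above can also be written with try-with-resources by adapting a plain close() method to AutoCloseable. This is a minimal sketch under the assumption that the session type does not itself implement AutoCloseable; DemoSession and Resource are hypothetical stand-ins so the example runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseDemo {
    private CloseDemo() {}

    // Hypothetical stand-in for a session-like object with a plain close() method.
    static final class DemoSession {
        final List<String> log = new ArrayList<>();

        void set(String key, String value) {
            log.add("set " + key);
        }

        void close() {
            log.add("closed");
        }
    }

    // AutoCloseable variant whose close() declares no checked exception.
    interface Resource extends AutoCloseable {
        @Override
        void close();
    }

    // Runs one unit of work and returns the recorded event log.
    static List<String> run(boolean fail) {
        DemoSession session = new DemoSession();
        // The method reference adapts session.close() to the Resource interface.
        try (Resource ignored = session::close) {
            session.set("key", "value");
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
        } catch (IllegalStateException e) {
            // The resource has already been closed by the time this block runs.
            session.log.add("caught");
        }
        return session.log;
    }
}
```

With a real session the same shape would read try (Resource ignored = session::close) { ... }, which keeps the cleanup next to the acquisition.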

ExecutionContext Class

Execution environment context providing Table API integration.

public class ExecutionContext {
    /**
     * Create execution context from session context
     * @param sessionContext Session-specific context
     * @param defaultContext Default context for base configuration
     */
    public static ExecutionContext create(SessionContext sessionContext, DefaultContext defaultContext);
    
    /**
     * Get TableEnvironment for SQL execution
     * @return TableEnvironment instance configured for this context
     */
    public TableEnvironment getTableEnvironment();
    
    /**
     * Get session identifier
     * @return Session ID
     */
    public String getSessionId();
}

LocalContextUtils Class

Utility class providing factory methods for context creation and management.

public class LocalContextUtils {
    /**
     * Build default context from CLI options
     * @param options Parsed command line options
     * @return DefaultContext configured with options
     */
    public static DefaultContext buildDefaultContext(CliOptions options);
    
    /**
     * Build session context with specified ID and default context
     * @param sessionId Desired session identifier (null for auto-generation)
     * @param defaultContext Base context for configuration inheritance
     * @return SessionContext ready for use
     */
    public static SessionContext buildSessionContext(String sessionId, DefaultContext defaultContext);
}

Usage Example:

// Create contexts
CliOptions options = CliOptionsParser.parseEmbeddedModeClient(args);
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
SessionContext sessionContext = LocalContextUtils.buildSessionContext("my-session", defaultContext);

// Use contexts
LocalExecutor executor = new LocalExecutor(defaultContext);
executor.start();
String actualSessionId = executor.openSession(sessionContext.getSessionId());

Configuration Integration

Flink Configuration Properties

The context system integrates with Flink's configuration system:

// Common configuration keys
session.set("table.exec.resource.default-parallelism", "8");
session.set("table.optimizer.join-reorder-enabled", "true");
session.set("pipeline.name", "My SQL Job");
session.set("execution.checkpointing.interval", "30s");
session.set("state.backend", "rocksdb");

SQL Client Specific Options

Keys prefixed with sql-client. configure the client itself (result display, verbosity) rather than the submitted job:

// Result display options
session.set("sql-client.execution.result-mode", "tableau");
session.set("sql-client.execution.max-table-result.rows", "10000");
session.set("sql-client.display.max-column-width", "50");

// Execution options
session.set("sql-client.verbose", "true");

Environment Variable Integration

Context creation respects environment variables:

  • FLINK_CONF_DIR: Configuration directory path
  • FLINK_LIB_DIR: Library directory for dependencies
  • HADOOP_CONF_DIR: Hadoop configuration for HDFS access
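
As an illustration of how a variable such as FLINK_CONF_DIR might be consulted, the sketch below reads a directory from an environment map and falls back to a caller-supplied default. The EnvDirs class and the fallback behaviour are assumptions made for this example; the actual lookup order used by Flink is not described here.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for reading directory settings from the environment.
final class EnvDirs {
    private EnvDirs() {}

    // Look up a directory variable; missing or blank values count as unset.
    static Optional<Path> dir(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(Paths.get(value.trim()));
    }

    // Resolve the configuration directory, falling back to the given default.
    static Path confDir(Map<String, String> env, Path fallback) {
        return dir(env, "FLINK_CONF_DIR").orElse(fallback);
    }
}
```

Passing the environment as a map keeps the helper testable; production code would call EnvDirs.confDir(System.getenv(), Paths.get("conf")) or similar.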

Session Isolation

Each session is isolated from the others in four respects:

  • Configuration: Independent property overrides
  • Classpath: Session-specific JAR loading
  • State: Separate execution contexts
  • Resources: Isolated cleanup on session close

Example Multi-Session Usage:

DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
LocalExecutor executor = new LocalExecutor(defaultContext);
executor.start();

// Create multiple isolated sessions
String session1 = executor.openSession("analytics-session");
String session2 = executor.openSession("etl-session");

try {
    // Configure session 1 for analytics
    executor.setSessionProperty(session1, "table.exec.resource.default-parallelism", "16");
    executor.addJar(session1, "analytics-connector.jar");
    
    // Configure session 2 for ETL
    executor.setSessionProperty(session2, "table.exec.resource.default-parallelism", "4");
    executor.addJar(session2, "etl-connector.jar");
    
    // Sessions operate independently
    executor.executeOperation(session1, analyticsQuery);
    executor.executeOperation(session2, etlQuery);
    
} finally {
    // Close both sessions even if closing the first one fails
    try {
        executor.closeSession(session1);
    } finally {
        executor.closeSession(session2);
    }
}
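
The isolation shown above can be modelled as one property map per session ID. The sketch below is a simplified, JDK-only illustration: SessionRegistry is a hypothetical class, and the real executor tracks much more than properties (classloaders, execution contexts). It also rejects duplicate session IDs, which is an assumption about the desired behaviour.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of per-session state keyed by session ID.
final class SessionRegistry {
    private final Map<String, Map<String, String>> sessions = new ConcurrentHashMap<>();

    // Register a new session; a duplicate ID is rejected.
    String open(String sessionId) {
        Map<String, String> previous =
                sessions.putIfAbsent(sessionId, new ConcurrentHashMap<>());
        if (previous != null) {
            throw new IllegalStateException("Session already exists: " + sessionId);
        }
        return sessionId;
    }

    // Set a property that is visible only inside the given session.
    void setProperty(String sessionId, String key, String value) {
        require(sessionId).put(key, value);
    }

    // Read a property; returns null when the session has no value for the key.
    String getProperty(String sessionId, String key) {
        return require(sessionId).get(key);
    }

    // Forget the session and everything stored for it.
    void close(String sessionId) {
        sessions.remove(sessionId);
    }

    private Map<String, String> require(String sessionId) {
        Map<String, String> props = sessions.get(sessionId);
        if (props == null) {
            throw new IllegalArgumentException("Unknown session: " + sessionId);
        }
        return props;
    }
}
```

Because each session owns its own map, setting default-parallelism to 16 in one session leaves the value in another session untouched.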

Error Handling

Context and session management handle various error conditions:

  • Invalid JAR paths: File not found or access denied
  • Configuration errors: Invalid property keys or values
  • Resource conflicts: Duplicate session IDs
  • Cleanup failures: Resource release issues

Example Error Handling:

try {
    session.addJar("/invalid/path/connector.jar");
} catch (Exception e) {
    System.err.println("Failed to load JAR: " + e.getMessage());
    // Handle JAR loading error
}

try {
    session.set("invalid.config.key", "value");
} catch (IllegalArgumentException e) {
    System.err.println("Invalid configuration: " + e.getMessage()); 
    // Handle configuration error
}
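
Cleanup failures deserve the same care as setup failures: when several sessions must be closed, one failing close should not prevent the others from running. The following JDK-only sketch runs every close action and reports the first failure with later ones attached as suppressed exceptions. Cleanup and closeAll are hypothetical names, not part of the SQL Client API.

```java
import java.util.List;

// Hypothetical helper for closing several resources without skipping any.
final class Cleanup {
    private Cleanup() {}

    // Run every close action; rethrow the first failure with later ones suppressed.
    static void closeAll(List<Runnable> closers) {
        RuntimeException first = null;
        for (Runnable closer : closers) {
            try {
                closer.run();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

A caller holding an executor and two session IDs could pass one lambda per session, for example () -> executor.closeSession(session1), so that both closes are attempted.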

Together, DefaultContext, SessionContext, and ExecutionContext separate the shared base configuration from per-session overrides, JAR dependencies, and cleanup.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-client-2-12
