CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-client-2-12

SQL Client for exploring and submitting SQL programs to Flink

Pending
Overview
Eval results
Files

configuration-options.mddocs/

Configuration and Options

Configuration system for SQL client behavior including result modes, display options, and execution parameters. The configuration system provides comprehensive control over SQL client execution, result display, and performance tuning.

Capabilities

SqlClientOptions Class

Configuration options for SQL client behavior with typed configuration keys and default values.

public class SqlClientOptions {
    /**
     * Maximum number of rows to cache in table result mode
     * Default: 1,000,000 rows
     * Type: Integer
     */
    public static final ConfigOption<Integer> EXECUTION_MAX_TABLE_RESULT_ROWS;
    
    /**
     * How to display query results - table, changelog, or tableau mode
     * Default: TABLE
     * Type: ResultMode enum
     */
    public static final ConfigOption<ResultMode> EXECUTION_RESULT_MODE;
    
    /**
     * Whether to output verbose error messages with stack traces
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> VERBOSE;
    
    /**
     * Maximum column width for display formatting
     * Default: 30 characters
     * Type: Integer
     */
    public static final ConfigOption<Integer> DISPLAY_MAX_COLUMN_WIDTH;
}

Usage Example:

// Set configuration options through session properties
executor.setSessionProperty(sessionId, 
    SqlClientOptions.EXECUTION_RESULT_MODE.key(), "TABLEAU");
    
executor.setSessionProperty(sessionId, 
    SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS.key(), "50000");
    
executor.setSessionProperty(sessionId, 
    SqlClientOptions.VERBOSE.key(), "true");
    
executor.setSessionProperty(sessionId, 
    SqlClientOptions.DISPLAY_MAX_COLUMN_WIDTH.key(), "50");

// Access configuration values
ReadableConfig config = executor.getSessionConfig(sessionId);
ResultMode mode = config.get(SqlClientOptions.EXECUTION_RESULT_MODE);
boolean verbose = config.get(SqlClientOptions.VERBOSE);
int maxRows = config.get(SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS);

ResultMode Enumeration

Query result display modes determining how results are presented to users.

public enum ResultMode {
    /**
     * Materialized, paginated table view with interactive navigation
     * Results are cached in memory for random page access
     * Best for: Bounded result sets, data exploration, debugging
     */
    TABLE,
    
    /**
     * Continuous stream visualization showing incremental changes
     * Results stream in real-time as they arrive
     * Best for: Streaming queries, real-time monitoring, change tracking
     */
    CHANGELOG,
    
    /**
     * Direct tableau format display without interactive features
     * Results displayed immediately in formatted table
     * Best for: Non-interactive execution, scripting, simple output
     */
    TABLEAU
}

Usage Example:

// Set result mode based on query type
if (isStreamingQuery) {
    executor.setSessionProperty(sessionId, 
        SqlClientOptions.EXECUTION_RESULT_MODE.key(), ResultMode.CHANGELOG.name());
} else if (isInteractiveSession) {
    executor.setSessionProperty(sessionId, 
        SqlClientOptions.EXECUTION_RESULT_MODE.key(), ResultMode.TABLE.name());
} else {
    executor.setSessionProperty(sessionId, 
        SqlClientOptions.EXECUTION_RESULT_MODE.key(), ResultMode.TABLEAU.name());
}

Flink Configuration Integration

Table Execution Options

Core Flink table execution configuration options accessible through session properties:

// Parallelism configuration
executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "8");

// Checkpointing configuration
executor.setSessionProperty(sessionId, "execution.checkpointing.interval", "30s");
executor.setSessionProperty(sessionId, "execution.checkpointing.mode", "EXACTLY_ONCE");

// State backend configuration
executor.setSessionProperty(sessionId, "state.backend", "rocksdb");
executor.setSessionProperty(sessionId, "state.checkpoints.dir", "s3://bucket/checkpoints");

// Resource configuration
executor.setSessionProperty(sessionId, "taskmanager.memory.process.size", "4gb");
executor.setSessionProperty(sessionId, "jobmanager.memory.process.size", "2gb");

SQL Client Execution Options

SQL client specific execution behavior configuration:

// Result caching and display
executor.setSessionProperty(sessionId, "sql-client.execution.max-table-result.rows", "100000");
executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "table");
executor.setSessionProperty(sessionId, "sql-client.display.max-column-width", "40");

// Error handling and debugging
executor.setSessionProperty(sessionId, "sql-client.verbose", "true");

// DML synchronous execution
executor.setSessionProperty(sessionId, "table.dml-sync", "true");

Pipeline and Job Configuration

Job-level configuration options for pipeline execution:

// Job naming and identification
executor.setSessionProperty(sessionId, "pipeline.name", "My Analytics Job");
executor.setSessionProperty(sessionId, "pipeline.jars", "file:///path/to/connector.jar");

// Optimization settings
executor.setSessionProperty(sessionId, "table.optimizer.join-reorder-enabled", "true");
executor.setSessionProperty(sessionId, "table.optimizer.agg-phase-strategy", "TWO_PHASE");

// Time and timezone settings
executor.setSessionProperty(sessionId, "table.local-time-zone", "UTC");
executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "table");

Configuration Management Patterns

Session Configuration Lifecycle

Complete session configuration management pattern:

// Open session and configure
String sessionId = executor.openSession("analytics-session");

try {
    // Set base configuration
    executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "16");
    executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "table");
    
    // Add required JARs
    executor.addJar(sessionId, "flink-connector-kafka-1.14.6.jar");
    executor.addJar(sessionId, "flink-json-1.14.6.jar");
    
    // Execute operations with configured environment
    Operation createTable = executor.parseStatement(sessionId, """
        CREATE TABLE events (
            id STRING,
            timestamp TIMESTAMP(3),
            data ROW<field1 STRING, field2 INT>
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
        """);
    executor.executeOperation(sessionId, createTable);
    
    // Query with session configuration applied
    QueryOperation query = (QueryOperation) executor.parseStatement(sessionId, 
        "SELECT * FROM events WHERE timestamp > CURRENT_TIMESTAMP - INTERVAL '1' HOUR");
    ResultDescriptor result = executor.executeQuery(sessionId, query);
    
} finally {
    executor.closeSession(sessionId);
}

Configuration Validation and Error Handling

Proper configuration validation and error handling:

try {
    // Validate configuration before setting
    String parallelism = "invalid_number";
    Integer.parseInt(parallelism); // Validate numeric values
    executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", parallelism);
    
} catch (NumberFormatException e) {
    System.err.println("Invalid parallelism value: " + parallelism);
    // Use default or prompt for correction
}

try {
    executor.setSessionProperty(sessionId, "invalid.config.key", "value");
} catch (Exception e) {
    System.err.println("Invalid configuration key: " + e.getMessage());
    // Handle configuration error
}

// Check configuration values
ReadableConfig config = executor.getSessionConfig(sessionId);
if (config.get(SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS) > 1000000) {
    System.out.println("Warning: Large result cache size may cause memory issues");
}

Environment-Specific Configuration

Configuration patterns for different environments:

// Development environment
void configureForDevelopment(Executor executor, String sessionId) {
    executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "1");
    executor.setSessionProperty(sessionId, "sql-client.verbose", "true");
    executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "table");
    executor.setSessionProperty(sessionId, "execution.checkpointing.interval", "10s");
}

// Production environment
void configureForProduction(Executor executor, String sessionId) {
    executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "16");
    executor.setSessionProperty(sessionId, "sql-client.verbose", "false");
    executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "tableau");
    executor.setSessionProperty(sessionId, "execution.checkpointing.interval", "60s");
    executor.setSessionProperty(sessionId, "state.backend", "rocksdb");
    executor.setSessionProperty(sessionId, "state.checkpoints.dir", "s3://prod-checkpoints/");
}

// Streaming environment
void configureForStreaming(Executor executor, String sessionId) {
    executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "changelog");
    executor.setSessionProperty(sessionId, "table.exec.source.idle-timeout", "30s");
    executor.setSessionProperty(sessionId, "execution.checkpointing.mode", "EXACTLY_ONCE");
    executor.setSessionProperty(sessionId, "table.exec.sink.not-null-enforcer", "error");
}

Configuration Property Categories

Display and Output Configuration

// Result display formatting
"sql-client.display.max-column-width" = "50"          // Column width limit
"sql-client.execution.result-mode" = "table"          // Display mode
"sql-client.execution.max-table-result.rows" = "10000" // Cache limit

// Error output configuration
"sql-client.verbose" = "true"                         // Verbose errors

Execution and Performance Configuration

// Core execution settings
"table.exec.resource.default-parallelism" = "8"       // Default parallelism
"table.dml-sync" = "false"                            // Async DML execution

// Optimization settings
"table.optimizer.join-reorder-enabled" = "true"       // Join optimization
"table.optimizer.agg-phase-strategy" = "AUTO"         // Aggregation strategy

Checkpointing and State Configuration

// Checkpointing configuration
"execution.checkpointing.interval" = "30s"            // Checkpoint interval
"execution.checkpointing.mode" = "EXACTLY_ONCE"       // Consistency mode
"execution.checkpointing.timeout" = "10min"           // Checkpoint timeout

// State backend configuration
"state.backend" = "rocksdb"                           // State backend type
"state.checkpoints.dir" = "s3://bucket/checkpoints"   // Checkpoint storage
"state.savepoints.dir" = "s3://bucket/savepoints"     // Savepoint storage

Resource and Memory Configuration

// TaskManager configuration
"taskmanager.memory.process.size" = "4gb"             // Total TM memory
"taskmanager.memory.task.heap.fraction" = "0.4"       // Task heap fraction
"taskmanager.memory.managed.fraction" = "0.4"         // Managed memory

// JobManager configuration
"jobmanager.memory.process.size" = "2gb"              // Total JM memory
"jobmanager.memory.heap.size" = "1.5gb"               // JM heap size

Configuration Reset and Management

Property Reset Operations

// Reset specific property to default
executor.resetSessionProperty(sessionId, "table.exec.resource.default-parallelism");

// Reset all session properties
executor.resetSessionProperties(sessionId);

// Get current configuration state
Map<String, String> currentConfig = executor.getSessionConfigMap(sessionId);
currentConfig.forEach((key, value) -> 
    System.out.println(key + " = " + value));

Configuration Inheritance

Configuration inheritance and override patterns:

// Base configuration from CLI options
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);

// Session inherits from default context
SessionContext sessionContext = LocalContextUtils.buildSessionContext(sessionId, defaultContext);

// Session-specific overrides
sessionContext.set("table.exec.resource.default-parallelism", "8");

// Property lookup order:
// 1. Session-specific properties
// 2. Default context properties  
// 3. Flink configuration defaults

The configuration system provides comprehensive control over SQL client behavior, execution performance, and result display, enabling fine-tuned optimization for different use cases and environments.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-client-2-12

docs

command-line-interface.md

configuration-options.md

index.md

result-handling-display.md

session-context-management.md

sql-client-application.md

sql-execution-gateway.md

tile.json