SQL Client for exploring and submitting SQL programs to Flink
—
Configuration system for SQL client behavior including result modes, display options, and execution parameters. The configuration system provides comprehensive control over SQL client execution, result display, and performance tuning.
Configuration options for SQL client behavior with typed configuration keys and default values.
public class SqlClientOptions {
/**
* Maximum number of rows to cache in table result mode
* Default: 1,000,000 rows
* Type: Integer
*/
public static final ConfigOption<Integer> EXECUTION_MAX_TABLE_RESULT_ROWS;
/**
* How to display query results - table, changelog, or tableau mode
* Default: TABLE
* Type: ResultMode enum
*/
public static final ConfigOption<ResultMode> EXECUTION_RESULT_MODE;
/**
* Whether to output verbose error messages with stack traces
* Default: false
* Type: Boolean
*/
public static final ConfigOption<Boolean> VERBOSE;
/**
* Maximum column width for display formatting
* Default: 30 characters
* Type: Integer
*/
public static final ConfigOption<Integer> DISPLAY_MAX_COLUMN_WIDTH;
}Usage Example:
// Set configuration options through session properties
executor.setSessionProperty(sessionId,
SqlClientOptions.EXECUTION_RESULT_MODE.key(), "TABLEAU");
executor.setSessionProperty(sessionId,
SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS.key(), "50000");
executor.setSessionProperty(sessionId,
SqlClientOptions.VERBOSE.key(), "true");
executor.setSessionProperty(sessionId,
SqlClientOptions.DISPLAY_MAX_COLUMN_WIDTH.key(), "50");
// Access configuration values
ReadableConfig config = executor.getSessionConfig(sessionId);
ResultMode mode = config.get(SqlClientOptions.EXECUTION_RESULT_MODE);
boolean verbose = config.get(SqlClientOptions.VERBOSE);
int maxRows = config.get(SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS);

Query result display modes determining how results are presented to users.
/**
 * Display modes for query results, determining how rows are presented to the user.
 */
public enum ResultMode {

    /**
     * Materialized, paginated table view with interactive navigation. Rows are
     * cached in memory so any page can be revisited at random. Best suited to
     * bounded result sets, data exploration, and debugging.
     */
    TABLE,

    /**
     * Continuous stream visualization that renders incremental changes as they
     * arrive in real time. Best suited to streaming queries, live monitoring,
     * and change tracking.
     */
    CHANGELOG,

    /**
     * Direct tableau-format output without interactive features; rows are
     * printed immediately in a formatted table. Best suited to non-interactive
     * execution, scripting, and simple output.
     */
    TABLEAU
}

// Usage Example:
// Set result mode based on query type
if (isStreamingQuery) {
executor.setSessionProperty(sessionId,
SqlClientOptions.EXECUTION_RESULT_MODE.key(), ResultMode.CHANGELOG.name());
} else if (isInteractiveSession) {
executor.setSessionProperty(sessionId,
SqlClientOptions.EXECUTION_RESULT_MODE.key(), ResultMode.TABLE.name());
} else {
executor.setSessionProperty(sessionId,
SqlClientOptions.EXECUTION_RESULT_MODE.key(), ResultMode.TABLEAU.name());
}

Core Flink table execution configuration options accessible through session properties:
// Parallelism configuration
executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "8");
// Checkpointing configuration
executor.setSessionProperty(sessionId, "execution.checkpointing.interval", "30s");
executor.setSessionProperty(sessionId, "execution.checkpointing.mode", "EXACTLY_ONCE");
// State backend configuration
executor.setSessionProperty(sessionId, "state.backend", "rocksdb");
executor.setSessionProperty(sessionId, "state.checkpoints.dir", "s3://bucket/checkpoints");
// Resource configuration
executor.setSessionProperty(sessionId, "taskmanager.memory.process.size", "4gb");
executor.setSessionProperty(sessionId, "jobmanager.memory.process.size", "2gb");

SQL client specific execution behavior configuration:
// Result caching and display
executor.setSessionProperty(sessionId, "sql-client.execution.max-table-result.rows", "100000");
executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "table");
executor.setSessionProperty(sessionId, "sql-client.display.max-column-width", "40");
// Error handling and debugging
executor.setSessionProperty(sessionId, "sql-client.verbose", "true");
// DML synchronous execution
executor.setSessionProperty(sessionId, "table.dml-sync", "true");

Job-level configuration options for pipeline execution:
// Job naming and identification
executor.setSessionProperty(sessionId, "pipeline.name", "My Analytics Job");
executor.setSessionProperty(sessionId, "pipeline.jars", "file:///path/to/connector.jar");
// Optimization settings
executor.setSessionProperty(sessionId, "table.optimizer.join-reorder-enabled", "true");
executor.setSessionProperty(sessionId, "table.optimizer.agg-phase-strategy", "TWO_PHASE");
// Time and timezone settings
executor.setSessionProperty(sessionId, "table.local-time-zone", "UTC");
executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "table");

Complete session configuration management pattern:
// Open session and configure
String sessionId = executor.openSession("analytics-session");
try {
// Set base configuration
executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "16");
executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "table");
// Add required JARs
executor.addJar(sessionId, "flink-connector-kafka-1.14.6.jar");
executor.addJar(sessionId, "flink-json-1.14.6.jar");
// Execute operations with configured environment
Operation createTable = executor.parseStatement(sessionId, """
CREATE TABLE events (
id STRING,
timestamp TIMESTAMP(3),
data ROW<field1 STRING, field2 INT>
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""");
executor.executeOperation(sessionId, createTable);
// Query with session configuration applied
QueryOperation query = (QueryOperation) executor.parseStatement(sessionId,
"SELECT * FROM events WHERE timestamp > CURRENT_TIMESTAMP - INTERVAL '1' HOUR");
ResultDescriptor result = executor.executeQuery(sessionId, query);
} finally {
executor.closeSession(sessionId);
}

Proper configuration validation and error handling:
try {
// Validate configuration before setting
String parallelism = "invalid_number";
Integer.parseInt(parallelism); // Validate numeric values
executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", parallelism);
} catch (NumberFormatException e) {
System.err.println("Invalid parallelism value: " + parallelism);
// Use default or prompt for correction
}
try {
executor.setSessionProperty(sessionId, "invalid.config.key", "value");
} catch (Exception e) {
System.err.println("Invalid configuration key: " + e.getMessage());
// Handle configuration error
}
// Check configuration values
ReadableConfig config = executor.getSessionConfig(sessionId);
if (config.get(SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS) > 1000000) {
System.out.println("Warning: Large result cache size may cause memory issues");
}

Configuration patterns for different environments:
// Development environment
/** Development defaults: single-slot parallelism, verbose errors, interactive table output, frequent checkpoints. */
void configureForDevelopment(Executor executor, String sessionId) {
    final String[][] devSettings = {
            {"table.exec.resource.default-parallelism", "1"},
            {"sql-client.verbose", "true"},
            {"sql-client.execution.result-mode", "table"},
            {"execution.checkpointing.interval", "10s"},
    };
    for (String[] setting : devSettings) {
        executor.setSessionProperty(sessionId, setting[0], setting[1]);
    }
}
// Production environment
/** Production defaults: high parallelism, quiet errors, non-interactive output, durable RocksDB state on S3. */
void configureForProduction(Executor executor, String sessionId) {
    final String[][] prodSettings = {
            {"table.exec.resource.default-parallelism", "16"},
            {"sql-client.verbose", "false"},
            {"sql-client.execution.result-mode", "tableau"},
            {"execution.checkpointing.interval", "60s"},
            {"state.backend", "rocksdb"},
            {"state.checkpoints.dir", "s3://prod-checkpoints/"},
    };
    for (String[] setting : prodSettings) {
        executor.setSessionProperty(sessionId, setting[0], setting[1]);
    }
}
// Streaming environment
/** Streaming defaults: changelog display, idle-source timeout, exactly-once checkpoints, strict NOT NULL enforcement. */
void configureForStreaming(Executor executor, String sessionId) {
    final String[][] streamingSettings = {
            {"sql-client.execution.result-mode", "changelog"},
            {"table.exec.source.idle-timeout", "30s"},
            {"execution.checkpointing.mode", "EXACTLY_ONCE"},
            {"table.exec.sink.not-null-enforcer", "error"},
    };
    for (String[] setting : streamingSettings) {
        executor.setSessionProperty(sessionId, setting[0], setting[1]);
    }
}

// Result display formatting
"sql-client.display.max-column-width" = "50" // Column width limit
"sql-client.execution.result-mode" = "table" // Display mode
"sql-client.execution.max-table-result.rows" = "10000" // Cache limit
// Error output configuration
"sql-client.verbose" = "true" // Verbose errors

// Core execution settings
"table.exec.resource.default-parallelism" = "8" // Default parallelism
"table.dml-sync" = "false" // Async DML execution
// Optimization settings
"table.optimizer.join-reorder-enabled" = "true" // Join optimization
"table.optimizer.agg-phase-strategy" = "AUTO" // Aggregation strategy

// Checkpointing configuration
"execution.checkpointing.interval" = "30s" // Checkpoint interval
"execution.checkpointing.mode" = "EXACTLY_ONCE" // Consistency mode
"execution.checkpointing.timeout" = "10min" // Checkpoint timeout
// State backend configuration
"state.backend" = "rocksdb" // State backend type
"state.checkpoints.dir" = "s3://bucket/checkpoints" // Checkpoint storage
"state.savepoints.dir" = "s3://bucket/savepoints" // Savepoint storage

// TaskManager configuration
"taskmanager.memory.process.size" = "4gb" // Total TM memory
"taskmanager.memory.task.heap.fraction" = "0.4" // Task heap fraction
"taskmanager.memory.managed.fraction" = "0.4" // Managed memory
// JobManager configuration
"jobmanager.memory.process.size" = "2gb" // Total JM memory
"jobmanager.memory.heap.size" = "1.5gb" // JM heap size

// Reset specific property to default
executor.resetSessionProperty(sessionId, "table.exec.resource.default-parallelism");
// Reset all session properties
executor.resetSessionProperties(sessionId);
// Get current configuration state
Map<String, String> currentConfig = executor.getSessionConfigMap(sessionId);
currentConfig.forEach((key, value) ->
System.out.println(key + " = " + value));

Configuration inheritance and override patterns:
// Base configuration from CLI options
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
// Session inherits from default context
SessionContext sessionContext = LocalContextUtils.buildSessionContext(sessionId, defaultContext);
// Session-specific overrides
sessionContext.set("table.exec.resource.default-parallelism", "8");
// Property lookup order:
// 1. Session-specific properties
// 2. Default context properties
// 3. Flink configuration defaults

The configuration system provides comprehensive control over SQL client behavior, execution performance, and result display, enabling fine-tuned optimization for different use cases and environments.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-client-2-12