
tessl/maven-org-apache-flink--flink-sql-client-2-12

SQL Client for exploring and submitting SQL programs to Flink


SQL Execution Gateway

Core execution interface providing session management, SQL parsing, and operation execution. The gateway abstracts the underlying Flink execution environment and provides a unified API for SQL operations across different deployment modes.

Capabilities

Executor Interface

Primary interface for SQL execution operations with complete session lifecycle management.

public interface Executor {
    /**
     * Initialize executor and ensure readiness for command execution
     * @throws SqlExecutionException if initialization fails
     */
    void start() throws SqlExecutionException;
    
    /**
     * Open new session with optional session identifier
     * @param sessionId Desired session ID or null for auto-generation
     * @return Actual session ID used for tracking
     * @throws SqlExecutionException if session creation fails
     */
    String openSession(String sessionId) throws SqlExecutionException;
    
    /**
     * Close session and release associated resources
     * @param sessionId Session identifier to close
     * @throws SqlExecutionException if session closure fails
     */
    void closeSession(String sessionId) throws SqlExecutionException;
}
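The open/close pairing described above lends itself to Java's try-with-resources. The following is a minimal sketch, not part of the Flink API: `SessionApi`, `SessionScope`, and `FakeSessionApi` are hypothetical names introduced here, and `SessionApi` only mirrors the two session methods of `Executor` so that the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical stand-in mirroring Executor.openSession/closeSession. */
interface SessionApi {
    String openSession(String sessionId);
    void closeSession(String sessionId);
}

/** Hypothetical helper: opens a session on construction, closes it on exit. */
final class SessionScope implements AutoCloseable {
    private final SessionApi api;
    private final String sessionId;

    SessionScope(SessionApi api, String requestedId) {
        this.api = api;
        // The returned ID may differ from the requested one (null requests auto-generation).
        this.sessionId = api.openSession(requestedId);
    }

    String id() {
        return sessionId;
    }

    @Override
    public void close() {
        api.closeSession(sessionId);
    }
}

/** In-memory fake, present only to make the sketch runnable. */
final class FakeSessionApi implements SessionApi {
    final List<String> open = new ArrayList<>();

    @Override
    public String openSession(String sessionId) {
        String id = (sessionId != null) ? sessionId : UUID.randomUUID().toString();
        open.add(id);
        return id;
    }

    @Override
    public void closeSession(String sessionId) {
        open.remove(sessionId);
    }
}

final class SessionScopeDemo {
    /** Returns how many sessions remain open after the scope has exited. */
    static int openCountAfterScope() {
        FakeSessionApi api = new FakeSessionApi();
        try (SessionScope scope = new SessionScope(api, null)) {
            // With a real Executor, scope.id() would be passed to parseStatement etc.
            if (scope.id().isEmpty()) {
                throw new IllegalStateException("session ID must not be empty");
            }
        }
        return api.open.size();
    }

    public static void main(String[] args) {
        System.out.println(openCountAfterScope());
    }
}
```

With a real `Executor`, the same wrapper could delegate to `executor.openSession` and `executor.closeSession`; whether that is preferable to an explicit `finally` block is a matter of style.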

Configuration Management

Session-level configuration management with property get/set operations.

/**
 * Get session configuration as Map for external access
 * @param sessionId Session identifier
 * @return Copy of all session configuration properties
 * @throws SqlExecutionException if session not found
 */
Map<String, String> getSessionConfigMap(String sessionId) throws SqlExecutionException;

/**
 * Get session configuration as ReadableConfig for type-safe access
 * @param sessionId Session identifier
 * @return ReadableConfig instance with session properties
 * @throws SqlExecutionException if session not found
 */
ReadableConfig getSessionConfig(String sessionId) throws SqlExecutionException;

/**
 * Reset all session properties to default values
 * @param sessionId Session identifier
 * @throws SqlExecutionException if reset fails
 */
void resetSessionProperties(String sessionId) throws SqlExecutionException;

/**
 * Reset specific session property to default value
 * @param sessionId Session identifier
 * @param key Property key to reset
 * @throws SqlExecutionException if reset fails
 */
void resetSessionProperty(String sessionId, String key) throws SqlExecutionException;

/**
 * Set session property to specific value
 * @param sessionId Session identifier
 * @param key Property key
 * @param value Property value
 * @throws SqlExecutionException if property setting fails
 */
void setSessionProperty(String sessionId, String key, String value) throws SqlExecutionException;

Usage Example:

// Configure session properties
executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "tableau");
executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "4");

// Get current configuration
Map<String, String> config = executor.getSessionConfigMap(sessionId);
ReadableConfig readableConfig = executor.getSessionConfig(sessionId);

// Reset properties
executor.resetSessionProperty(sessionId, "table.exec.resource.default-parallelism");
executor.resetSessionProperties(sessionId); // Reset all
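The set/reset semantics above can be pictured as a layer of overrides on top of a map of defaults. The sketch below is only a mental model under that assumption; `SessionPropertiesModel` is a hypothetical class and says nothing about how Flink stores session configuration internally.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: session overrides layered over default values. */
final class SessionPropertiesModel {
    private final Map<String, String> defaults;
    private final Map<String, String> overrides = new HashMap<>();

    SessionPropertiesModel(Map<String, String> defaults) {
        this.defaults = new HashMap<>(defaults);
    }

    /** Analogous to setSessionProperty: the override wins over the default. */
    void set(String key, String value) {
        overrides.put(key, value);
    }

    /** Analogous to resetSessionProperty: dropping the override exposes the default again. */
    void reset(String key) {
        overrides.remove(key);
    }

    /** Analogous to resetSessionProperties: drop every override. */
    void resetAll() {
        overrides.clear();
    }

    /** Returns a fresh copy, so callers cannot mutate session state through it. */
    Map<String, String> asMap() {
        Map<String, String> merged = new HashMap<>(defaults);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        SessionPropertiesModel model =
                new SessionPropertiesModel(Collections.singletonMap("parallelism", "1"));
        model.set("parallelism", "4");
        System.out.println(model.asMap().get("parallelism")); // prints 4
        model.reset("parallelism");
        System.out.println(model.asMap().get("parallelism")); // prints 1
    }
}
```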

SQL Statement Processing

Parse SQL statements into operations, execute those operations, and request auto-completion hints for partially typed statements.

/**
 * Parse SQL statement into executable Operation
 * @param sessionId Session identifier for context
 * @param statement SQL statement to parse
 * @return Operation instance ready for execution
 * @throws SqlExecutionException if parsing fails
 */
Operation parseStatement(String sessionId, String statement) throws SqlExecutionException;

/**
 * Provide auto-completion suggestions for SQL statement
 * @param sessionId Session identifier for context
 * @param statement Partial SQL statement
 * @param position Cursor position within statement
 * @return List of completion suggestions
 */
List<String> completeStatement(String sessionId, String statement, int position);

/**
 * Execute parsed operation and return result
 * @param sessionId Session identifier
 * @param operation Parsed operation to execute
 * @return TableResult with operation results
 * @throws SqlExecutionException if execution fails
 */
TableResult executeOperation(String sessionId, Operation operation) throws SqlExecutionException;

Usage Example:

// Parse and execute statement
String sql = "CREATE TABLE users (id INT, name STRING) WITH ('connector' = 'datagen')";
Operation operation = executor.parseStatement(sessionId, sql);
TableResult result = executor.executeOperation(sessionId, operation);

// Auto-completion
List<String> suggestions = executor.completeStatement(sessionId, "SELECT * FROM use", 17);
// May return suggestions such as ["users", "user_events", ...], depending on the tables available in the session
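The `position` argument in the example is 17 because that is the length of the partial statement, which places the cursor at its end. As an illustration of how a cursor position selects the text being completed, here is a small sketch; `CompletionPrefix` and `wordBeforeCursor` are hypothetical helpers and are not how Flink's completer is implemented.

```java
/** Hypothetical helper showing which word a cursor position refers to. */
final class CompletionPrefix {

    private static boolean isIdentifierChar(char c) {
        return Character.isLetterOrDigit(c) || c == '_';
    }

    /** Returns the identifier-like word that ends at the cursor, or "" if there is none. */
    static String wordBeforeCursor(String statement, int position) {
        // Clamp the cursor into the valid range [0, statement.length()].
        int end = Math.max(0, Math.min(position, statement.length()));
        int start = end;
        while (start > 0 && isIdentifierChar(statement.charAt(start - 1))) {
            start--;
        }
        return statement.substring(start, end);
    }

    public static void main(String[] args) {
        String partial = "SELECT * FROM use";
        // Cursor at the end of the statement: the word being completed is "use".
        System.out.println(wordBeforeCursor(partial, partial.length())); // prints use
    }
}
```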

Query Execution and Results

Execute queries and retrieve their results either as incremental changes (streaming) or as paginated snapshots (materialized).

/**
 * Execute SELECT query and return result descriptor for streaming access
 * @param sessionId Session identifier
 * @param query Parsed query operation
 * @return ResultDescriptor for accessing query results
 * @throws SqlExecutionException if query execution fails
 */
ResultDescriptor executeQuery(String sessionId, QueryOperation query) throws SqlExecutionException;

/**
 * Retrieve incremental changes from streaming query result
 * @param sessionId Session identifier
 * @param resultId Result identifier from executeQuery
 * @return TypedResult containing list of result rows or status
 * @throws SqlExecutionException if retrieval fails
 */
TypedResult<List<Row>> retrieveResultChanges(String sessionId, String resultId) throws SqlExecutionException;

/**
 * Create materialized snapshot of query result with pagination
 * @param sessionId Session identifier
 * @param resultId Result identifier from executeQuery
 * @param pageSize Number of rows per page
 * @return TypedResult containing total page count
 * @throws SqlExecutionException if snapshot creation fails
 */
TypedResult<Integer> snapshotResult(String sessionId, String resultId, int pageSize) throws SqlExecutionException;

/**
 * Retrieve specific page from materialized result snapshot
 * @param resultId Result identifier
 * @param page Page number (1-based; valid values are 1 through the page count returned by snapshotResult)
 * @return List of rows for the requested page
 * @throws SqlExecutionException if page retrieval fails
 */
List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException;

/**
 * Cancel running query and stop result generation
 * @param sessionId Session identifier
 * @param resultId Result identifier to cancel
 * @throws SqlExecutionException if cancellation fails
 */
void cancelQuery(String sessionId, String resultId) throws SqlExecutionException;

Usage Example:

// Execute query
QueryOperation queryOp = (QueryOperation) executor.parseStatement(sessionId, "SELECT * FROM events");
ResultDescriptor descriptor = executor.executeQuery(sessionId, queryOp);

if (descriptor.isMaterialized()) {
    // Handle materialized results with pagination
    TypedResult<Integer> pageCount = executor.snapshotResult(sessionId, descriptor.getResultId(), 100);
    if (pageCount.getType() == TypedResult.ResultType.PAYLOAD) {
        // Pages are numbered from 1 up to and including the page count
        for (int page = 1; page <= pageCount.getPayload(); page++) {
            List<Row> rows = executor.retrieveResultPage(descriptor.getResultId(), page);
            // Process rows
        }
    }
} else {
    // Handle streaming results
    while (true) {
        TypedResult<List<Row>> changes = executor.retrieveResultChanges(sessionId, descriptor.getResultId());
        if (changes.getType() == TypedResult.ResultType.PAYLOAD) {
            List<Row> rows = changes.getPayload();
            // Process incremental changes
        } else if (changes.getType() == TypedResult.ResultType.EOS) {
            break; // End of stream
        } else {
            // EMPTY: no new changes yet, so wait briefly instead of polling in a tight loop
            Thread.sleep(100);
        }
    }
}
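The page count returned by `snapshotResult` depends on the number of rows in the snapshot and the requested page size. A sketch of the arithmetic one would expect is shown below; `PageMath` is a hypothetical helper, and the treatment of an empty result (zero pages here) is an assumption rather than documented Flink behavior.

```java
/** Hypothetical helper illustrating page-count arithmetic for a snapshot. */
final class PageMath {

    /** Number of pages needed to hold rowCount rows (ceiling division). */
    static int pageCount(int rowCount, int pageSize) {
        if (pageSize <= 0 || rowCount < 0) {
            throw new IllegalArgumentException("pageSize must be positive and rowCount non-negative");
        }
        return (rowCount + pageSize - 1) / pageSize;
    }

    /** Number of rows on the final page; 0 when there are no rows at all. */
    static int lastPageSize(int rowCount, int pageSize) {
        if (pageSize <= 0 || rowCount < 0) {
            throw new IllegalArgumentException("pageSize must be positive and rowCount non-negative");
        }
        if (rowCount == 0) {
            return 0;
        }
        int remainder = rowCount % pageSize;
        return remainder == 0 ? pageSize : remainder;
    }

    public static void main(String[] args) {
        // 250 rows at 100 rows per page: two full pages plus one page of 50.
        System.out.println(pageCount(250, 100));    // prints 3
        System.out.println(lastPageSize(250, 100)); // prints 50
    }
}
```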

Batch Modification Operations

Execute several modification operations in a single call. The operations are submitted together and produce one TableResult.

/**
 * Execute multiple modification operations as batch
 * @param sessionId Session identifier
 * @param operations List of modification operations (INSERT, UPDATE, DELETE)
 * @return TableResult with batch execution results
 * @throws SqlExecutionException if batch execution fails
 */
TableResult executeModifyOperations(String sessionId, List<ModifyOperation> operations) throws SqlExecutionException;

Usage Example:

// Batch multiple INSERT operations
List<ModifyOperation> operations = Arrays.asList(
    (ModifyOperation) executor.parseStatement(sessionId, "INSERT INTO table1 VALUES (1, 'a')"),
    (ModifyOperation) executor.parseStatement(sessionId, "INSERT INTO table2 VALUES (2, 'b')")
);

TableResult batchResult = executor.executeModifyOperations(sessionId, operations);
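// getJobClient() returns an Optional, so avoid calling get() unconditionally
batchResult.getJobClient().ifPresent(client -> System.out.println("Batch job ID: " + client.getJobID()));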

JAR and Dependency Management

Manage session-level JAR dependencies for custom functions and connectors.

/**
 * Add JAR file to session classpath
 * @param sessionId Session identifier
 * @param jarPath Path to JAR file (local or remote URL)
 */
void addJar(String sessionId, String jarPath);

/**
 * Remove JAR file from session classpath
 * @param sessionId Session identifier  
 * @param jarPath Path to JAR file to remove
 */
void removeJar(String sessionId, String jarPath);

/**
 * List all JAR files loaded in session
 * @param sessionId Session identifier
 * @return List of JAR file paths
 */
List<String> listJars(String sessionId);

Usage Example:

// Add custom connector JAR
executor.addJar(sessionId, "/path/to/custom-connector-1.0.jar");

// Add JAR from URL
executor.addJar(sessionId, "https://repo.maven.apache.org/maven2/org/example/connector/1.0/connector-1.0.jar");

// List loaded JARs
List<String> jars = executor.listJars(sessionId);
jars.forEach(System.out::println);

// Remove JAR when no longer needed
executor.removeJar(sessionId, "/path/to/custom-connector-1.0.jar");

LocalExecutor Implementation

Concrete implementation of Executor interface for embedded execution mode.

public class LocalExecutor implements Executor {
    /**
     * Create local executor with default context
     * @param defaultContext Default execution context with configuration and dependencies
     */
    public LocalExecutor(DefaultContext defaultContext);
}

Usage Example:

// Create local executor
DefaultContext context = LocalContextUtils.buildDefaultContext(options);
Executor executor = new LocalExecutor(context);

// Start and use executor
executor.start();
String sessionId = executor.openSession(null);
try {
    // Perform SQL operations
    Operation op = executor.parseStatement(sessionId, "SHOW TABLES");
    TableResult result = executor.executeOperation(sessionId, op);
} finally {
    executor.closeSession(sessionId);
}

Error Handling

SqlExecutionException

Primary exception type for SQL execution errors.

public class SqlExecutionException extends RuntimeException {
    public SqlExecutionException(String message);
    public SqlExecutionException(String message, Throwable cause);
}

Common error scenarios:

  • SQL parsing errors with syntax details
  • Execution errors with Flink cluster communication issues
  • Session management errors (duplicate session ID, session not found)
  • Resource errors (insufficient memory, network connectivity)
  • Configuration errors (invalid property values)

Example Error Handling:

try {
    Operation operation = executor.parseStatement(sessionId, malformedSQL);
} catch (SqlExecutionException e) {
    System.err.println("SQL Error: " + e.getMessage());
    // Handle parsing or execution error
    if (e.getCause() instanceof ValidationException) {
        // Handle validation-specific error
    }
}
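The check on `e.getCause()` above only inspects the direct cause. When exceptions are wrapped more than once, it can help to walk the whole cause chain. The sketch below uses only the JDK; `Causes.findCause` is a hypothetical helper, and `IllegalArgumentException` stands in for `ValidationException` so that the example runs without Flink.

```java
/** Hypothetical helper for locating a specific exception type in a cause chain. */
final class Causes {

    /** Returns the first throwable in the chain that is an instance of type, or null. */
    static <T extends Throwable> T findCause(Throwable error, Class<T> type) {
        Throwable current = error;
        // Bound the walk so that a cyclic cause chain cannot loop forever.
        for (int depth = 0; current != null && depth < 32; depth++) {
            if (type.isInstance(current)) {
                return type.cast(current);
            }
            current = current.getCause();
        }
        return null;
    }

    public static void main(String[] args) {
        Exception root = new IllegalArgumentException("unknown column 'nam'");
        Exception wrapped = new RuntimeException("execution failed",
                new RuntimeException("planning failed", root));

        IllegalArgumentException found = findCause(wrapped, IllegalArgumentException.class);
        System.out.println(found.getMessage()); // prints unknown column 'nam'
    }
}
```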

Integration Patterns

Session Lifecycle Management

Typical session usage pattern:

Executor executor = new LocalExecutor(defaultContext);
executor.start();

String sessionId = executor.openSession("my-session");
try {
    // Configure session
    executor.setSessionProperty(sessionId, "key", "value");
    
    // Execute operations
    Operation op = executor.parseStatement(sessionId, sql);
    TableResult result = executor.executeOperation(sessionId, op);
    
} finally {
    // Always close session
    executor.closeSession(sessionId);
}

Streaming Query Pattern

Pattern for handling streaming query results:

QueryOperation query = (QueryOperation) executor.parseStatement(sessionId, streamingSQL);
ResultDescriptor descriptor = executor.executeQuery(sessionId, query);

// Poll for streaming results
while (!cancelled) {
    TypedResult<List<Row>> result = executor.retrieveResultChanges(sessionId, descriptor.getResultId());
    
    switch (result.getType()) {
        case PAYLOAD:
            processRows(result.getPayload());
            break;
        case EMPTY:
            Thread.sleep(100); // Wait before next poll
            break;
        case EOS:
            System.out.println("Stream ended");
            return;
    }
}
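The polling pattern above sleeps for a fixed interval on EMPTY and does not handle thread interruption. One possible refinement is an exponential backoff that resets when data arrives. This is a sketch under stated assumptions: `PollType`, `Poll`, and `BackoffPoller` are hypothetical stand-ins for `TypedResult` and the executor call, and rows are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for TypedResult.ResultType. */
enum PollType { PAYLOAD, EMPTY, EOS }

/** Hypothetical stand-in for TypedResult<List<Row>>. */
final class Poll {
    final PollType type;
    final List<String> rows;

    Poll(PollType type, List<String> rows) {
        this.type = type;
        this.rows = rows;
    }
}

final class BackoffPoller {

    /**
     * Polls source until EOS and returns every row seen. On EMPTY the delay doubles
     * up to maxDelayMs; on PAYLOAD it resets to initialDelayMs. If the thread is
     * interrupted while waiting, the rows collected so far are returned.
     */
    static List<String> drain(Supplier<Poll> source, long initialDelayMs, long maxDelayMs) {
        List<String> collected = new ArrayList<>();
        long delay = initialDelayMs;
        while (true) {
            Poll poll = source.get();
            switch (poll.type) {
                case PAYLOAD:
                    collected.addAll(poll.rows);
                    delay = initialDelayMs; // data is flowing, so poll eagerly again
                    break;
                case EMPTY:
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt flag
                        return collected;
                    }
                    delay = Math.min(delay * 2, maxDelayMs);
                    break;
                case EOS:
                    return collected;
            }
        }
    }

    public static void main(String[] args) {
        // Scripted sequence of poll results standing in for retrieveResultChanges.
        Iterator<Poll> script = Arrays.asList(
                new Poll(PollType.EMPTY, Collections.<String>emptyList()),
                new Poll(PollType.PAYLOAD, Arrays.asList("row-1", "row-2")),
                new Poll(PollType.EOS, Collections.<String>emptyList())).iterator();

        System.out.println(drain(script::next, 1, 8)); // prints [row-1, row-2]
    }
}
```

With a real executor, the supplier would wrap `executor.retrieveResultChanges(sessionId, resultId)`. The delay bounds are arbitrary illustration values.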

The Executor interface provides a comprehensive abstraction over Flink's SQL execution capabilities, enabling both simple statement execution and complex streaming data processing workflows.
