CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java-uber

Comprehensive uber JAR that consolidates all Java APIs for Apache Flink's Table/SQL ecosystem, enabling developers to write table programs and integrate with other Flink APIs through a single dependency.

Pending
Overview
Eval results
Files

sql-gateway.mddocs/

SQL Gateway API

Remote SQL execution capabilities for building client-server architectures and multi-tenant SQL services with Apache Flink's Table API.

Capabilities

SqlGatewayService

Core service interface for managing SQL sessions and executing SQL operations remotely.

/**
 * Open a new session for SQL execution
 * @param environment Session configuration and initialization parameters
 * @return SessionHandle for identifying the session
 * @throws SqlGatewayException if session creation fails
 */
public SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;

/**
 * Close an existing session and clean up resources
 * @param sessionHandle Handle of the session to close
 * @throws SqlGatewayException if session closure fails
 */
public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;

/**
 * Configure session properties using SET statements
 * @param sessionHandle Handle of the session to configure
 * @param statement Configuration statement (e.g., "SET table.planner = blink")
 * @param executionTimeoutMs Maximum execution time in milliseconds
 * @throws SqlGatewayException if configuration fails
 */
public void configureSession(SessionHandle sessionHandle, 
                           String statement, 
                           long executionTimeoutMs) throws SqlGatewayException;

/**
 * Execute a SQL statement (DDL, DML, or query)
 * @param sessionHandle Handle of the session
 * @param statement SQL statement to execute
 * @param executionTimeoutMs Maximum execution time in milliseconds
 * @param executionConfig Additional execution configuration
 * @return OperationHandle for tracking the operation
 * @throws SqlGatewayException if execution fails
 */
public OperationHandle executeStatement(SessionHandle sessionHandle,
                                      String statement,
                                      long executionTimeoutMs,
                                      Configuration executionConfig) throws SqlGatewayException;

/**
 * Submit a custom operation for execution
 * @param sessionHandle Handle of the session
 * @param executor Callable that produces the operation result
 * @return OperationHandle for tracking the operation
 * @throws SqlGatewayException if submission fails
 */
public OperationHandle submitOperation(SessionHandle sessionHandle,
                                     Callable<ResultSet> executor) throws SqlGatewayException;

/**
 * Cancel a running operation
 * @param sessionHandle Handle of the session
 * @param operationHandle Handle of the operation to cancel
 * @throws SqlGatewayException if cancellation fails
 */
public void cancelOperation(SessionHandle sessionHandle,
                          OperationHandle operationHandle) throws SqlGatewayException;

/**
 * Close an operation and clean up resources
 * @param sessionHandle Handle of the session
 * @param operationHandle Handle of the operation to close
 * @throws SqlGatewayException if operation closure fails
 */
public void closeOperation(SessionHandle sessionHandle,
                         OperationHandle operationHandle) throws SqlGatewayException;

/**
 * Get information about an operation
 * @param sessionHandle Handle of the session
 * @param operationHandle Handle of the operation
 * @return OperationInfo containing status and metadata
 * @throws SqlGatewayException if operation info retrieval fails
 */
public OperationInfo getOperationInfo(SessionHandle sessionHandle,
                                    OperationHandle operationHandle) throws SqlGatewayException;

/**
 * Get schema information for an operation result
 * @param sessionHandle Handle of the session
 * @param operationHandle Handle of the operation
 * @return ResolvedSchema of the operation result
 * @throws SqlGatewayException if schema retrieval fails
 */
public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle,
                                             OperationHandle operationHandle) throws SqlGatewayException;

/**
 * Fetch results from an operation
 * @param sessionHandle Handle of the session
 * @param operationHandle Handle of the operation
 * @param orientation Direction for fetching results
 * @param maxRows Maximum number of rows to fetch
 * @return ResultSet containing the fetched results
 * @throws SqlGatewayException if result fetching fails
 */
public ResultSet fetchResults(SessionHandle sessionHandle,
                            OperationHandle operationHandle,
                            FetchOrientation orientation,
                            int maxRows) throws SqlGatewayException;

Basic SQL Gateway Usage:

// Create SQL Gateway service instance
SqlGatewayService gateway = SqlGatewayServiceImpl.create(gatewayConfig);

// Open session with configuration
Map<String, String> sessionProperties = new HashMap<>();
sessionProperties.put("execution.parallelism", "4");
sessionProperties.put("table.planner", "blink");

SessionEnvironment sessionEnv = SessionEnvironment.newBuilder()
    .setSessionEndpointConfig(endpointConfig)
    .addSessionConfig(sessionProperties)
    .build();

SessionHandle session = gateway.openSession(sessionEnv);

try {
    // Configure session
    gateway.configureSession(session, "SET execution.checkpointing.interval = 10s", 5000);
    
    // Execute DDL
    OperationHandle createTable = gateway.executeStatement(session,
        "CREATE TABLE orders (" +
        "  id BIGINT," +
        "  customer_id BIGINT," +
        "  amount DECIMAL(10,2)" +
        ") WITH ('connector' = 'kafka', ...)",
        30000, new Configuration());
    
    // Execute query
    OperationHandle query = gateway.executeStatement(session,
        "SELECT customer_id, SUM(amount) as total " +
        "FROM orders GROUP BY customer_id",
        60000, new Configuration());
    
    // Fetch results
    ResultSet results = gateway.fetchResults(session, query, 
        FetchOrientation.FETCH_NEXT, 100);
    
    while (results.hasNext()) {
        RowData row = results.next();
        System.out.println("Customer: " + row.getLong(0) + ", Total: " + row.getDecimal(1, 10, 2));
    }
    
} finally {
    gateway.closeSession(session);
}

Session Management

Handle SQL Gateway sessions with proper lifecycle management.

/**
 * Session handle for identifying and tracking SQL sessions
 */
public final class SessionHandle {
    /**
     * Get unique session identifier
     * @return UUID representing the session
     */
    public UUID getIdentifier();
    
    /**
     * Get session creation timestamp
     * @return Session creation time
     */
    public Instant getCreationTime();
}

/**
 * Session environment configuration
 */
public final class SessionEnvironment {
    /**
     * Create new session environment builder
     * @return Builder for constructing session environment
     */
    public static Builder newBuilder();
    
    public static final class Builder {
        /**
         * Set session endpoint configuration
         * @param config Endpoint-specific configuration
         * @return Builder for method chaining
         */
        public Builder setSessionEndpointConfig(Map<String, String> config);
        
        /**
         * Add session configuration properties
         * @param config Map of configuration key-value pairs
         * @return Builder for method chaining
         */
        public Builder addSessionConfig(Map<String, String> config);
        
        /**
         * Set default catalog name
         * @param catalogName Name of the default catalog
         * @return Builder for method chaining
         */
        public Builder setDefaultCatalog(String catalogName);
        
        /**
         * Set default database name
         * @param databaseName Name of the default database
         * @return Builder for method chaining
         */
        public Builder setDefaultDatabase(String databaseName);
        
        /**
         * Build the session environment
         * @return Constructed SessionEnvironment
         */
        public SessionEnvironment build();
    }
}

Operation Management

Track and manage long-running SQL operations.

/**
 * Operation handle for identifying and tracking SQL operations
 */
public final class OperationHandle {
    /**
     * Get unique operation identifier
     * @return UUID representing the operation
     */
    public UUID getIdentifier();
    
    /**
     * Get operation type
     * @return Type of the operation (EXECUTE_STATEMENT, etc.)
     */
    public OperationType getOperationType();
}

/**
 * Operation information and status
 */
public final class OperationInfo {
    /**
     * Get operation status
     * @return Current status of the operation
     */
    public OperationStatus getStatus();
    
    /**
     * Get operation exception if failed
     * @return Optional exception that caused operation failure
     */
    public Optional<Throwable> getException();
    
    /**
     * Check if operation has results available
     * @return true if operation has results to fetch
     */
    public boolean hasResults();
    
    /**
     * Get operation creation timestamp
     * @return When the operation was created
     */
    public Instant getCreateTime();
    
    /**
     * Get operation end timestamp
     * @return When the operation completed (if finished)
     */
    public Optional<Instant> getEndTime();
}

/**
 * Operation status enumeration
 */
public enum OperationStatus {
    INITIALIZED,
    PENDING,
    RUNNING,
    FINISHED,
    CANCELED,
    FAILED,
    TIMEOUT
}

/**
 * Operation type enumeration
 */
public enum OperationType {
    EXECUTE_STATEMENT,
    SUBMIT_PLAN
}

Result Management

Handle result sets and data fetching from SQL operations.

/**
 * Result set for accessing query results
 */
public interface ResultSet extends AutoCloseable {
    /**
     * Get the result type
     * @return Type of results (PAYLOAD, NOT_READY, EOS)
     */
    public ResultType getResultType();
    
    /**
     * Get next token for pagination
     * @return Optional token for fetching next page of results
     */
    public Optional<String> getNextToken();
    
    /**
     * Get result data as list of RowData
     * @return List containing the fetched rows
     */
    public List<RowData> getData();
    
    /**
     * Check if more results are available
     * @return true if hasNext() would return more data
     */
    public boolean hasNext();
    
    /**
     * Get next row of data
     * @return Next RowData instance
     */
    public RowData next();
    
    /**
     * Get result schema
     * @return Schema of the result rows
     */
    public ResolvedSchema getResultSchema();
}

/**
 * Result type enumeration
 */
public enum ResultType {
    PAYLOAD,    // Contains actual data
    NOT_READY,  // Operation not ready yet
    EOS         // End of stream
}

/**
 * Fetch orientation for result pagination
 */
public enum FetchOrientation {
    FETCH_NEXT,   // Fetch next rows
    FETCH_PRIOR,  // Fetch previous rows
    FETCH_FIRST,  // Fetch from beginning
    FETCH_LAST    // Fetch from end
}

Result Handling Examples:

// Execute query and handle results
OperationHandle queryOp = gateway.executeStatement(session,
    "SELECT * FROM large_table ORDER BY id", 
    120000, new Configuration());

// Wait for operation to complete
OperationInfo opInfo;
do {
    Thread.sleep(1000);
    opInfo = gateway.getOperationInfo(session, queryOp);
} while (opInfo.getStatus() == OperationStatus.RUNNING);

if (opInfo.getStatus() == OperationStatus.FINISHED) {
    // Fetch results in batches
    String nextToken = null;
    int batchSize = 1000;
    
    do {
        ResultSet batch = gateway.fetchResults(session, queryOp, 
            FetchOrientation.FETCH_NEXT, batchSize);
            
        if (batch.getResultType() == ResultType.PAYLOAD) {
            List<RowData> rows = batch.getData();
            for (RowData row : rows) {
                processRow(row);
            }
            nextToken = batch.getNextToken().orElse(null);
        }
    } while (nextToken != null);
}

Endpoint Management

Configure and manage SQL Gateway endpoints for different protocols.

/**
 * SQL Gateway endpoint interface
 */
public interface SqlGatewayEndpoint {
    /**
     * Start the endpoint and begin accepting connections
     * @throws SqlGatewayException if startup fails
     */
    public void start() throws SqlGatewayException;
    
    /**
     * Stop the endpoint and close all connections
     * @throws SqlGatewayException if shutdown fails
     */
    public void stop() throws SqlGatewayException;
    
    /**
     * Get endpoint information
     * @return Information about this endpoint
     */
    public EndpointInfo getInfo();
}

/**
 * Endpoint information
 */
public final class EndpointInfo {
    /**
     * Get endpoint identifier
     * @return Unique endpoint ID
     */
    public String getEndpointId();
    
    /**
     * Get endpoint version
     * @return Version string of the endpoint
     */
    public String getEndpointVersion();
    
    /**
     * Get supported protocols
     * @return Set of supported protocol names
     */
    public Set<String> getSupportedProtocols();
}

Gateway Configuration

Configure SQL Gateway service and endpoints.

/**
 * SQL Gateway configuration builder
 */
public class SqlGatewayConfig {
    public static Builder newBuilder() {
        return new Builder();
    }
    
    public static class Builder {
        /**
         * Set session timeout
         * @param timeout Session timeout duration
         * @return Builder for method chaining
         */
        public Builder setSessionTimeout(Duration timeout);
        
        /**
         * Set maximum number of concurrent sessions
         * @param maxSessions Maximum session count
         * @return Builder for method chaining
         */
        public Builder setMaxSessions(int maxSessions);
        
        /**
         * Set operation timeout
         * @param timeout Default operation timeout
         * @return Builder for method chaining
         */
        public Builder setOperationTimeout(Duration timeout);
        
        /**
         * Set result fetch timeout
         * @param timeout Result fetch timeout
         * @return Builder for method chaining
         */
        public Builder setResultFetchTimeout(Duration timeout);
        
        /**
         * Enable/disable session persistence
         * @param persistent Whether to persist sessions
         * @return Builder for method chaining
         */
        public Builder setPersistentSessions(boolean persistent);
        
        /**
         * Build the configuration
         * @return Constructed configuration
         */
        public SqlGatewayConfig build();
    }
}

Error Handling

Handle SQL Gateway specific exceptions and error conditions.

/**
 * Base exception for SQL Gateway operations
 */
public class SqlGatewayException extends Exception {
    /**
     * Get error code
     * @return Specific error code for the exception
     */
    public String getErrorCode();
    
    /**
     * Get error details
     * @return Additional error details and context
     */
    public Map<String, String> getErrorDetails();
}

/**
 * Exception for session-related errors
 */
public class SessionException extends SqlGatewayException {
    /**
     * Get session handle that caused the error
     * @return Session handle associated with error
     */
    public SessionHandle getSessionHandle();
}

/**
 * Exception for operation-related errors
 */
public class OperationException extends SqlGatewayException {
    /**
     * Get operation handle that caused the error
     * @return Operation handle associated with error
     */
    public OperationHandle getOperationHandle();
}

Advanced Usage Patterns

Common patterns for building applications with SQL Gateway.

// Connection pooling for multiple clients
public class SqlGatewayConnectionPool {
    private final SqlGatewayService gateway;
    private final Queue<SessionHandle> availableSessions;
    private final int maxSessions;
    
    public SessionHandle acquireSession() throws SqlGatewayException {
        SessionHandle session = availableSessions.poll();
        if (session == null) {
            if (activeSessions.size() < maxSessions) {
                session = gateway.openSession(defaultSessionEnv);
            } else {
                throw new SqlGatewayException("No available sessions");
            }
        }
        return session;
    }
    
    public void releaseSession(SessionHandle session) {
        availableSessions.offer(session);
    }
}

// Async query execution
public class AsyncQueryExecutor {
    public CompletableFuture<List<RowData>> executeQueryAsync(
            SessionHandle session, String query) {
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                OperationHandle op = gateway.executeStatement(session, query, 60000, new Configuration());
                
                // Poll for completion
                OperationInfo info;
                do {
                    Thread.sleep(100);
                    info = gateway.getOperationInfo(session, op);
                } while (info.getStatus() == OperationStatus.RUNNING);
                
                if (info.getStatus() == OperationStatus.FINISHED) {
                    ResultSet results = gateway.fetchResults(session, op, 
                        FetchOrientation.FETCH_NEXT, Integer.MAX_VALUE);
                    return results.getData();
                } else {
                    throw new RuntimeException("Query failed: " + info.getException());
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}

// Multi-tenant session management
public class MultiTenantGateway {
    private final Map<String, SessionHandle> tenantSessions = new ConcurrentHashMap<>();
    
    public SessionHandle getOrCreateTenantSession(String tenantId) throws SqlGatewayException {
        return tenantSessions.computeIfAbsent(tenantId, id -> {
            try {
                SessionEnvironment env = SessionEnvironment.newBuilder()
                    .setDefaultCatalog("tenant_" + id)
                    .addSessionConfig(Map.of(
                        "execution.parallelism", "2",
                        "table.exec.resource.default-parallelism", "2"
                    ))
                    .build();
                return gateway.openSession(env);
            } catch (SqlGatewayException e) {
                throw new RuntimeException(e);
            }
        });
    }
    
    public void executeForTenant(String tenantId, String sql) throws SqlGatewayException {
        SessionHandle session = getOrCreateTenantSession(tenantId);
        gateway.executeStatement(session, sql, 30000, new Configuration());
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-uber

docs

connectors.md

data-types.md

datastream-bridge.md

expressions.md

functions.md

index.md

sql-gateway.md

table-operations.md

tile.json