CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-gateway

A service that allows multiple remote clients to execute SQL concurrently, providing an easy way to submit Flink jobs, look up metadata, and analyze data online.

Pending
Overview
Eval results
Files

result-data-models.mddocs/

Result Data Models

Result data models provide rich result types with metadata, pagination support, and schema information for handling query results and operation outcomes. These models support efficient data transfer and comprehensive result metadata.

Capabilities

ResultSet

Collection of query results with metadata, pagination tokens, and schema information.

/**
 * Collection of query results with metadata and pagination support.
 *
 * <p>Clients consume results in a fetch loop: while the type is NOT_READY they
 * retry, for PAYLOAD they read the data and re-fetch using {@link #getNextToken()},
 * and EOS marks the end of the stream.
 */
public interface ResultSet {
    /**
     * Get the type of result (NOT_READY, PAYLOAD, EOS)
     * @return ResultType indicating the nature of this result
     */
    ResultType getResultType();
    
    /**
     * Get token for fetching next batch of results
     * @return Long token for pagination, null if no more results
     */
    Long getNextToken();
    
    /**
     * Get schema of the result data
     * @return ResolvedSchema describing the structure of results
     */
    ResolvedSchema getResultSchema();
    
    /**
     * Get actual row data for this batch
     * @return List of RowData containing the query results
     */
    List<RowData> getData();
    
    /**
     * Check if this is a query result (vs. status/metadata result)
     * @return true if contains query data
     */
    boolean isQueryResult();
    
    /**
     * Get associated Flink job ID if available
     * @return Optional JobID for the operation
     */
    Optional<JobID> getJobID();
    
    /**
     * Get result kind indicating the type of SQL operation
     * @return ResultKind (SUCCESS, SUCCESS_WITH_CONTENT, etc.)
     */
    ResultKind getResultKind();
}

/**
 * Result type enumeration describing the state of one fetched batch.
 */
public enum ResultType {
    /** Data is not ready yet, client should retry with the same token */
    NOT_READY,
    
    /** Contains actual data payload */
    PAYLOAD,
    
    /** End of stream, no more data available */
    EOS
}

ResultSetImpl

Default implementation of ResultSet with builder support.

/**
 * Default implementation of ResultSet, constructed via its fluent {@link Builder}.
 *
 * <p>NOTE(review): which builder fields are mandatory before {@code build()}
 * is not shown here — verify against the implementation.
 */
public class ResultSetImpl implements ResultSet {
    /**
     * Create a builder for constructing a ResultSet
     * @return Builder instance
     */
    public static Builder builder();
    
    /**
     * Builder for constructing ResultSet instances; all setters return
     * {@code this} for chaining.
     */
    public static class Builder {
        /**
         * Set result type
         * @param resultType Type of result
         * @return Builder for chaining
         */
        public Builder resultType(ResultType resultType);
        
        /**
         * Set next token for pagination (null indicates no more data)
         * @param nextToken Token for next batch
         * @return Builder for chaining
         */
        public Builder nextToken(Long nextToken);
        
        /**
         * Set result schema
         * @param schema Schema describing the data structure
         * @return Builder for chaining
         */
        public Builder resultSchema(ResolvedSchema schema);
        
        /**
         * Set result data
         * @param data List of row data
         * @return Builder for chaining
         */
        public Builder data(List<RowData> data);
        
        /**
         * Set job ID
         * @param jobID Associated job ID
         * @return Builder for chaining
         */
        public Builder jobID(JobID jobID);
        
        /**
         * Set result kind
         * @param resultKind Kind of result
         * @return Builder for chaining
         */
        public Builder resultKind(ResultKind resultKind);
        
        /**
         * Build the ResultSet instance from the values set so far
         * @return Constructed ResultSet
         */
        public ResultSet build();
    }
}

TableInfo

Basic information about tables and views in the catalog.

/**
 * Basic information about tables/views: the identifier and the table kind.
 */
public class TableInfo {
    /**
     * Get table identifier (catalog.database.table)
     * @return ObjectIdentifier for the table
     */
    public ObjectIdentifier getIdentifier();
    
    /**
     * Get table kind (TABLE, VIEW, MATERIALIZED_TABLE, etc.)
     * @return TableKind indicating the type of table
     */
    public TableKind getTableKind();
    
    /**
     * Create TableInfo instance
     * @param identifier Table identifier
     * @param tableKind Kind of table
     * @return TableInfo instance
     */
    public static TableInfo of(ObjectIdentifier identifier, TableKind tableKind);
    
    // NOTE(review): presumably value-based on identifier and kind — verify.
    @Override
    public boolean equals(Object o);
    
    @Override
    public int hashCode();
    
    @Override
    public String toString();
}

FunctionInfo

Information about functions without loading their implementation.

/**
 * Information about functions without loading their implementation, so
 * catalogs can be listed cheaply.
 */
public class FunctionInfo {
    /**
     * Get function identifier
     * @return UnresolvedIdentifier for the function
     */
    public UnresolvedIdentifier getIdentifier();
    
    /**
     * Get function kind if available (absent when the implementation has not
     * been loaded)
     * @return Optional FunctionKind (SCALAR, TABLE, AGGREGATE, etc.)
     */
    public Optional<FunctionKind> getKind();
    
    /**
     * Create FunctionInfo with identifier only (kind left empty)
     * @param identifier Function identifier
     * @return FunctionInfo instance
     */
    public static FunctionInfo of(UnresolvedIdentifier identifier);
    
    /**
     * Create FunctionInfo with identifier and kind
     * @param identifier Function identifier
     * @param kind Function kind
     * @return FunctionInfo instance
     */
    public static FunctionInfo of(UnresolvedIdentifier identifier, FunctionKind kind);
    
    // NOTE(review): presumably value-based on identifier and kind — verify.
    @Override
    public boolean equals(Object o);
    
    @Override
    public int hashCode();
    
    @Override
    public String toString();
}

OperationInfo

Status and error information for operations.

/**
 * Status and error information for operations. The exception field is only
 * populated for failed operations.
 */
public class OperationInfo {
    /**
     * Get current operation status
     * @return OperationStatus representing current state
     */
    public OperationStatus getStatus();
    
    /**
     * Get exception information if operation failed
     * @return Optional exception message and stack trace
     */
    public Optional<String> getException();
    
    /**
     * Create OperationInfo with status only (no exception)
     * @param status Current operation status
     * @return OperationInfo instance
     */
    public static OperationInfo of(OperationStatus status);
    
    /**
     * Create OperationInfo with status and exception
     * @param status Current operation status
     * @param exception Exception details
     * @return OperationInfo instance
     */
    public static OperationInfo of(OperationStatus status, String exception);
    
    // NOTE(review): presumably value-based on status and exception — verify.
    @Override
    public boolean equals(Object o);
    
    @Override
    public int hashCode();
    
    @Override
    public String toString();
}

GatewayInfo

Information about the SQL Gateway service.

/**
 * Information about the SQL Gateway service: product name and version.
 */
public class GatewayInfo {
    /**
     * Get product name (always "Apache Flink")
     * @return Product name string
     */
    public String getProductName();
    
    /**
     * Get current Flink version
     * @return Version string
     */
    public String getVersion();
    
    /**
     * Create GatewayInfo instance
     * @param productName Product name
     * @param version Version string
     * @return GatewayInfo instance
     */
    public static GatewayInfo of(String productName, String version);
    
    // NOTE(review): presumably value-based on product name and version — verify.
    @Override
    public boolean equals(Object o);
    
    @Override
    public int hashCode();
    
    @Override
    public String toString();
}

FetchOrientation

Enumeration for result fetching direction.

/**
 * Enumeration for result fetching direction when paging through results.
 */
public enum FetchOrientation {
    /** Fetch next results in forward direction */
    FETCH_NEXT,
    
    /** Fetch prior results in backward direction */
    FETCH_PRIOR
}

NotReadyResult

Special result indicating data is not ready yet.

/**
 * Special result indicating data is not ready yet; clients should retry the
 * fetch with the same token.
 */
public class NotReadyResult implements ResultSet {
    /**
     * Shared singleton instance — the class carries no state, so no factory
     * method is needed.
     */
    public static NotReadyResult INSTANCE;
    
    @Override
    public ResultType getResultType();
    // Always returns ResultType.NOT_READY
    
    @Override
    public Long getNextToken();
    // Returns null
    
    @Override
    public ResolvedSchema getResultSchema();
    // Returns null
    
    @Override
    public List<RowData> getData();
    // Returns empty list
    
    @Override
    public boolean isQueryResult();
    // Returns false
    
    @Override
    public Optional<JobID> getJobID();
    // Returns empty optional
    
    @Override
    public ResultKind getResultKind();
    // Returns null
}

Usage Examples

Processing Query Results

import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.ResultType;

// Execute query and process results
OperationHandle operation = service.executeStatement(
    sessionHandle,
    "SELECT id, name, salary FROM employees WHERE department = 'Engineering'",
    30000L,
    new Configuration()
);

// Fetch results batch by batch. A labeled loop makes the exit points explicit:
// a plain `break` inside the switch would only leave the switch, not the loop.
long token = 0L;
int batchSize = 100;

fetchLoop:
while (true) {
    ResultSet resultSet = service.fetchResults(sessionHandle, operation, token, batchSize);
    
    switch (resultSet.getResultType()) {
        case NOT_READY:
            // Data not ready yet — back off briefly and retry with the same token.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break fetchLoop;
            }
            continue;
            
        case PAYLOAD:
            // Process this batch of data
            processResultData(resultSet);
            
            Long nextToken = resultSet.getNextToken();
            if (nextToken == null) {
                break fetchLoop; // no more data
            }
            token = nextToken;
            continue; // fetch next batch
            
        case EOS:
            // End of stream
            System.out.println("All results processed");
            break fetchLoop;
    }
}

/**
 * Print the schema and every row of one result batch.
 * @param resultSet batch whose schema and data are printed
 */
private void processResultData(ResultSet resultSet) {
    ResolvedSchema schema = resultSet.getResultSchema();
    List<RowData> data = resultSet.getData();
    
    System.out.println("Schema: " + schema);
    System.out.println("Batch size: " + data.size());
    
    for (RowData row : data) {
        // Process each row
        System.out.println("Row: " + row);
    }
}

Building Custom Results

import org.apache.flink.table.gateway.api.results.ResultSetImpl;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.IntType;

// Create a custom result set with an explicit three-column schema
List<RowData> customData = createCustomData();
ResolvedSchema customSchema = ResolvedSchema.of(
    Column.physical("id", new IntType()),
    Column.physical("name", new VarCharType(255)),
    Column.physical("status", new VarCharType(50))
);

// Assemble the result via the builder; a null next token marks the final batch
ResultSet customResult = ResultSetImpl.builder()
    .resultType(ResultType.PAYLOAD)
    .resultSchema(customSchema)
    .data(customData)
    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
    .nextToken(null) // No more data
    .build();

// Use in custom operation: the gateway runs the callable and serves its result
Callable<ResultSet> customOperation = () -> customResult;
OperationHandle operation = service.submitOperation(sessionHandle, customOperation);

Catalog Information Processing

import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.results.FunctionInfo;

// List tables and views of one database and print their identifiers and kinds
Set<TableInfo> tables = service.listTables(
    sessionHandle,
    "my_catalog",
    "my_database",
    Set.of(TableKind.TABLE, TableKind.VIEW)
);

System.out.println("Found " + tables.size() + " tables/views:");
for (TableInfo table : tables) {
    System.out.println("- " + table.getIdentifier() + " (" + table.getTableKind() + ")");
    
    // For plain tables, fetch the resolved schema and connector options
    if (table.getTableKind() == TableKind.TABLE) {
        ResolvedCatalogBaseTable<?> tableDetails = service.getTable(sessionHandle, table.getIdentifier());
        System.out.println("  Schema: " + tableDetails.getResolvedSchema());
        System.out.println("  Options: " + tableDetails.getOptions());
    }
}

// List user-defined functions; the kind is printed only when it is available
Set<FunctionInfo> functions = service.listUserDefinedFunctions(sessionHandle, "my_catalog", "my_database");
System.out.println("\nFound " + functions.size() + " user-defined functions:");
for (FunctionInfo function : functions) {
    System.out.println("- " + function.getIdentifier() + 
        (function.getKind().isPresent() ? " (" + function.getKind().get() + ")" : ""));
}

Result Set Pagination

// Efficient pagination through large result sets
/**
 * Iterates batch-by-batch over the results of a gateway operation.
 *
 * <p>Implements {@link Iterable} so instances can be used directly in an
 * enhanced for-loop — the original sketch only exposed {@code iterator()},
 * which is not sufficient for the for-each syntax.
 */
public class ResultSetPaginator implements Iterable<List<RowData>> {
    private final SqlGatewayService service;
    private final SessionHandle sessionHandle;
    private final OperationHandle operationHandle;
    private final int pageSize;
    
    /**
     * @param service gateway service used to fetch results
     * @param sessionHandle session owning the operation
     * @param operationHandle operation whose results are paged
     * @param pageSize maximum rows fetched per batch
     */
    public ResultSetPaginator(SqlGatewayService service, SessionHandle sessionHandle, 
                            OperationHandle operationHandle, int pageSize) {
        this.service = service;
        this.sessionHandle = sessionHandle;
        this.operationHandle = operationHandle;
        this.pageSize = pageSize;
    }
    
    /** @return an iterator over successive result batches (lazy: fetches on demand) */
    @Override
    public Iterator<List<RowData>> iterator() {
        return new Iterator<List<RowData>>() {
            private long currentToken = 0L;
            private boolean hasNext = true;       // false once the server reports no further token
            private List<RowData> nextBatch = null; // prefetched batch, consumed by next()
            
            @Override
            public boolean hasNext() {
                if (nextBatch == null && hasNext) {
                    fetchNextBatch();
                }
                return nextBatch != null && !nextBatch.isEmpty();
            }
            
            @Override
            public List<RowData> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<RowData> result = nextBatch;
                nextBatch = null;
                return result;
            }
            
            /** Fetch one batch from the gateway and update the pagination state. */
            private void fetchNextBatch() {
                try {
                    ResultSet resultSet = service.fetchResults(sessionHandle, operationHandle, currentToken, pageSize);
                    
                    if (resultSet.getResultType() == ResultType.PAYLOAD) {
                        nextBatch = resultSet.getData();
                        Long nextToken = resultSet.getNextToken();
                        if (nextToken != null) {
                            currentToken = nextToken;
                        } else {
                            hasNext = false; // final batch; deliver it, then stop
                        }
                    } else if (resultSet.getResultType() == ResultType.EOS) {
                        nextBatch = Collections.emptyList();
                        hasNext = false;
                    } else {
                        // NOT_READY — treated as end of iteration here; a retry/backoff
                        // loop could be implemented instead.
                        nextBatch = Collections.emptyList();
                        hasNext = false;
                    }
                } catch (Exception e) {
                    throw new RuntimeException("Failed to fetch next batch", e);
                }
            }
        };
    }
}

// Usage — valid now that ResultSetPaginator implements Iterable
ResultSetPaginator paginator = new ResultSetPaginator(service, sessionHandle, operation, 1000);
for (List<RowData> batch : paginator) {
    System.out.println("Processing batch of " + batch.size() + " rows");
    // Process batch...
}

Error Information Handling

// Comprehensive error handling with operation info
/**
 * Inspects a finished operation's status and dispatches to a matching handler.
 */
public class OperationErrorHandler {
    
    // The original sketch referenced `service` without ever declaring it;
    // it is now an explicit dependency supplied at construction time.
    private final SqlGatewayService service;
    
    /** @param service gateway service used to query operation status */
    public OperationErrorHandler(SqlGatewayService service) {
        this.service = service;
    }
    
    /**
     * Look up the operation's status and handle success, failure, timeout,
     * or cancellation accordingly.
     * @param session session owning the operation
     * @param operation operation to inspect
     */
    public void handleOperationResult(SessionHandle session, OperationHandle operation) {
        OperationInfo info = service.getOperationInfo(session, operation);
        
        switch (info.getStatus()) {
            case FINISHED:
                System.out.println("Operation completed successfully");
                processResults(session, operation);
                break;
                
            case ERROR:
                String error = info.getException().orElse("Unknown error occurred");
                System.err.println("Operation failed: " + error);
                
                // Crude substring matching on the stack-trace text to pick a handler
                if (error.contains("TableNotExistException")) {
                    handleTableNotFound(error);
                } else if (error.contains("ValidationException")) {
                    handleValidationError(error);
                } else if (error.contains("OutOfMemoryError")) {
                    handleMemoryError(error);
                } else {
                    handleGenericError(error);
                }
                break;
                
            case TIMEOUT:
                System.err.println("Operation timed out");
                // Could implement retry with longer timeout
                break;
                
            case CANCELED:
                System.out.println("Operation was canceled");
                break;
                
            default:
                System.out.println("Operation in unexpected status: " + info.getStatus());
        }
    }
    
    /** Missing table/catalog entry. */
    private void handleTableNotFound(String error) {
        System.err.println("Table not found - check catalog and database names");
    }
    
    /** SQL failed planner/semantic validation. */
    private void handleValidationError(String error) {
        System.err.println("SQL validation failed - check syntax and schema");
    }
    
    /** Cluster ran out of memory while executing. */
    private void handleMemoryError(String error) {
        System.err.println("Memory error - consider reducing batch size or increasing resources");
    }
    
    /** Fallback for unrecognized failures. */
    private void handleGenericError(String error) {
        System.err.println("Generic error occurred: " + error);
    }
    
    /** Consume the results of a successfully finished operation. */
    private void processResults(SessionHandle session, OperationHandle operation) {
        // Process successful results...
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-gateway

docs

configuration-options.md

core-service-interface.md

endpoint-framework.md

index.md

operation-management.md

rest-implementation.md

result-data-models.md

session-management.md

workflow-management.md

tile.json