CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-client-2-12

SQL Client for exploring and submitting SQL programs to Flink

Pending
Overview
Eval results
Files

result-handling-display.mddocs/

Result Handling and Display

Type-safe result containers with support for materialized and streaming results, plus multiple display formats. The result handling system provides comprehensive support for both batch and streaming query results with efficient pagination and real-time change streaming.

Capabilities

ResultDescriptor Class

Metadata container for query results providing comprehensive result information and access patterns.

public class ResultDescriptor {
    /**
     * Get unique result identifier for tracking and access
     * @return String identifier for this result set
     */
    public String getResultId();
    
    /**
     * Get table schema information for result structure
     * @return ResolvedSchema with column names, types, and metadata
     */
    public ResolvedSchema getResultSchema();
    
    /**
     * Check if result is materialized in memory for pagination
     * @return true if result supports random page access
     */
    public boolean isMaterialized();
    
    /**
     * Check if result should use tableau display format
     * @return true for direct tableau output, false for interactive display
     */
    public boolean isTableauMode();
    
    /**
     * Check if result comes from streaming query execution
     * @return true for streaming queries, false for batch queries
     */
    public boolean isStreamingMode();
    
    /**
     * Get maximum column width for display formatting
     * @return Maximum character width for column display
     */
    public int maxColumnWidth();
}

Usage Example:

// Execute query and get result descriptor
QueryOperation query = (QueryOperation) executor.parseOperation(sessionId, "SELECT * FROM orders");
ResultDescriptor descriptor = executor.executeQuery(sessionId, query);

// Check result characteristics
System.out.println("Result ID: " + descriptor.getResultId());
System.out.println("Is materialized: " + descriptor.isMaterialized());
System.out.println("Is streaming: " + descriptor.isStreamingMode());
System.out.println("Use tableau mode: " + descriptor.isTableauMode());

// Access schema information
ResolvedSchema schema = descriptor.getResultSchema();
schema.getColumns().forEach(column -> 
    System.out.println("Column: " + column.getName() + " Type: " + column.getDataType())
);

TypedResult Class

Generic result container with type information providing type-safe access to result data and metadata.

public class TypedResult<P> {
    /**
     * Get result type indicating data availability
     * @return ResultType enum value
     */
    public ResultType getType();
    
    /**
     * Get actual result payload data
     * @return Payload data of type P, or null for non-PAYLOAD results
     */
    public P getPayload();
    
    /**
     * Set result type
     * @param type ResultType to set
     */
    public void setType(ResultType type);
    
    /**
     * Set result payload
     * @param payload Payload data to set
     */
    public void setPayload(P payload);
}

TypedResult Factory Methods

Static factory methods for creating typed results with appropriate type indicators.

/**
 * Create empty result indicating no data available
 * @return TypedResult with EMPTY type
 */
public static <T> TypedResult<T> empty();

/**
 * Create result with actual data payload
 * @param payload Data to include in result
 * @return TypedResult with PAYLOAD type and data
 */
public static <T> TypedResult<T> payload(T payload);

/**
 * Create end-of-stream marker result
 * @return TypedResult with EOS type indicating stream completion
 */
public static <T> TypedResult<T> endOfStream();

Usage Example:

// Handle different result types
TypedResult<List<Row>> result = executor.retrieveResultChanges(sessionId, resultId);

switch (result.getType()) {
    case PAYLOAD:
        List<Row> rows = result.getPayload();
        System.out.println("Received " + rows.size() + " rows");
        rows.forEach(this::processRow);
        break;
        
    case EMPTY:
        System.out.println("No new data available");
        // Continue polling or wait
        break;
        
    case EOS:
        System.out.println("Stream has ended");
        // Cleanup and exit polling loop
        break;
}

ResultType Enumeration

Enumeration defining the type of result data availability.

public enum ResultType {
    /** Result contains actual data payload */
    PAYLOAD,
    
    /** Result is empty - no data currently available */
    EMPTY,
    
    /** End of stream - no more data will be available */
    EOS
}

Result Interface Hierarchy

DynamicResult Interface

Base interface for dynamic result handling providing common result operations.

public interface DynamicResult extends AutoCloseable {
    /**
     * Check if result is materialized in memory
     * @return true if result supports random access, false for streaming-only
     */
    boolean isMaterialized();
    
    /**
     * Close result and release associated resources
     * @throws IOException if cleanup fails
     */
    void close() throws IOException;
}

MaterializedResult Interface

Interface for materialized results supporting pagination and random access.

public interface MaterializedResult extends DynamicResult {
    /**
     * Create materialized snapshot with specified page size
     * @param pageSize Number of rows per page
     * @return TypedResult containing total page count
     */
    TypedResult<Integer> snapshot(int pageSize);
    
    /**
     * Retrieve specific page of results
     * @param page Page number (0-based indexing)
     * @return List of rows for the requested page
     */
    List<Row> retrievePage(int page);
}

Usage Example:

// Handle materialized results with pagination
if (result instanceof MaterializedResult) {
    MaterializedResult materializedResult = (MaterializedResult) result;
    
    // Create snapshot with 100 rows per page
    TypedResult<Integer> pageCountResult = materializedResult.snapshot(100);
    if (pageCountResult.getType() == ResultType.PAYLOAD) {
        int totalPages = pageCountResult.getPayload();
        System.out.println("Total pages: " + totalPages);
        
        // Retrieve each page
        for (int page = 0; page < totalPages; page++) {
            List<Row> pageRows = materializedResult.retrievePage(page);
            System.out.println("Page " + page + ": " + pageRows.size() + " rows");
            pageRows.forEach(this::processRow);
        }
    }
}

ChangelogResult Interface

Interface for streaming results providing incremental change access.

public interface ChangelogResult extends DynamicResult {
    /**
     * Retrieve incremental changes from streaming result
     * @return TypedResult containing list of changed rows or status
     */
    TypedResult<List<Row>> retrieveChanges();
}

Usage Example:

// Handle streaming results with changelog
if (result instanceof ChangelogResult) {
    ChangelogResult changelogResult = (ChangelogResult) result;
    
    // Poll for changes
    while (true) {
        TypedResult<List<Row>> changes = changelogResult.retrieveChanges();
        
        switch (changes.getType()) {
            case PAYLOAD:
                List<Row> changedRows = changes.getPayload();
                changedRows.forEach(row -> {
                    // Process row changes - each row includes change type (+I, -D, -U, +U)
                    RowKind kind = row.getKind();
                    System.out.println("Change type: " + kind + ", Data: " + row);
                });
                break;
                
            case EMPTY:
                Thread.sleep(100); // Wait before next poll
                break;
                
            case EOS:
                System.out.println("Stream completed");
                return; // Exit polling loop
        }
    }
}

Result Display Components

CliTableauResultView Class

Tableau-style result display providing direct formatted output without interactive features.

public class CliTableauResultView implements AutoCloseable {
    /**
     * Create tableau result view
     * @param terminal Terminal for output
     * @param executor Executor for result access
     * @param sessionId Session identifier
     * @param resultDesc Result descriptor for metadata
     */
    public CliTableauResultView(Terminal terminal, Executor executor, String sessionId, ResultDescriptor resultDesc);
    
    /**
     * Display results in tableau format
     * Outputs formatted table directly to terminal
     */
    public void displayResults();
    
    /**
     * Close view and cleanup resources
     */
    public void close();
}

Usage Example:

// Display results in tableau format
try (CliTableauResultView tableauView = new CliTableauResultView(
        terminal, executor, sessionId, resultDescriptor)) {
    tableauView.displayResults();
}

CliView Abstract Class

Base class for CLI view components providing common view functionality.

public abstract class CliView<OP, OUT> {
    /**
     * Get associated CLI client
     * @return CliClient instance
     */
    protected CliClient getClient();
    
    /**
     * Get terminal for output operations
     * @return Terminal instance
     */
    protected Terminal getTerminal();
    
    /**
     * Check if terminal is plain (limited features)
     * @return true if terminal has limited capabilities
     */
    protected boolean isPlainTerminal();
}

CliResultView Abstract Class

Base class for result display views providing common result handling.

public abstract class CliResultView<OP> extends CliView<OP, Void> {
    /**
     * Open result view for interaction
     */
    public abstract void open();
    
    /**
     * Get result descriptor
     * @return ResultDescriptor for this view
     */
    protected ResultDescriptor getResultDescriptor();
}

Row Processing and Data Types

Row Access Patterns

Working with Row objects in results:

// Process row data
void processRow(Row row) {
    // Get row kind for change tracking
    RowKind kind = row.getKind();
    
    // Access fields by position
    Object field0 = row.getField(0);
    Object field1 = row.getField(1);
    
    // Access fields with type safety
    String name = row.getFieldAs(0, String.class);
    Integer age = row.getFieldAs(1, Integer.class);
    
    // Iterate over all fields
    for (int i = 0; i < row.getArity(); i++) {
        Object field = row.getField(i);
        System.out.println("Field " + i + ": " + field);
    }
}

Schema Integration

Using ResolvedSchema for type-safe result processing:

void processResultWithSchema(ResultDescriptor descriptor, List<Row> rows) {
    ResolvedSchema schema = descriptor.getResultSchema();
    List<Column> columns = schema.getColumns();
    
    for (Row row : rows) {
        for (int i = 0; i < columns.size(); i++) {
            Column column = columns.get(i);
            Object value = row.getField(i);
            
            System.out.printf("Column %s (%s): %s%n", 
                column.getName(), 
                column.getDataType(), 
                value);
        }
    }
}

Error Handling

Result handling includes comprehensive error management:

try {
    TypedResult<List<Row>> result = executor.retrieveResultChanges(sessionId, resultId);
    // Process result
} catch (SqlExecutionException e) {
    if (e.getMessage().contains("Result expired")) {
        System.err.println("Result has expired - restart query");
    } else if (e.getMessage().contains("Result not found")) {
        System.err.println("Invalid result ID");
    } else {
        System.err.println("Execution error: " + e.getMessage());
    }
}

// Handle pagination errors
try {
    List<Row> page = materializedResult.retrievePage(pageNumber);
} catch (IndexOutOfBoundsException e) {
    System.err.println("Invalid page number: " + pageNumber);
} catch (IllegalStateException e) {
    System.err.println("Result snapshot has expired");
}

Performance Considerations

Efficient Result Processing

// Batch process rows for better performance
List<Row> batch = new ArrayList<>();
TypedResult<List<Row>> result = executor.retrieveResultChanges(sessionId, resultId);

if (result.getType() == ResultType.PAYLOAD) {
    batch.addAll(result.getPayload());
    
    // Process in batches of 1000 rows
    if (batch.size() >= 1000) {
        processBatch(batch);
        batch.clear();
    }
}

Memory Management

// Use try-with-resources for automatic cleanup
try (CliTableauResultView view = new CliTableauResultView(terminal, executor, sessionId, descriptor)) {
    view.displayResults();
} // Automatic resource cleanup

// Cancel long-running queries when needed
if (shouldCancel) {
    executor.cancelQuery(sessionId, resultId);
}

The result handling and display system provides a comprehensive framework for managing both batch and streaming query results with efficient memory usage, type safety, and flexible display options.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-client-2-12

docs

command-line-interface.md

configuration-options.md

index.md

result-handling-display.md

session-context-management.md

sql-client-application.md

sql-execution-gateway.md

tile.json