
tessl/maven-co-cask-cdap--cdap-explore-client

Java client interfaces for exploring and querying CDAP datasets using Apache Hive SQL with asynchronous execution and metadata operations.



Query Execution

Synchronous and asynchronous SQL query execution with comprehensive result handling, status monitoring, and resource management. Supports both blocking operations via the Explore interface and non-blocking operations via ExploreClient.

Capabilities

Synchronous Query Execution

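Blocking SQL execution via the Explore service interface. Each execute call returns a QueryHandle, which is then used to check status, read the result schema, fetch results, and close the query.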

/**
 * Core explore service interface for synchronous operations
 */
interface Explore {
    /**
     * Execute SQL statement synchronously
     * @param namespace namespace context for the query
     * @param statement SQL statement to execute
     * @return query handle for result retrieval
     * @throws ExploreException if query execution fails
     * @throws SQLException if SQL error occurs
     */
    QueryHandle execute(NamespaceId namespace, String statement) throws ExploreException, SQLException;
    
    /**
     * Execute SQL with additional session configuration
     * @param namespace namespace context
     * @param statement SQL statement
     * @param additionalSessionConf additional Hive session configuration
     * @return query handle for result retrieval
     * @throws ExploreException if query execution fails
     * @throws SQLException if SQL error occurs
     */
    QueryHandle execute(NamespaceId namespace, String statement, 
                       Map<String, String> additionalSessionConf) throws ExploreException, SQLException;
    
    /**
     * Get current status of a query
     * @param handle query handle
     * @return current query status
     * @throws HandleNotFoundException if handle is not found
     * @throws SQLException if SQL error occurs
     */
    QueryStatus getStatus(QueryHandle handle) throws HandleNotFoundException, SQLException;
    
    /**
     * Get result schema for a query
     * @param handle query handle
     * @return list of column descriptions
     * @throws HandleNotFoundException if handle is not found
     * @throws SQLException if SQL error occurs
     */
    List<ColumnDesc> getResultSchema(QueryHandle handle) throws HandleNotFoundException, SQLException;
    
    /**
     * Fetch next batch of results
     * @param handle query handle
     * @param size maximum number of results to fetch
     * @return list of query results
     * @throws HandleNotFoundException if handle is not found
     * @throws SQLException if SQL error occurs
     */
    List<QueryResult> nextResults(QueryHandle handle, int size) throws HandleNotFoundException, SQLException;
    
    /**
     * Preview results without advancing cursor
     * @param handle query handle
     * @return preview of query results
     * @throws HandleNotFoundException if handle is not found
     * @throws SQLException if SQL error occurs
     */
    List<QueryResult> previewResults(QueryHandle handle) throws HandleNotFoundException, SQLException;
    
    /**
     * Close query handle and release resources
     * @param handle query handle to close
     * @throws HandleNotFoundException if handle is not found
     * @throws SQLException if SQL error occurs
     */
    void close(QueryHandle handle) throws HandleNotFoundException, SQLException;
    
    /**
     * Get information about active queries in namespace
     * @param namespace namespace to query
     * @return list of query information
     * @throws ExploreException if operation fails
     * @throws SQLException if SQL error occurs
     */
    List<QueryInfo> getQueries(NamespaceId namespace) throws ExploreException, SQLException;
    
    /**
     * Get count of active queries in namespace
     * @param namespace namespace to check
     * @return number of active queries
     * @throws ExploreException if operation fails
     * @throws SQLException if SQL error occurs
     */
    int getActiveQueryCount(NamespaceId namespace) throws ExploreException, SQLException;
}
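
The status-polling pattern implied by getStatus can be sketched without CDAP on the classpath. In this minimal sketch, the OpStatus enum is a local stand-in for QueryStatus.OpStatus, StatusPoller and awaitTerminal are hypothetical names, and treating INITIALIZED, PENDING, and RUNNING as "still in progress" is an assumption rather than documented behavior.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Supplier;

// Stand-in for QueryStatus.OpStatus so the sketch compiles without CDAP.
enum OpStatus { INITIALIZED, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, UNKNOWN, PENDING }

final class StatusPoller {
    // Assumption: these states mean the query has not finished yet.
    private static final Set<OpStatus> IN_PROGRESS =
        EnumSet.of(OpStatus.INITIALIZED, OpStatus.PENDING, OpStatus.RUNNING);

    /**
     * Polls until the status leaves the in-progress set or the timeout elapses.
     * Returns the last status observed, which may still be in progress on timeout.
     */
    static OpStatus awaitTerminal(Supplier<OpStatus> statusSource,
                                  long pollMillis, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        OpStatus status = statusSource.get();
        while (IN_PROGRESS.contains(status) && System.nanoTime() < deadline) {
            Thread.sleep(pollMillis);
            status = statusSource.get();
        }
        return status;
    }
}
```

With a real Explore instance, the supplier would wrap explore.getStatus(handle).getStatus() and translate its checked exceptions. Bounding the wait keeps a stuck query from blocking the caller indefinitely.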

Asynchronous Query Execution

Non-blocking query execution via ExploreClient with ListenableFuture results.

/**
 * Asynchronous query execution methods
 */
interface ExploreClient {
    /**
     * Submit SQL statement for asynchronous execution
     * @param namespace namespace context for the query
     * @param statement SQL statement to execute
     * @return future containing execution results
     */
    ListenableFuture<ExploreExecutionResult> submit(NamespaceId namespace, String statement);
}
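
Because Guava's ListenableFuture extends java.util.concurrent.Future, the future returned by submit can be awaited with a bounded timeout. The sketch below uses only the standard Future API; BoundedWait and getOrCancel are hypothetical names. Whether cancelling the future also stops the query on the server side is not stated in this document, so treat that as an open question.

```java
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWait {
    /**
     * Waits up to the given timeout for the future to complete.
     * On timeout the future is cancelled and an empty Optional is returned;
     * a failed computation propagates as ExecutionException.
     */
    static <T> Optional<T> getOrCancel(Future<T> future, long timeout, TimeUnit unit)
            throws ExecutionException, InterruptedException {
        try {
            return Optional.ofNullable(future.get(timeout, unit));
        } catch (TimeoutException e) {
            future.cancel(true); // request cancellation of the pending work
            return Optional.empty();
        }
    }
}
```

Returning an Optional lets the caller distinguish "no result within the timeout" from a failure, which still surfaces as an exception.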

Result Handling

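ExploreExecutionResult iterates over result rows and exposes the result schema, a fetch size hint, and the execution status. QueryStatus, QueryResult, and QueryInfo are the supporting types.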

/**
 * Results of an explore statement execution
 */
interface ExploreExecutionResult extends Iterator<QueryResult>, Closeable {
    /**
     * Get current fetch size hint
     * @return current fetch size
     */
    int getFetchSize();
    
    /**
     * Set fetch size hint for result retrieval
     * @param fetchSize hint for number of rows to fetch per batch
     */
    void setFetchSize(int fetchSize);
    
    /**
     * Get schema information for result set
     * @return list of column descriptions
     * @throws ExploreException if schema cannot be retrieved
     */
    List<ColumnDesc> getResultSchema() throws ExploreException;
    
    /**
     * Check if query can contain result data
     * @return true if results may be available
     */
    boolean canContainResults();
    
    /**
     * Get current execution status
     * @return query execution status
     */
    QueryStatus getStatus();
}
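
Since ExploreExecutionResult is both an Iterator and a Closeable, it can be consumed inside try-with-resources. The sketch below shows that pattern generically, so it compiles without CDAP; ResultDrain, drain, and the maxRows cap are hypothetical and only illustrate the idea.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class ResultDrain {
    /**
     * Copies at most maxRows elements from the source into a list, then closes it.
     * The intersection bound accepts any type that is both an Iterator and Closeable.
     */
    static <T, R extends Iterator<T> & Closeable> List<T> drain(R source, int maxRows)
            throws IOException {
        List<T> rows = new ArrayList<>();
        try (R results = source) { // closed even if iteration throws
            while (rows.size() < maxRows && results.hasNext()) {
                rows.add(results.next());
            }
        }
        return rows;
    }
}
```

Capping the number of rows keeps memory bounded when the size of a result set is not known in advance.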

/**
 * Query execution status information
 */
class QueryStatus {
    public enum OpStatus { 
        INITIALIZED, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, UNKNOWN, PENDING 
    }
    
    /**
     * Get current operation status
     * @return status enum value
     */
    public OpStatus getStatus();
    
    /**
     * Check if query has result set available
     * @return true if results are available
     */
    public boolean hasResultSet();
    
    /**
     * Get error message if query failed
     * @return error message or null if no error
     */
    public String getErrorMessage();
    
    /**
     * Get SQL state if query failed
     * @return SQL state or null if no error
     */
    public String getSqlState();
    
    /**
     * Get operation handle
     * @return query handle
     */
    public QueryHandle getOperationHandle();
}

/**
 * Individual query result row
 */
class QueryResult {
    /**
     * Get column values for this result row
     * @return list of column values
     */
    public List<Object> getColumns();
}
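
A row returned by getColumns is positional, so pairing it with column names makes it easier to read. This sketch assumes the names have already been extracted from the result schema into a list of strings (how ColumnDesc exposes them is not shown in this document); RowMapper and toMap are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RowMapper {
    /** Pairs each column name with the value at the same position, keeping column order. */
    static Map<String, Object> toMap(List<String> columnNames, List<Object> values) {
        if (columnNames.size() != values.size()) {
            throw new IllegalArgumentException(
                "Expected " + columnNames.size() + " values but got " + values.size());
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < values.size(); i++) {
            row.put(columnNames.get(i), values.get(i));
        }
        return row;
    }
}
```

A LinkedHashMap is used so that iterating the map yields columns in schema order.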

/**
 * Information about an active query
 */
class QueryInfo {
    /**
     * Get query handle
     * @return query handle
     */
    public QueryHandle getQueryHandle();
    
    /**
     * Get SQL statement
     * @return SQL statement text
     */
    public String getStatement();
    
    /**
     * Get query timestamp
     * @return query start timestamp
     */
    public long getTimestamp();
    
    /**
     * Check if query is active
     * @return true if query is active
     */
    public boolean isActive();
}
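
The QueryInfo fields lend themselves to simple monitoring checks, such as finding active queries that started long ago. In this sketch QuerySnapshot is a local stand-in for QueryInfo, QueryAudit and longRunning are hypothetical names, and the timestamp is assumed to be in epoch milliseconds, which this document does not confirm.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in holding the QueryInfo fields this sketch reads.
final class QuerySnapshot {
    final String statement;
    final long timestamp; // assumption: epoch milliseconds
    final boolean active;

    QuerySnapshot(String statement, long timestamp, boolean active) {
        this.statement = statement;
        this.timestamp = timestamp;
        this.active = active;
    }
}

final class QueryAudit {
    /** Returns active queries that started more than maxAgeMillis before nowMillis. */
    static List<QuerySnapshot> longRunning(List<QuerySnapshot> queries,
                                           long nowMillis, long maxAgeMillis) {
        List<QuerySnapshot> stale = new ArrayList<>();
        for (QuerySnapshot q : queries) {
            if (q.active && nowMillis - q.timestamp > maxAgeMillis) {
                stale.add(q);
            }
        }
        return stale;
    }
}
```

Passing the current time as a parameter, rather than reading the clock inside the method, keeps the check deterministic and easy to test.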

Usage Examples:

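import co.cask.cdap.explore.client.ExploreClient;
import co.cask.cdap.explore.client.ExploreExecutionResult;
import co.cask.cdap.explore.service.Explore;
import co.cask.cdap.explore.service.ExploreException;
import co.cask.cdap.explore.service.HandleNotFoundException;
import co.cask.cdap.proto.ColumnDesc;
import co.cask.cdap.proto.QueryHandle;
import co.cask.cdap.proto.QueryInfo;
import co.cask.cdap.proto.QueryResult;
import co.cask.cdap.proto.QueryStatus;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.util.concurrent.ListenableFuture;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;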

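// Synchronous execution via Explore interface
Explore explore = // obtained via dependency injection
NamespaceId namespace = new NamespaceId("default");

QueryHandle handle = null;
try {
    // Execute query and get handle
    handle = explore.execute(namespace, "SELECT * FROM my_table LIMIT 100");

    // Poll while the query is still queued or running
    QueryStatus status = explore.getStatus(handle);
    while (status.getStatus() == QueryStatus.OpStatus.INITIALIZED
            || status.getStatus() == QueryStatus.OpStatus.PENDING
            || status.getStatus() == QueryStatus.OpStatus.RUNNING) {
        Thread.sleep(1000);
        status = explore.getStatus(handle);
    }

    if (status.getStatus() == QueryStatus.OpStatus.ERROR) {
        System.err.println("Query failed: " + status.getErrorMessage());
    } else if (status.hasResultSet()) {
        // Get result schema
        List<ColumnDesc> schema = explore.getResultSchema(handle);
        System.out.println("Columns: " + schema.size());

        // Fetch results in batches until an empty batch is returned
        List<QueryResult> results;
        do {
            results = explore.nextResults(handle, 50);
            for (QueryResult row : results) {
                System.out.println("Row: " + row.getColumns());
            }
        } while (!results.isEmpty());
    }
} catch (ExploreException | HandleNotFoundException | SQLException e) {
    System.err.println("Query execution failed: " + e.getMessage());
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers can observe it
    Thread.currentThread().interrupt();
} finally {
    // Always close the handle, even if the query failed
    if (handle != null) {
        try {
            explore.close(handle);
        } catch (HandleNotFoundException | SQLException e) {
            System.err.println("Failed to close handle: " + e.getMessage());
        }
    }
}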

// Asynchronous execution via ExploreClient
ExploreClient client = // obtained via dependency injection

// Submit query asynchronously
ListenableFuture<ExploreExecutionResult> future =
    client.submit(namespace, "SELECT COUNT(*) FROM large_dataset");

// Wait for the result; try-with-resources closes it even if processing fails
try (ExploreExecutionResult result = future.get(30, TimeUnit.SECONDS)) {
    // Check if results are available
    if (result.canContainResults()) {
        // Set fetch size for performance
        result.setFetchSize(1000);

        // Stream through results
        while (result.hasNext()) {
            QueryResult row = result.next();
            // Process each row
            System.out.println("Result: " + row.getColumns());
        }
    }
} catch (Exception e) {
    System.err.println("Async query failed: " + e.getMessage());
}

// Query monitoring
try {
    List<QueryInfo> activeQueries = explore.getQueries(namespace);
    System.out.println("Active queries: " + activeQueries.size());
    
    int queryCount = explore.getActiveQueryCount(namespace);
    System.out.println("Total active: " + queryCount);
    
} catch (ExploreException | SQLException e) {
    System.err.println("Query monitoring failed: " + e.getMessage());
}

Error Handling

Query operations report failures through two checked exception types, in addition to java.sql.SQLException.

/**
 * General explore operation exception
 */
class ExploreException extends Exception {
    public ExploreException(String message);
    public ExploreException(String message, Throwable cause);
    public ExploreException(Throwable cause);
}

/**
 * Exception thrown when query handle is not found
 */
class HandleNotFoundException extends Exception {
    /**
     * Check if handle became inactive
     * @return true if handle is inactive
     */
    public boolean isInactive();
}

Common Error Scenarios:

  • Service Unavailable: Explore service is not running or unreachable
  • Handle Not Found: Query handle expired or was never created
  • SQL Errors: Malformed SQL or table/column not found
  • Timeout Errors: Query execution exceeds configured timeouts
  • Authentication Errors: Insufficient permissions for query execution
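
The scenarios above can be told apart in a single handler. In this sketch HandleMissing is a local stand-in for HandleNotFoundException (reduced to its isInactive flag), FailureReport and describe are hypothetical names, and the message wording is illustrative only. SQLException and TimeoutException are the standard JDK types.

```java
import java.sql.SQLException;
import java.util.concurrent.TimeoutException;

// Stand-in for HandleNotFoundException, reduced to the isInactive() flag.
class HandleMissing extends Exception {
    private final boolean inactive;

    HandleMissing(boolean inactive) {
        this.inactive = inactive;
    }

    boolean isInactive() {
        return inactive;
    }
}

final class FailureReport {
    /** Maps an exception to a short description suitable for logs. */
    static String describe(Exception e) {
        if (e instanceof HandleMissing) {
            return ((HandleMissing) e).isInactive()
                ? "query handle is no longer active"
                : "query handle is unknown";
        }
        if (e instanceof SQLException) {
            return "SQL error (state " + ((SQLException) e).getSQLState() + "): " + e.getMessage();
        }
        if (e instanceof TimeoutException) {
            return "timed out waiting for the query";
        }
        return "explore operation failed: " + e.getMessage();
    }
}
```

Checking the most specific types first keeps the fallback branch for failures that carry only a message, such as a general ExploreException.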

