Java client interfaces for exploring and querying CDAP datasets with Apache Hive SQL, including asynchronous query execution and metadata operations.
—
Synchronous and asynchronous SQL query execution with comprehensive result handling, status monitoring, and resource management. Supports both blocking operations via the Explore interface and non-blocking operations via ExploreClient.
Direct SQL execution with immediate result handling via the Explore service interface.
/**
* Core explore service interface for synchronous operations.
* Every method is declared to throw SQLException in addition to the
* explore- or handle-specific exception listed for it.
*/
interface Explore {
/**
* Execute SQL statement synchronously
* @param namespace namespace context for the query
* @param statement SQL statement to execute
* @return query handle for result retrieval
* @throws ExploreException if query execution fails
* @throws SQLException if SQL error occurs
*/
QueryHandle execute(NamespaceId namespace, String statement) throws ExploreException, SQLException;
/**
* Execute SQL with additional session configuration
* @param namespace namespace context
* @param statement SQL statement
* @param additionalSessionConf additional Hive session configuration
* @return query handle for result retrieval
* @throws ExploreException if query execution fails
* @throws SQLException if SQL error occurs
*/
QueryHandle execute(NamespaceId namespace, String statement,
Map<String, String> additionalSessionConf) throws ExploreException, SQLException;
/**
* Get current status of a query
* @param handle query handle
* @return current query status
* @throws HandleNotFoundException if handle is not found
* @throws SQLException if SQL error occurs
*/
QueryStatus getStatus(QueryHandle handle) throws HandleNotFoundException, SQLException;
/**
* Get result schema for a query
* @param handle query handle
* @return list of column descriptions
* @throws HandleNotFoundException if handle is not found
* @throws SQLException if SQL error occurs
*/
List<ColumnDesc> getResultSchema(QueryHandle handle) throws HandleNotFoundException, SQLException;
/**
* Fetch next batch of results.
* NOTE(review): the usage example treats an empty list as "no more results" - confirm
* that this is the documented end-of-results signal.
* @param handle query handle
* @param size maximum number of results to fetch
* @return list of query results
* @throws HandleNotFoundException if handle is not found
* @throws SQLException if SQL error occurs
*/
List<QueryResult> nextResults(QueryHandle handle, int size) throws HandleNotFoundException, SQLException;
/**
* Preview results without advancing cursor
* @param handle query handle
* @return preview of query results
* @throws HandleNotFoundException if handle is not found
* @throws SQLException if SQL error occurs
*/
List<QueryResult> previewResults(QueryHandle handle) throws HandleNotFoundException, SQLException;
/**
* Close query handle and release resources
* @param handle query handle to close
* @throws HandleNotFoundException if handle is not found
* @throws SQLException if SQL error occurs
*/
void close(QueryHandle handle) throws HandleNotFoundException, SQLException;
/**
* Get information about active queries in namespace
* @param namespace namespace to query
* @return list of query information
* @throws ExploreException if operation fails
* @throws SQLException if SQL error occurs
*/
List<QueryInfo> getQueries(NamespaceId namespace) throws ExploreException, SQLException;
/**
* Get count of active queries in namespace
* @param namespace namespace to check
* @return number of active queries
* @throws ExploreException if operation fails
* @throws SQLException if SQL error occurs
*/
int getActiveQueryCount(NamespaceId namespace) throws ExploreException, SQLException;
}Non-blocking query execution via ExploreClient with ListenableFuture results.
/**
* Asynchronous query execution methods
*/
interface ExploreClient {
/**
* Submit SQL statement for asynchronous execution.
* The signature declares no checked exceptions; the outcome is delivered through the returned future.
* The ExploreExecutionResult it yields is Closeable, so callers should close it when done.
* @param namespace namespace context for the query
* @param statement SQL statement to execute
* @return future containing execution results
*/
ListenableFuture<ExploreExecutionResult> submit(NamespaceId namespace, String statement);
}Comprehensive result processing with streaming support and schema access.
/**
* Results of an explore statement execution.
* Rows are consumed through the inherited Iterator methods (hasNext/next), and the
* inherited Closeable.close() should be called once the caller is finished with the result.
*/
interface ExploreExecutionResult extends Iterator<QueryResult>, Closeable {
/**
* Get current fetch size hint
* @return current fetch size
*/
int getFetchSize();
/**
* Set fetch size hint for result retrieval
* @param fetchSize hint for number of rows to fetch per batch
*/
void setFetchSize(int fetchSize);
/**
* Get schema information for result set
* @return list of column descriptions
* @throws ExploreException if schema cannot be retrieved
*/
List<ColumnDesc> getResultSchema() throws ExploreException;
/**
* Check if query can contain result data
* @return true if results may be available
*/
boolean canContainResults();
/**
* Get current execution status
* @return query execution status
*/
QueryStatus getStatus();
}
/**
* Query execution status information
*/
class QueryStatus {
/**
* States a query operation can report.
* NOTE(review): presumably INITIALIZED, PENDING and RUNNING are in-progress states and
* FINISHED, CANCELED, CLOSED and ERROR are terminal - confirm against the implementation.
*/
public enum OpStatus {
INITIALIZED, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, UNKNOWN, PENDING
}
/**
* Get current operation status
* @return status enum value
*/
public OpStatus getStatus();
/**
* Check if query has result set available
* @return true if results are available
*/
public boolean hasResultSet();
/**
* Get error message if query failed
* @return error message or null if no error
*/
public String getErrorMessage();
/**
* Get SQL state if query failed
* @return SQL state or null if no error
*/
public String getSqlState();
/**
* Get operation handle
* @return query handle
*/
public QueryHandle getOperationHandle();
}
/**
* Individual query result row
*/
class QueryResult {
/**
* Get column values for this result row.
* NOTE(review): presumably ordered to match the ColumnDesc list of the result schema - confirm.
* @return list of column values
*/
public List<Object> getColumns();
}
/**
* Information about an active query
*/
class QueryInfo {
/**
* Get query handle
* @return query handle
*/
public QueryHandle getQueryHandle();
/**
* Get SQL statement
* @return SQL statement text
*/
public String getStatement();
/**
* Get query timestamp.
* NOTE(review): the unit is not stated here; presumably epoch milliseconds - confirm.
* @return query start timestamp
*/
public long getTimestamp();
/**
* Check if query is active
* @return true if query is active
*/
public boolean isActive();
}Usage Examples:
import co.cask.cdap.explore.client.ExploreClient;
import co.cask.cdap.explore.client.ExploreExecutionResult;
import co.cask.cdap.explore.service.Explore;
import co.cask.cdap.proto.QueryResult;
import co.cask.cdap.proto.QueryStatus;
import co.cask.cdap.proto.id.NamespaceId;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
// Synchronous execution via Explore interface
Explore explore = // obtained via dependency injection
NamespaceId namespace = new NamespaceId("default");
// Declared outside the try block so the finally clause can release it on every path
QueryHandle handle = null;
try {
    // Execute query and get handle
    handle = explore.execute(namespace, "SELECT * FROM my_table LIMIT 100");
    // Poll while the query is in any not-yet-finished state; checking RUNNING alone
    // would stop early for a query still reported as INITIALIZED or PENDING
    QueryStatus status = explore.getStatus(handle);
    while (status.getStatus() == QueryStatus.OpStatus.INITIALIZED
            || status.getStatus() == QueryStatus.OpStatus.PENDING
            || status.getStatus() == QueryStatus.OpStatus.RUNNING) {
        Thread.sleep(1000);
        status = explore.getStatus(handle);
    }
    if (status.getStatus() == QueryStatus.OpStatus.ERROR) {
        // Surface the failure instead of silently printing nothing
        System.err.println("Query failed: " + status.getErrorMessage());
    } else if (status.hasResultSet()) {
        // Get result schema
        List<ColumnDesc> schema = explore.getResultSchema(handle);
        System.out.println("Columns: " + schema.size());
        // Fetch results in batches until an empty batch is returned
        List<QueryResult> results;
        do {
            results = explore.nextResults(handle, 50);
            for (QueryResult row : results) {
                System.out.println("Row: " + row.getColumns());
            }
        } while (!results.isEmpty());
    }
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up can observe the interruption
    Thread.currentThread().interrupt();
    System.err.println("Query polling interrupted: " + e.getMessage());
} catch (ExploreException | HandleNotFoundException | SQLException e) {
    // The Explore methods used above declare all three checked exceptions
    System.err.println("Query execution failed: " + e.getMessage());
} finally {
    // Always close the handle, including when execution or fetching failed
    if (handle != null) {
        try {
            explore.close(handle);
        } catch (HandleNotFoundException | SQLException e) {
            System.err.println("Failed to close query handle: " + e.getMessage());
        }
    }
}
// Asynchronous execution via ExploreClient
ExploreClient client = // obtained via dependency injection
try {
    // Submit query asynchronously
    ListenableFuture<ExploreExecutionResult> future =
        client.submit(namespace, "SELECT COUNT(*) FROM large_dataset");
    // Wait up to 30 seconds for the result; try-with-resources closes it
    // (ExploreExecutionResult is Closeable) even if processing below throws
    try (ExploreExecutionResult result = future.get(30, TimeUnit.SECONDS)) {
        // Check if results are available
        if (result.canContainResults()) {
            // Set fetch size for performance
            result.setFetchSize(1000);
            // Stream through results
            while (result.hasNext()) {
                QueryResult row = result.next();
                // Process each row
                System.out.println("Result: " + row.getColumns());
            }
        }
    }
} catch (InterruptedException e) {
    // Restore the interrupt flag rather than swallowing the interruption
    Thread.currentThread().interrupt();
    System.err.println("Async query failed: " + e.getMessage());
} catch (Exception e) {
    // Boundary catch: covers timeout, execution failure and close() errors
    System.err.println("Async query failed: " + e.getMessage());
}
// Query monitoring
try {
    // List the queries reported for the namespace
    List<QueryInfo> activeQueries = explore.getQueries(namespace);
    System.out.println("Active queries: " + activeQueries.size());
    // Ask the service for the active-query count directly
    int queryCount = explore.getActiveQueryCount(namespace);
    System.out.println("Total active: " + queryCount);
} catch (ExploreException | SQLException e) {
    // Both calls declare SQLException as well as ExploreException, so handle both
    System.err.println("Query monitoring failed: " + e.getMessage());
}

Comprehensive error handling for query execution scenarios.
/**
* General explore operation exception.
* Extends Exception, so it is a checked exception that callers must catch or declare.
*/
class ExploreException extends Exception {
/**
* @param message description of the failure
*/
public ExploreException(String message);
/**
* @param message description of the failure
* @param cause underlying cause of the failure
*/
public ExploreException(String message, Throwable cause);
/**
* @param cause underlying cause of the failure
*/
public ExploreException(Throwable cause);
}
/**
* Exception thrown when query handle is not found.
* Extends Exception, so it is a checked exception that callers must catch or declare.
*/
class HandleNotFoundException extends Exception {
/**
* Check if handle became inactive.
* NOTE(review): presumably distinguishes a handle that existed but is no longer active
* from one that was never known - confirm against the implementation.
* @return true if handle is inactive
*/
public boolean isInactive();
}Common Error Scenarios:
Install with the Tessl CLI:
npx tessl i tessl/maven-co-cask-cdap--cdap-explore-client