SQL Client for exploring and submitting SQL programs to Flink
—
Core execution interface providing session management, SQL parsing, and operation execution. The gateway abstracts the underlying Flink execution environment and provides a unified API for SQL operations across different deployment modes.
Primary interface for SQL execution operations with complete session lifecycle management.
public interface Executor {
/**
* Initialize executor and ensure readiness for command execution
* @throws SqlExecutionException if initialization fails
*/
void start() throws SqlExecutionException;
/**
* Open new session with optional session identifier
* @param sessionId Desired session ID or null for auto-generation
* @return Actual session ID used for tracking
* @throws SqlExecutionException if session creation fails
*/
String openSession(String sessionId) throws SqlExecutionException;
/**
* Close session and release associated resources
* @param sessionId Session identifier to close
* @throws SqlExecutionException if session closure fails
*/
void closeSession(String sessionId) throws SqlExecutionException;
}

Session-level configuration management with property get/set operations.
/**
* Get session configuration as Map for external access
* @param sessionId Session identifier
* @return Copy of all session configuration properties
* @throws SqlExecutionException if session not found
*/
Map<String, String> getSessionConfigMap(String sessionId) throws SqlExecutionException;
/**
* Get session configuration as ReadableConfig for type-safe access
* @param sessionId Session identifier
* @return ReadableConfig instance with session properties
* @throws SqlExecutionException if session not found
*/
ReadableConfig getSessionConfig(String sessionId) throws SqlExecutionException;
/**
* Reset all session properties to default values
* @param sessionId Session identifier
* @throws SqlExecutionException if reset fails
*/
void resetSessionProperties(String sessionId) throws SqlExecutionException;
/**
* Reset specific session property to default value
* @param sessionId Session identifier
* @param key Property key to reset
* @throws SqlExecutionException if reset fails
*/
void resetSessionProperty(String sessionId, String key) throws SqlExecutionException;
/**
* Set session property to specific value
* @param sessionId Session identifier
* @param key Property key
* @param value Property value
* @throws SqlExecutionException if property setting fails
*/
void setSessionProperty(String sessionId, String key, String value) throws SqlExecutionException;

Usage Example:
// Configure session properties
executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "tableau");
executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "4");
// Get current configuration
Map<String, String> config = executor.getSessionConfigMap(sessionId);
ReadableConfig readableConfig = executor.getSessionConfig(sessionId);
// Reset properties
executor.resetSessionProperty(sessionId, "table.exec.resource.default-parallelism");
executor.resetSessionProperties(sessionId); // Reset all

Parse and execute SQL statements with full operation lifecycle support.
/**
* Parse SQL statement into executable Operation
* @param sessionId Session identifier for context
* @param statement SQL statement to parse
* @return Operation instance ready for execution
* @throws SqlExecutionException if parsing fails
*/
Operation parseStatement(String sessionId, String statement) throws SqlExecutionException;
/**
* Provide auto-completion suggestions for SQL statement
* @param sessionId Session identifier for context
* @param statement Partial SQL statement
* @param position Cursor position within statement
* @return List of completion suggestions
*/
List<String> completeStatement(String sessionId, String statement, int position);
/**
* Execute parsed operation and return result
* @param sessionId Session identifier
* @param operation Parsed operation to execute
* @return TableResult with operation results
* @throws SqlExecutionException if execution fails
*/
TableResult executeOperation(String sessionId, Operation operation) throws SqlExecutionException;

Usage Example:
// Parse and execute statement
String sql = "CREATE TABLE users (id INT, name STRING) WITH ('connector' = 'datagen')";
Operation operation = executor.parseStatement(sessionId, sql);
TableResult result = executor.executeOperation(sessionId, operation);
// Auto-completion
List<String> suggestions = executor.completeStatement(sessionId, "SELECT * FROM use", 17);
// Returns: ["users", "user_events", ...] based on available tables

Execute queries with advanced result handling for streaming and batch operations.
/**
* Execute SELECT query and return result descriptor for streaming access
* @param sessionId Session identifier
* @param query Parsed query operation
* @return ResultDescriptor for accessing query results
* @throws SqlExecutionException if query execution fails
*/
ResultDescriptor executeQuery(String sessionId, QueryOperation query) throws SqlExecutionException;
/**
* Retrieve incremental changes from streaming query result
* @param sessionId Session identifier
* @param resultId Result identifier from executeQuery
* @return TypedResult containing list of result rows or status
* @throws SqlExecutionException if retrieval fails
*/
TypedResult<List<Row>> retrieveResultChanges(String sessionId, String resultId) throws SqlExecutionException;
/**
* Create materialized snapshot of query result with pagination
* @param sessionId Session identifier
* @param resultId Result identifier from executeQuery
* @param pageSize Number of rows per page
* @return TypedResult containing total page count
* @throws SqlExecutionException if snapshot creation fails
*/
TypedResult<Integer> snapshotResult(String sessionId, String resultId, int pageSize) throws SqlExecutionException;
/**
* Retrieve specific page from materialized result snapshot
* @param resultId Result identifier
* @param page Page number (0-based)
* @return List of rows for the requested page
* @throws SqlExecutionException if page retrieval fails
*/
List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException;
/**
* Cancel running query and stop result generation
* @param sessionId Session identifier
* @param resultId Result identifier to cancel
* @throws SqlExecutionException if cancellation fails
*/
void cancelQuery(String sessionId, String resultId) throws SqlExecutionException;

Usage Example:
// Execute streaming query
QueryOperation queryOp = (QueryOperation) executor.parseStatement(sessionId, "SELECT * FROM events");
ResultDescriptor descriptor = executor.executeQuery(sessionId, queryOp);
if (descriptor.isMaterialized()) {
// Handle materialized results with pagination
TypedResult<Integer> pageCount = executor.snapshotResult(sessionId, descriptor.getResultId(), 100);
if (pageCount.getType() == TypedResult.ResultType.PAYLOAD) {
for (int page = 0; page < pageCount.getPayload(); page++) {
List<Row> rows = executor.retrieveResultPage(descriptor.getResultId(), page);
// Process rows
}
}
} else {
// Handle streaming results
while (true) {
TypedResult<List<Row>> changes = executor.retrieveResultChanges(sessionId, descriptor.getResultId());
if (changes.getType() == TypedResult.ResultType.PAYLOAD) {
List<Row> rows = changes.getPayload();
// Process incremental changes
} else if (changes.getType() == TypedResult.ResultType.EOS) {
break; // End of stream
}
// Handle EMPTY results by continuing to poll
}
}

Execute multiple modification operations as a batch for better performance.
/**
* Execute multiple modification operations as batch
* @param sessionId Session identifier
* @param operations List of modification operations (INSERT, UPDATE, DELETE)
* @return TableResult with batch execution results
* @throws SqlExecutionException if batch execution fails
*/
TableResult executeModifyOperations(String sessionId, List<ModifyOperation> operations) throws SqlExecutionException;

Usage Example:
// Batch multiple INSERT operations
List<ModifyOperation> operations = Arrays.asList(
(ModifyOperation) executor.parseStatement(sessionId, "INSERT INTO table1 VALUES (1, 'a')"),
(ModifyOperation) executor.parseStatement(sessionId, "INSERT INTO table2 VALUES (2, 'b')")
);
TableResult batchResult = executor.executeModifyOperations(sessionId, operations);
System.out.println("Batch job ID: " + batchResult.getJobClient().get().getJobID());

Manage session-level JAR dependencies for custom functions and connectors.
/**
* Add JAR file to session classpath
* @param sessionId Session identifier
* @param jarPath Path to JAR file (local or remote URL)
*/
void addJar(String sessionId, String jarPath);
/**
* Remove JAR file from session classpath
* @param sessionId Session identifier
* @param jarPath Path to JAR file to remove
*/
void removeJar(String sessionId, String jarPath);
/**
* List all JAR files loaded in session
* @param sessionId Session identifier
* @return List of JAR file paths
*/
List<String> listJars(String sessionId);

Usage Example:
// Add custom connector JAR
executor.addJar(sessionId, "/path/to/custom-connector-1.0.jar");
// Add JAR from URL
executor.addJar(sessionId, "https://repo.maven.apache.org/maven2/org/example/connector/1.0/connector-1.0.jar");
// List loaded JARs
List<String> jars = executor.listJars(sessionId);
jars.forEach(System.out::println);
// Remove JAR when no longer needed
executor.removeJar(sessionId, "/path/to/custom-connector-1.0.jar");

Concrete implementation of the Executor interface for embedded execution mode.
public class LocalExecutor implements Executor {
/**
* Create local executor with default context
* @param defaultContext Default execution context with configuration and dependencies
*/
public LocalExecutor(DefaultContext defaultContext);
}

Usage Example:
// Create local executor
DefaultContext context = LocalContextUtils.buildDefaultContext(options);
Executor executor = new LocalExecutor(context);
// Start and use executor
executor.start();
String sessionId = executor.openSession(null);
try {
// Perform SQL operations
Operation op = executor.parseStatement(sessionId, "SHOW TABLES");
TableResult result = executor.executeOperation(sessionId, op);
} finally {
executor.closeSession(sessionId);
}

Primary exception type for SQL execution errors.
public class SqlExecutionException extends Exception {
public SqlExecutionException(String message);
public SqlExecutionException(String message, Throwable cause);
}

Common error scenarios:
Example Error Handling:
try {
Operation operation = executor.parseStatement(sessionId, malformedSQL);
} catch (SqlExecutionException e) {
System.err.println("SQL Error: " + e.getMessage());
// Handle parsing or execution error
if (e.getCause() instanceof ValidationException) {
// Handle validation-specific error
}
}

Typical session usage pattern:
Executor executor = new LocalExecutor(defaultContext);
executor.start();
String sessionId = executor.openSession("my-session");
try {
// Configure session
executor.setSessionProperty(sessionId, "key", "value");
// Execute operations
Operation op = executor.parseStatement(sessionId, sql);
TableResult result = executor.executeOperation(sessionId, op);
} finally {
// Always close session
executor.closeSession(sessionId);
}

Pattern for handling streaming query results:
QueryOperation query = (QueryOperation) executor.parseStatement(sessionId, streamingSQL);
ResultDescriptor descriptor = executor.executeQuery(sessionId, query);
// Poll for streaming results
while (!cancelled) {
TypedResult<List<Row>> result = executor.retrieveResultChanges(sessionId, descriptor.getResultId());
switch (result.getType()) {
case PAYLOAD:
processRows(result.getPayload());
break;
case EMPTY:
Thread.sleep(100); // Wait before next poll
break;
case EOS:
System.out.println("Stream ended");
return;
}
}

The Executor interface provides a comprehensive abstraction over Flink's SQL execution capabilities, enabling both simple statement execution and complex streaming data processing workflows.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-client-2-12