A service that enables multiple clients from the remote to execute SQL in concurrency, providing an easy way to submit Flink Jobs, look up metadata, and analyze data online.
—
The SqlGatewayService is the main service interface responsible for handling requests from endpoints. It provides comprehensive functionality for session management, SQL execution, operation management, and catalog operations.
Create and manage isolated sessions for multiple clients with environment configuration and resource cleanup.
/**
* Open a new session with the specified environment configuration
* @param environment Environment to initialize the Session
* @return Handle that identifies the Session
* @throws SqlGatewayException if session creation fails
*/
SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
/**
* Close an existing session and clean up resources
* @param sessionHandle Handle to identify the Session to close
* @throws SqlGatewayException if session closure fails
*/
void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
/**
* Configure session using SQL statements (SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR)
* @param sessionHandle Handle to identify the session
* @param statement SQL statement used to configure the session
* @param executionTimeoutMs Execution timeout in milliseconds (non-positive disables timeout)
* @throws SqlGatewayException if configuration fails
*/
void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;
/**
* Get current configuration of the session
* @param sessionHandle Handle to identify the session
* @return Map of configuration key-value pairs
* @throws SqlGatewayException if session not found or access fails
*/
Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
/**
* Get endpoint version negotiated during session opening
* @param sessionHandle Handle to identify the session
* @return Negotiated endpoint version
* @throws SqlGatewayException if session not found
*/
EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException;Execute SQL statements asynchronously with timeout support and execution configuration.
/**
* Execute the submitted SQL statement
* @param sessionHandle Handle to identify the session
* @param statement SQL statement to execute
* @param executionTimeoutMs Execution timeout in milliseconds (non-positive disables timeout)
* @param executionConfig Configuration for statement execution
* @return Handle to identify the operation
* @throws SqlGatewayException if execution fails
*/
OperationHandle executeStatement(
SessionHandle sessionHandle,
String statement,
long executionTimeoutMs,
Configuration executionConfig
) throws SqlGatewayException;
/**
* Fetch results from operation using token-based pagination
* @param sessionHandle Handle to identify the session
* @param operationHandle Handle to identify the operation
* @param token Token to identify results position
* @param maxRows Maximum number of rows to fetch (Integer.MAX_VALUE for all available)
* @return ResultSet containing data and metadata
* @throws SqlGatewayException if fetch fails
*/
ResultSet fetchResults(
SessionHandle sessionHandle,
OperationHandle operationHandle,
long token,
int maxRows
) throws SqlGatewayException;
/**
* Fetch results from operation using orientation-based navigation
* @param sessionHandle Handle to identify the session
* @param operationHandle Handle to identify the operation
* @param orientation Direction to fetch results (FETCH_NEXT or FETCH_PRIOR)
* @param maxRows Maximum number of rows to fetch
* @return ResultSet with at least one row if not end-of-stream
* @throws SqlGatewayException if fetch fails
*/
ResultSet fetchResults(
SessionHandle sessionHandle,
OperationHandle operationHandle,
FetchOrientation orientation,
int maxRows
) throws SqlGatewayException;Submit, monitor, and control operations with comprehensive lifecycle management.
/**
* Submit an operation for execution
* @param sessionHandle Handle to identify the session
* @param executor Main logic to get execution results
* @return Handle for retrieving results later
* @throws SqlGatewayException if submission fails
*/
OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException;
/**
* Cancel operation when not in terminal status
* @param sessionHandle Handle to identify the session
* @param operationHandle Handle to identify the operation
* @throws SqlGatewayException if cancellation fails or operation already terminated
*/
void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
/**
* Close operation and release all resources
* @param sessionHandle Handle to identify the session
* @param operationHandle Handle to identify the operation
* @throws SqlGatewayException if closure fails
*/
void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
/**
* Get operation information including status and errors
* @param sessionHandle Handle to identify the session
* @param operationHandle Handle to identify the operation
* @return OperationInfo with status and exception details
* @throws SqlGatewayException if operation not found
*/
OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
/**
* Get result schema for the operation (available when FINISHED)
* @param sessionHandle Handle to identify the session
* @param operationHandle Handle to identify the operation
* @return Resolved schema of the operation results
* @throws SqlGatewayException if operation not found or not finished
*/
ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;Access and explore Flink catalog metadata including catalogs, databases, tables, and functions.
/**
* Get current catalog name for the session
* @param sessionHandle Handle to identify the session
* @return Name of the current catalog
* @throws SqlGatewayException if session not found
*/
String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;
/**
* List all available catalogs in the session
* @param sessionHandle Handle to identify the session
* @return Set of registered catalog names
* @throws SqlGatewayException if session not found
*/
Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
/**
* List all available databases in the given catalog
* @param sessionHandle Handle to identify the session
* @param catalogName Name of the catalog
* @return Set of database names in the catalog
* @throws SqlGatewayException if session or catalog not found
*/
Set<String> listDatabases(SessionHandle sessionHandle, String catalogName) throws SqlGatewayException;
/**
* List tables/views in the given catalog and database
* @param sessionHandle Handle to identify the session
* @param catalogName Name of the catalog
* @param databaseName Name of the database
* @param tableKinds Types of tables to return (TABLE, VIEW, etc.)
* @return Set of TableInfo for matching tables/views
* @throws SqlGatewayException if session, catalog, or database not found
*/
Set<TableInfo> listTables(
SessionHandle sessionHandle,
String catalogName,
String databaseName,
Set<TableKind> tableKinds
) throws SqlGatewayException;
/**
* Get table information for fully qualified table name
* @param sessionHandle Handle to identify the session
* @param tableIdentifier Fully qualified table identifier
* @return ResolvedCatalogBaseTable with complete table information
* @throws SqlGatewayException if session or table not found
*/
ResolvedCatalogBaseTable<?> getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) throws SqlGatewayException;
/**
* List user-defined functions in catalog and database
* @param sessionHandle Handle to identify the session
* @param catalogName Name of the catalog
* @param databaseName Name of the database
* @return Set of FunctionInfo for user-defined functions
* @throws SqlGatewayException if session, catalog, or database not found
*/
Set<FunctionInfo> listUserDefinedFunctions(SessionHandle sessionHandle, String catalogName, String databaseName) throws SqlGatewayException;
/**
* List all available system functions
* @param sessionHandle Handle to identify the session
* @return Set of FunctionInfo for system functions
* @throws SqlGatewayException if session not found
*/
Set<FunctionInfo> listSystemFunctions(SessionHandle sessionHandle) throws SqlGatewayException;
/**
* Get specific function definition with resolution order: temporary system, system, temporary, catalog
* @param sessionHandle Handle to identify the session
* @param functionIdentifier Identifier of the function
* @return FunctionDefinition with complete function details
* @throws SqlGatewayException if session or function not found
*/
FunctionDefinition getFunctionDefinition(SessionHandle sessionHandle, UnresolvedIdentifier functionIdentifier) throws SqlGatewayException;Manage materialized table refresh operations with periodic and one-time scheduling support.
/**
* Trigger refresh operation for specific materialized table
* @param sessionHandle Handle to identify the session
* @param materializedTableIdentifier Fully qualified table identifier (catalogName.databaseName.objectName)
* @param isPeriodic Whether workflow is periodic or one-time-only
* @param scheduleTime Time point for scheduler trigger (nullable)
* @param dynamicOptions Dynamic configuration options
* @param staticPartitions Specific partitions for one-time refresh
* @param executionConfig Flink job configuration
* @return Handle to identify the refresh operation
*/
OperationHandle refreshMaterializedTable(
SessionHandle sessionHandle,
String materializedTableIdentifier,
boolean isPeriodic,
@Nullable String scheduleTime,
Map<String, String> dynamicOptions,
Map<String, String> staticPartitions,
Map<String, String> executionConfig
);Deploy SQL scripts in application mode for batch job execution.
/**
* Deploy script in application mode
* @param sessionHandle Handle to identify the session
* @param scriptUri URI of the script (nullable)
* @param script Content of the script (nullable)
* @param executionConfig Configuration to run the script
* @return Cluster identifier for the deployed script
* @throws SqlGatewayException if deployment fails
*/
<ClusterID> ClusterID deployScript(
SessionHandle sessionHandle,
@Nullable URI scriptUri,
@Nullable String script,
Configuration executionConfig
) throws SqlGatewayException;Provide SQL statement completion hints for interactive SQL clients.
/**
* Get completion hints for SQL statement at given position
* @param sessionHandle Handle to identify the session
* @param statement SQL statement to be completed
* @param position Cursor position where completion is needed
* @return List of completion suggestions
* @throws SqlGatewayException if session not found or completion fails
*/
List<String> completeStatement(SessionHandle sessionHandle, String statement, int position) throws SqlGatewayException;Get gateway information and service metadata.
/**
* Get information about the SqlGatewayService
* @return GatewayInfo with product name and version
*/
GatewayInfo getGatewayInfo();import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
// Create service instance
SqlGatewayService service = new SqlGatewayServiceImpl(sessionManager);
// Open session
SessionEnvironment environment = SessionEnvironment.newBuilder()
.setSessionName("my-session")
.addSessionConfig(Map.of("execution.target", "remote"))
.build();
SessionHandle session = service.openSession(environment);
// Execute SQL
OperationHandle operation = service.executeStatement(
session,
"SELECT * FROM my_table",
30000L, // 30 second timeout
new Configuration()
);
// Check operation status
OperationInfo info = service.getOperationInfo(session, operation);
if (info.getStatus().isTerminalStatus()) {
// Fetch results
ResultSet results = service.fetchResults(session, operation, 0L, 100);
// Process results...
}
// Clean up
service.closeOperation(session, operation);
service.closeSession(session);Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-gateway