Comprehensive uber JAR that consolidates all Java APIs for Apache Flink's Table/SQL ecosystem, enabling developers to write table programs and integrate with other Flink APIs through a single dependency.
—
Remote SQL execution capabilities for building client-server architectures and multi-tenant SQL services with Apache Flink's Table API.
Core service interface for managing SQL sessions and executing SQL operations remotely.
/**
* Open a new session for SQL execution
* @param environment Session configuration and initialization parameters
* @return SessionHandle for identifying the session
* @throws SqlGatewayException if session creation fails
*/
public SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
/**
* Close an existing session and clean up resources
* @param sessionHandle Handle of the session to close
* @throws SqlGatewayException if session closure fails
*/
public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
/**
* Configure session properties using SET statements
* @param sessionHandle Handle of the session to configure
* @param statement Configuration statement (e.g., "SET table.planner = blink")
* @param executionTimeoutMs Maximum execution time in milliseconds
* @throws SqlGatewayException if configuration fails
*/
public void configureSession(SessionHandle sessionHandle,
String statement,
long executionTimeoutMs) throws SqlGatewayException;
/**
* Execute a SQL statement (DDL, DML, or query)
* @param sessionHandle Handle of the session
* @param statement SQL statement to execute
* @param executionTimeoutMs Maximum execution time in milliseconds
* @param executionConfig Additional execution configuration
* @return OperationHandle for tracking the operation
* @throws SqlGatewayException if execution fails
*/
public OperationHandle executeStatement(SessionHandle sessionHandle,
String statement,
long executionTimeoutMs,
Configuration executionConfig) throws SqlGatewayException;
/**
* Submit a custom operation for execution
* @param sessionHandle Handle of the session
* @param executor Callable that produces the operation result
* @return OperationHandle for tracking the operation
* @throws SqlGatewayException if submission fails
*/
public OperationHandle submitOperation(SessionHandle sessionHandle,
Callable<ResultSet> executor) throws SqlGatewayException;
/**
* Cancel a running operation
* @param sessionHandle Handle of the session
* @param operationHandle Handle of the operation to cancel
* @throws SqlGatewayException if cancellation fails
*/
public void cancelOperation(SessionHandle sessionHandle,
OperationHandle operationHandle) throws SqlGatewayException;
/**
* Close an operation and clean up resources
* @param sessionHandle Handle of the session
* @param operationHandle Handle of the operation to close
* @throws SqlGatewayException if operation closure fails
*/
public void closeOperation(SessionHandle sessionHandle,
OperationHandle operationHandle) throws SqlGatewayException;
/**
* Get information about an operation
* @param sessionHandle Handle of the session
* @param operationHandle Handle of the operation
* @return OperationInfo containing status and metadata
* @throws SqlGatewayException if operation info retrieval fails
*/
public OperationInfo getOperationInfo(SessionHandle sessionHandle,
OperationHandle operationHandle) throws SqlGatewayException;
/**
* Get schema information for an operation result
* @param sessionHandle Handle of the session
* @param operationHandle Handle of the operation
* @return ResolvedSchema of the operation result
* @throws SqlGatewayException if schema retrieval fails
*/
public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle,
OperationHandle operationHandle) throws SqlGatewayException;
/**
* Fetch results from an operation
* @param sessionHandle Handle of the session
* @param operationHandle Handle of the operation
* @param orientation Direction for fetching results
* @param maxRows Maximum number of rows to fetch
* @return ResultSet containing the fetched results
* @throws SqlGatewayException if result fetching fails
*/
public ResultSet fetchResults(SessionHandle sessionHandle,
OperationHandle operationHandle,
FetchOrientation orientation,
int maxRows) throws SqlGatewayException;Basic SQL Gateway Usage:
// Create SQL Gateway service instance
SqlGatewayService gateway = SqlGatewayServiceImpl.create(gatewayConfig);
// Open session with configuration
Map<String, String> sessionProperties = new HashMap<>();
sessionProperties.put("execution.parallelism", "4");
sessionProperties.put("table.planner", "blink");
SessionEnvironment sessionEnv = SessionEnvironment.newBuilder()
.setSessionEndpointConfig(endpointConfig)
.addSessionConfig(sessionProperties)
.build();
SessionHandle session = gateway.openSession(sessionEnv);
try {
// Configure session
gateway.configureSession(session, "SET execution.checkpointing.interval = 10s", 5000);
// Execute DDL
OperationHandle createTable = gateway.executeStatement(session,
"CREATE TABLE orders (" +
" id BIGINT," +
" customer_id BIGINT," +
" amount DECIMAL(10,2)" +
") WITH ('connector' = 'kafka', ...)",
30000, new Configuration());
// Execute query
OperationHandle query = gateway.executeStatement(session,
"SELECT customer_id, SUM(amount) as total " +
"FROM orders GROUP BY customer_id",
60000, new Configuration());
// Fetch results
ResultSet results = gateway.fetchResults(session, query,
FetchOrientation.FETCH_NEXT, 100);
while (results.hasNext()) {
RowData row = results.next();
System.out.println("Customer: " + row.getLong(0) + ", Total: " + row.getDecimal(1, 10, 2));
}
} finally {
gateway.closeSession(session);
}Handle SQL Gateway sessions with proper lifecycle management.
/**
* Session handle for identifying and tracking SQL sessions
*/
public final class SessionHandle {
/**
* Get unique session identifier
* @return UUID representing the session
*/
public UUID getIdentifier();
/**
* Get session creation timestamp
* @return Session creation time
*/
public Instant getCreationTime();
}
/**
* Session environment configuration
*/
public final class SessionEnvironment {
/**
* Create new session environment builder
* @return Builder for constructing session environment
*/
public static Builder newBuilder();
public static final class Builder {
/**
* Set session endpoint configuration
* @param config Endpoint-specific configuration
* @return Builder for method chaining
*/
public Builder setSessionEndpointConfig(Map<String, String> config);
/**
* Add session configuration properties
* @param config Map of configuration key-value pairs
* @return Builder for method chaining
*/
public Builder addSessionConfig(Map<String, String> config);
/**
* Set default catalog name
* @param catalogName Name of the default catalog
* @return Builder for method chaining
*/
public Builder setDefaultCatalog(String catalogName);
/**
* Set default database name
* @param databaseName Name of the default database
* @return Builder for method chaining
*/
public Builder setDefaultDatabase(String databaseName);
/**
* Build the session environment
* @return Constructed SessionEnvironment
*/
public SessionEnvironment build();
}
}Track and manage long-running SQL operations.
/**
* Operation handle for identifying and tracking SQL operations
*/
public final class OperationHandle {
/**
* Get unique operation identifier
* @return UUID representing the operation
*/
public UUID getIdentifier();
/**
* Get operation type
* @return Type of the operation (EXECUTE_STATEMENT, etc.)
*/
public OperationType getOperationType();
}
/**
* Operation information and status
*/
public final class OperationInfo {
/**
* Get operation status
* @return Current status of the operation
*/
public OperationStatus getStatus();
/**
* Get operation exception if failed
* @return Optional exception that caused operation failure
*/
public Optional<Throwable> getException();
/**
* Check if operation has results available
* @return true if operation has results to fetch
*/
public boolean hasResults();
/**
* Get operation creation timestamp
* @return When the operation was created
*/
public Instant getCreateTime();
/**
* Get operation end timestamp
* @return When the operation completed (if finished)
*/
public Optional<Instant> getEndTime();
}
/**
* Operation status enumeration
*/
public enum OperationStatus {
INITIALIZED,
PENDING,
RUNNING,
FINISHED,
CANCELED,
FAILED,
TIMEOUT
}
/**
* Operation type enumeration
*/
public enum OperationType {
EXECUTE_STATEMENT,
SUBMIT_PLAN
}Handle result sets and data fetching from SQL operations.
/**
* Result set for accessing query results
*/
public interface ResultSet extends AutoCloseable {
/**
* Get the result type
* @return Type of results (PAYLOAD, NOT_READY, EOS)
*/
public ResultType getResultType();
/**
* Get next token for pagination
* @return Optional token for fetching next page of results
*/
public Optional<String> getNextToken();
/**
* Get result data as list of RowData
* @return List containing the fetched rows
*/
public List<RowData> getData();
/**
* Check if more results are available
* @return true if hasNext() would return more data
*/
public boolean hasNext();
/**
* Get next row of data
* @return Next RowData instance
*/
public RowData next();
/**
* Get result schema
* @return Schema of the result rows
*/
public ResolvedSchema getResultSchema();
}
/**
* Result type enumeration
*/
public enum ResultType {
PAYLOAD, // Contains actual data
NOT_READY, // Operation not ready yet
EOS // End of stream
}
/**
* Fetch orientation for result pagination
*/
public enum FetchOrientation {
FETCH_NEXT, // Fetch next rows
FETCH_PRIOR, // Fetch previous rows
FETCH_FIRST, // Fetch from beginning
FETCH_LAST // Fetch from end
}Result Handling Examples:
// Execute query and handle results
OperationHandle queryOp = gateway.executeStatement(session,
"SELECT * FROM large_table ORDER BY id",
120000, new Configuration());
// Wait for operation to complete
OperationInfo opInfo;
do {
Thread.sleep(1000);
opInfo = gateway.getOperationInfo(session, queryOp);
} while (opInfo.getStatus() == OperationStatus.RUNNING);
if (opInfo.getStatus() == OperationStatus.FINISHED) {
// Fetch results in batches
String nextToken = null;
int batchSize = 1000;
do {
ResultSet batch = gateway.fetchResults(session, queryOp,
FetchOrientation.FETCH_NEXT, batchSize);
if (batch.getResultType() == ResultType.PAYLOAD) {
List<RowData> rows = batch.getData();
for (RowData row : rows) {
processRow(row);
}
nextToken = batch.getNextToken().orElse(null);
}
} while (nextToken != null);
}Configure and manage SQL Gateway endpoints for different protocols.
/**
* SQL Gateway endpoint interface
*/
public interface SqlGatewayEndpoint {
/**
* Start the endpoint and begin accepting connections
* @throws SqlGatewayException if startup fails
*/
public void start() throws SqlGatewayException;
/**
* Stop the endpoint and close all connections
* @throws SqlGatewayException if shutdown fails
*/
public void stop() throws SqlGatewayException;
/**
* Get endpoint information
* @return Information about this endpoint
*/
public EndpointInfo getInfo();
}
/**
* Endpoint information
*/
public final class EndpointInfo {
/**
* Get endpoint identifier
* @return Unique endpoint ID
*/
public String getEndpointId();
/**
* Get endpoint version
* @return Version string of the endpoint
*/
public String getEndpointVersion();
/**
* Get supported protocols
* @return Set of supported protocol names
*/
public Set<String> getSupportedProtocols();
}Configure SQL Gateway service and endpoints.
/**
* SQL Gateway configuration builder
*/
public class SqlGatewayConfig {
public static Builder newBuilder() {
return new Builder();
}
public static class Builder {
/**
* Set session timeout
* @param timeout Session timeout duration
* @return Builder for method chaining
*/
public Builder setSessionTimeout(Duration timeout);
/**
* Set maximum number of concurrent sessions
* @param maxSessions Maximum session count
* @return Builder for method chaining
*/
public Builder setMaxSessions(int maxSessions);
/**
* Set operation timeout
* @param timeout Default operation timeout
* @return Builder for method chaining
*/
public Builder setOperationTimeout(Duration timeout);
/**
* Set result fetch timeout
* @param timeout Result fetch timeout
* @return Builder for method chaining
*/
public Builder setResultFetchTimeout(Duration timeout);
/**
* Enable/disable session persistence
* @param persistent Whether to persist sessions
* @return Builder for method chaining
*/
public Builder setPersistentSessions(boolean persistent);
/**
* Build the configuration
* @return Constructed configuration
*/
public SqlGatewayConfig build();
}
}Handle SQL Gateway specific exceptions and error conditions.
/**
* Base exception for SQL Gateway operations
*/
public class SqlGatewayException extends Exception {
/**
* Get error code
* @return Specific error code for the exception
*/
public String getErrorCode();
/**
* Get error details
* @return Additional error details and context
*/
public Map<String, String> getErrorDetails();
}
/**
* Exception for session-related errors
*/
public class SessionException extends SqlGatewayException {
/**
* Get session handle that caused the error
* @return Session handle associated with error
*/
public SessionHandle getSessionHandle();
}
/**
* Exception for operation-related errors
*/
public class OperationException extends SqlGatewayException {
/**
* Get operation handle that caused the error
* @return Operation handle associated with error
*/
public OperationHandle getOperationHandle();
}Common patterns for building applications with SQL Gateway.
// Connection pooling for multiple clients
public class SqlGatewayConnectionPool {
private final SqlGatewayService gateway;
private final Queue<SessionHandle> availableSessions;
private final int maxSessions;
public SessionHandle acquireSession() throws SqlGatewayException {
SessionHandle session = availableSessions.poll();
if (session == null) {
if (activeSessions.size() < maxSessions) {
session = gateway.openSession(defaultSessionEnv);
} else {
throw new SqlGatewayException("No available sessions");
}
}
return session;
}
public void releaseSession(SessionHandle session) {
availableSessions.offer(session);
}
}
// Async query execution
public class AsyncQueryExecutor {
public CompletableFuture<List<RowData>> executeQueryAsync(
SessionHandle session, String query) {
return CompletableFuture.supplyAsync(() -> {
try {
OperationHandle op = gateway.executeStatement(session, query, 60000, new Configuration());
// Poll for completion
OperationInfo info;
do {
Thread.sleep(100);
info = gateway.getOperationInfo(session, op);
} while (info.getStatus() == OperationStatus.RUNNING);
if (info.getStatus() == OperationStatus.FINISHED) {
ResultSet results = gateway.fetchResults(session, op,
FetchOrientation.FETCH_NEXT, Integer.MAX_VALUE);
return results.getData();
} else {
throw new RuntimeException("Query failed: " + info.getException());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
// Multi-tenant session management
public class MultiTenantGateway {
private final Map<String, SessionHandle> tenantSessions = new ConcurrentHashMap<>();
public SessionHandle getOrCreateTenantSession(String tenantId) throws SqlGatewayException {
return tenantSessions.computeIfAbsent(tenantId, id -> {
try {
SessionEnvironment env = SessionEnvironment.newBuilder()
.setDefaultCatalog("tenant_" + id)
.addSessionConfig(Map.of(
"execution.parallelism", "2",
"table.exec.resource.default-parallelism", "2"
))
.build();
return gateway.openSession(env);
} catch (SqlGatewayException e) {
throw new RuntimeException(e);
}
});
}
public void executeForTenant(String tenantId, String sql) throws SqlGatewayException {
SessionHandle session = getOrCreateTenantSession(tenantId);
gateway.executeStatement(session, sql, 30000, new Configuration());
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-uber