A service that enables multiple clients from the remote to execute SQL in concurrency, providing an easy way to submit Flink Jobs, look up metadata, and analyze data online.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-gateway@2.1.0Apache Flink SQL Gateway is a service component that provides a multi-client SQL execution interface for remote connections. It acts as a gateway service enabling concurrent SQL execution from multiple clients through REST and HiveServer2 endpoints, with session management, operation tracking, and comprehensive catalog integration.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-gateway</artifactId>
<version>2.1.0</version>
</dependency>// Main Gateway Classes
import org.apache.flink.table.gateway.SqlGateway;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
// Session Management
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.context.DefaultContext;
// Operation Management
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.results.OperationInfo;
// Results and Data
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
// Endpoints
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
// Configuration
import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;
// Workflow Management
import org.apache.flink.table.workflow.WorkflowScheduler;
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowScheduler;
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowSchedulerFactory;import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.gateway.SqlGateway;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import java.util.Collections;
// Start SQL Gateway programmatically
Configuration config = new Configuration();
DefaultContext defaultContext = DefaultContext.load(config, Collections.emptyList(), true);
SessionManager sessionManager = SessionManager.create(defaultContext);
SqlGateway gateway = new SqlGateway(defaultContext.getFlinkConfig(), sessionManager);
gateway.start();
// Use the service directly for SQL operations
SqlGatewayService service = new SqlGatewayServiceImpl(sessionManager);
// Open a session
SessionEnvironment environment = SessionEnvironment.newBuilder()
.setSessionEndpointVersion(EndpointVersion.V1)
.build();
SessionHandle session = service.openSession(environment);
// Execute SQL
OperationHandle operation = service.executeStatement(
session,
"SELECT 1",
30000L, // 30 second timeout
new Configuration()
);
// Clean up
service.closeOperation(session, operation);
service.closeSession(session);
// Stop gateway when done
gateway.stop();The Flink SQL Gateway is built around several key components:
SqlGatewayService provides the main API for session, operation, and catalog managementMain service interface providing session management, SQL execution, and catalog operations. The heart of the SQL Gateway system.
public interface SqlGatewayService {
// Session Management
SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;
Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
// Statement Execution
OperationHandle executeStatement(SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) throws SqlGatewayException;
ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows) throws SqlGatewayException;
// Operation Management
OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException;
void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
}Session lifecycle management with environment configuration, handle creation, and session isolation for multi-client scenarios.
public class SessionHandle {
public static SessionHandle create();
public UUID getIdentifier();
}
public class SessionEnvironment {
public static Builder newBuilder();
public Optional<String> getSessionName();
public Map<String, String> getSessionConfig();
public EndpointVersion getSessionEndpointVersion();
}Asynchronous operation execution with comprehensive status tracking, cancellation support, and resource management.
public class OperationHandle {
public static OperationHandle create();
public UUID getIdentifier();
}
public enum OperationStatus {
INITIALIZED, PENDING, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, TIMEOUT;
public boolean isTerminalStatus();
public static boolean isValidStatusTransition(OperationStatus from, OperationStatus to);
}
public class OperationInfo {
public OperationStatus getStatus();
public Optional<String> getException();
}Rich result types with metadata, pagination support, and schema information for handling query results and operation outcomes.
public interface ResultSet {
ResultType getResultType();
Long getNextToken();
ResolvedSchema getResultSchema();
List<RowData> getData();
boolean isQueryResult();
Optional<JobID> getJobID();
}
public class TableInfo {
public ObjectIdentifier getIdentifier();
public TableKind getTableKind();
}
public class FunctionInfo {
public UnresolvedIdentifier getIdentifier();
public Optional<FunctionKind> getKind();
}Pluggable endpoint architecture enabling REST, HiveServer2, and custom endpoint implementations with SPI-based discovery.
public interface SqlGatewayEndpoint {
void start() throws Exception;
void stop() throws Exception;
}
public interface SqlGatewayEndpointFactory {
SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);
interface Context {
SqlGatewayService getSqlGatewayService();
Configuration getFlinkConfiguration();
ConfigOption<?>[] getEndpointOptions();
}
}Complete REST API implementation with comprehensive endpoints for session, statement, operation, and catalog management.
public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGatewayEndpoint {
// REST endpoints for:
// - Session: POST /sessions, DELETE /sessions/{sessionId}
// - Statement: POST /sessions/{sessionId}/statements
// - Operation: GET /sessions/{sessionId}/operations/{operationId}/status
// - Results: GET /sessions/{sessionId}/operations/{operationId}/result/{token}
// - Catalog: Integrated through statement execution
}
public enum RowFormat {
JSON, PLAIN_TEXT
}Comprehensive configuration system for service behavior, session management, worker threads, and REST endpoint settings.
public class SqlGatewayServiceConfigOptions {
public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM;
public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX;
public static final ConfigOption<Boolean> SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED;
}
public class SqlGatewayRestOptions {
public static final ConfigOption<String> ADDRESS;
public static final ConfigOption<String> BIND_ADDRESS;
public static final ConfigOption<Integer> PORT;
}Materialized table scheduling system with Quartz integration for periodic refresh operations and workflow lifecycle management.
public interface WorkflowScheduler<T extends RefreshHandler> {
void open() throws WorkflowException;
void close() throws WorkflowException;
RefreshHandlerSerializer<T> getRefreshHandlerSerializer();
T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException;
void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow) throws WorkflowException;
void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow) throws WorkflowException;
}
public class EmbeddedWorkflowScheduler implements WorkflowScheduler<EmbeddedRefreshHandler> {
public EmbeddedWorkflowScheduler(Configuration configuration);
}
public class EmbeddedWorkflowSchedulerFactory implements WorkflowSchedulerFactory {
public static final String IDENTIFIER = "embedded";
public WorkflowScheduler<?> createWorkflowScheduler(Context context);
}// Session and Operation Identifiers
public class SessionHandle {
private final UUID identifier;
public SessionHandle(UUID identifier);
public UUID getIdentifier();
}
public class OperationHandle {
private final UUID identifier;
public OperationHandle(UUID identifier);
public UUID getIdentifier();
}
// Exception Types
public class SqlGatewayException extends Exception {
public SqlGatewayException(String message);
public SqlGatewayException(String message, Throwable cause);
}
// Result Enums
public enum FetchOrientation {
FETCH_NEXT, FETCH_PRIOR
}
public enum ResultType {
NOT_READY, PAYLOAD, EOS
}