or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration-options.md
core-service-interface.md
endpoint-framework.md
index.md
operation-management.md
rest-implementation.md
result-data-models.md
session-management.md
workflow-management.md
tile.json

tessl/maven-org-apache-flink--flink-sql-gateway

A service that enables multiple remote clients to execute SQL concurrently, providing an easy way to submit Flink jobs, look up metadata, and analyze data online.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
Maven: pkg:maven/org.apache.flink/flink-sql-gateway@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-gateway@2.1.0

docs/index.md

Apache Flink SQL Gateway

Apache Flink SQL Gateway is a service component that provides a multi-client SQL execution interface for remote connections. It acts as a gateway service enabling concurrent SQL execution from multiple clients through REST and HiveServer2 endpoints, with session management, operation tracking, and comprehensive catalog integration.

Package Information

  • Package Name: flink-sql-gateway
  • Package Type: Maven
  • Group ID: org.apache.flink
  • Artifact ID: flink-sql-gateway
  • Language: Java
  • Installation: Add to Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-gateway</artifactId>
    <version>2.1.0</version>
</dependency>

Core Imports

// Main Gateway Classes
import org.apache.flink.table.gateway.SqlGateway;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;

// Session Management
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.context.DefaultContext;

// Operation Management
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.results.OperationInfo;

// Results and Data
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.FetchOrientation;

// Endpoints
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;

// Configuration
import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;

// Workflow Management
import org.apache.flink.table.workflow.WorkflowScheduler;
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowScheduler;
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowSchedulerFactory;

Basic Usage

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.gateway.SqlGateway;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;

import java.util.Collections;

// Start SQL Gateway programmatically
Configuration config = new Configuration();
DefaultContext defaultContext = DefaultContext.load(config, Collections.emptyList(), true);
SessionManager sessionManager = SessionManager.create(defaultContext);

SqlGateway gateway = new SqlGateway(defaultContext.getFlinkConfig(), sessionManager);
gateway.start();

// Use the service directly for SQL operations
SqlGatewayService service = new SqlGatewayServiceImpl(sessionManager);

// Open a session
SessionEnvironment environment = SessionEnvironment.newBuilder()
    .setSessionEndpointVersion(EndpointVersion.V1)
    .build();

SessionHandle session = service.openSession(environment);

// Execute SQL
OperationHandle operation = service.executeStatement(
    session,
    "SELECT 1",
    30000L, // 30 second timeout
    new Configuration()
);

// Clean up
service.closeOperation(session, operation);
service.closeSession(session);

// Stop gateway when done
gateway.stop();

Architecture

The Flink SQL Gateway is built around several key components:

  • Core Service Interface: SqlGatewayService provides the main API for session, operation, and catalog management
  • Endpoint Framework: Pluggable endpoint architecture with REST and HiveServer2 implementations
  • Session Management: Multi-client session handling with isolation and configuration
  • Operation Management: Asynchronous SQL operation execution with status tracking and result pagination
  • REST Implementation: Complete REST API with comprehensive endpoints for all operations
  • Workflow Management: Materialized table scheduling integration with Quartz scheduler
  • Result Management: Efficient result storage and fetching with token-based pagination

Capabilities

Core Service Interface

The main service interface, providing session management, SQL execution, and catalog operations. It is the heart of the SQL Gateway system.

public interface SqlGatewayService {
    // Session Management
    SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
    void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
    void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;
    Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
    
    // Statement Execution
    OperationHandle executeStatement(SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) throws SqlGatewayException;
    ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows) throws SqlGatewayException;
    
    // Operation Management
    OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException;
    void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
}

Core Service Interface

Session Management

Session lifecycle management with environment configuration, handle creation, and session isolation for multi-client scenarios.

public class SessionHandle {
    public static SessionHandle create();
    public UUID getIdentifier();
}

public class SessionEnvironment {
    public static Builder newBuilder();
    public Optional<String> getSessionName();
    public Map<String, String> getSessionConfig();
    public EndpointVersion getSessionEndpointVersion();
}

Session Management

Operation Management

Asynchronous operation execution with comprehensive status tracking, cancellation support, and resource management.

public class OperationHandle {
    public static OperationHandle create();
    public UUID getIdentifier();
}

public enum OperationStatus {
    INITIALIZED, PENDING, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, TIMEOUT;
    public boolean isTerminalStatus();
    public static boolean isValidStatusTransition(OperationStatus from, OperationStatus to);
}

public class OperationInfo {
    public OperationStatus getStatus();
    public Optional<String> getException();
}

Operation Management

Result Data Models

Rich result types with metadata, pagination support, and schema information for handling query results and operation outcomes.

public interface ResultSet {
    ResultType getResultType();
    Long getNextToken();
    ResolvedSchema getResultSchema();
    List<RowData> getData();
    boolean isQueryResult();
    Optional<JobID> getJobID();
}

public class TableInfo {
    public ObjectIdentifier getIdentifier();
    public TableKind getTableKind();
}

public class FunctionInfo {
    public UnresolvedIdentifier getIdentifier();
    public Optional<FunctionKind> getKind();
}

Result Data Models

Endpoint Framework

Pluggable endpoint architecture enabling REST, HiveServer2, and custom endpoint implementations with SPI-based discovery.

public interface SqlGatewayEndpoint {
    void start() throws Exception;
    void stop() throws Exception;
}

public interface SqlGatewayEndpointFactory {
    SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);
    
    interface Context {
        SqlGatewayService getSqlGatewayService();
        Configuration getFlinkConfiguration();
        ConfigOption<?>[] getEndpointOptions();
    }
}

Endpoint Framework

REST Implementation

Complete REST API implementation with comprehensive endpoints for session, statement, operation, and catalog management.

public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGatewayEndpoint {
    // REST endpoints for:
    // - Session: POST /sessions, DELETE /sessions/{sessionId}
    // - Statement: POST /sessions/{sessionId}/statements
    // - Operation: GET /sessions/{sessionId}/operations/{operationId}/status
    // - Results: GET /sessions/{sessionId}/operations/{operationId}/result/{token}
    // - Catalog: Integrated through statement execution
}

public enum RowFormat {
    JSON, PLAIN_TEXT
}

REST Implementation

Configuration Options

Comprehensive configuration system for service behavior, session management, worker threads, and REST endpoint settings.

public class SqlGatewayServiceConfigOptions {
    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
    public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM;
    public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX;
    public static final ConfigOption<Boolean> SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED;
}

public class SqlGatewayRestOptions {
    public static final ConfigOption<String> ADDRESS;
    public static final ConfigOption<String> BIND_ADDRESS;
    public static final ConfigOption<Integer> PORT;
}

Configuration Options

Workflow Management

Materialized table scheduling system with Quartz integration for periodic refresh operations and workflow lifecycle management.

public interface WorkflowScheduler<T extends RefreshHandler> {
    void open() throws WorkflowException;
    void close() throws WorkflowException;
    RefreshHandlerSerializer<T> getRefreshHandlerSerializer();
    T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException;
    void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow) throws WorkflowException;
    void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow) throws WorkflowException;
}

public class EmbeddedWorkflowScheduler implements WorkflowScheduler<EmbeddedRefreshHandler> {
    public EmbeddedWorkflowScheduler(Configuration configuration);
}

public class EmbeddedWorkflowSchedulerFactory implements WorkflowSchedulerFactory {
    public static final String IDENTIFIER = "embedded";
    public WorkflowScheduler<?> createWorkflowScheduler(Context context);
}

Workflow Management

Types

Core Types

// Session and Operation Identifiers
public class SessionHandle {
    private final UUID identifier;
    public SessionHandle(UUID identifier);
    public UUID getIdentifier();
}

public class OperationHandle {
    private final UUID identifier;
    public OperationHandle(UUID identifier);
    public UUID getIdentifier();
}

// Exception Types
public class SqlGatewayException extends Exception {
    public SqlGatewayException(String message);
    public SqlGatewayException(String message, Throwable cause);
}

// Result Enums
public enum FetchOrientation {
    FETCH_NEXT, FETCH_PRIOR
}

public enum ResultType {
    NOT_READY, PAYLOAD, EOS
}