A service that enables multiple remote clients to execute SQL concurrently, providing an easy way to submit Flink jobs, look up metadata, and analyze data online.
—
Session management provides isolation and configuration for multiple concurrent clients connecting to the SQL Gateway. Each session maintains its own catalog context, configuration, and operation history.
Unique identifier for sessions using UUID-based handles.
/**
* Session Handle that identifies a unique session
*/
public class SessionHandle {
private final UUID identifier;
/**
* Create a new session handle with random UUID
* @return New SessionHandle instance
*/
public static SessionHandle create();
/**
* Create session handle with specific UUID
* @param identifier UUID to use for the session
*/
public SessionHandle(UUID identifier);
/**
* Get the UUID identifier for this session
* @return UUID identifier
*/
public UUID getIdentifier();
@Override
public boolean equals(Object o);
@Override
public int hashCode();
@Override
public String toString();
}

Environment configuration for initializing sessions with catalogs, modules, and settings.
/**
* Environment configuration for session initialization
*/
public class SessionEnvironment {
/**
* Get optional session name
* @return Optional session name
*/
public Optional<String> getSessionName();
/**
* Get endpoint version negotiated for this session
* @return EndpointVersion for the session
*/
public EndpointVersion getSessionEndpointVersion();
/**
* Get session configuration map
* @return Map of configuration key-value pairs
*/
public Map<String, String> getSessionConfig();
/**
* Get registered catalog creators
* @return Map of catalog creators by name
*/
public Map<String, CatalogCreator> getRegisteredCatalogCreators();
/**
* Get registered module creators in order
* @return List of module creators
*/
public List<ModuleCreator> getRegisteredModuleCreators();
/**
* Get default catalog name if specified
* @return Optional default catalog name
*/
public Optional<String> getDefaultCatalog();
/**
* Create new builder for SessionEnvironment
* @return Builder instance
*/
public static Builder newBuilder();
/**
* Fluent builder for constructing SessionEnvironment instances.
* Obtain one via {@code SessionEnvironment.newBuilder()}, chain the setters
* (each returns this builder), and finish with {@link #build()}.
*/
public static class Builder {
/**
* Set the session name. The name is optional; the built environment reports it
* through {@code getSessionName()} as an {@code Optional<String>}.
* @param sessionName Name for the session
* @return Builder instance for chaining
*/
public Builder setSessionName(String sessionName);
/**
* Set the endpoint version for negotiation; reported by the built environment
* through {@code getSessionEndpointVersion()}.
* @param endpointVersion Version to use
* @return Builder instance for chaining
*/
public Builder setSessionEndpointVersion(EndpointVersion endpointVersion);
/**
* Add several configuration properties to the session at once.
* See the two-argument overload for adding a single property.
* @param config Map of configuration properties (key to value, both strings)
* @return Builder instance for chaining
*/
public Builder addSessionConfig(Map<String, String> config);
/**
* Add a single configuration property to the session.
* See the map overload for adding several properties in one call.
* @param key Configuration key
* @param value Configuration value
* @return Builder instance for chaining
*/
public Builder addSessionConfig(String key, String value);
/**
* Register an already-constructed catalog instance under the given name.
* Use {@link #registerCatalogCreator(String, CatalogCreator)} instead when the
* catalog should be created lazily.
* @param name Catalog name
* @param catalog Catalog instance
* @return Builder instance for chaining
*/
public Builder registerCatalog(String name, Catalog catalog);
/**
* Register a catalog creator under the given name for lazy initialization.
* Use {@link #registerCatalog(String, Catalog)} instead when a catalog instance
* is already available.
* @param name Catalog name
* @param creator CatalogCreator instance
* @return Builder instance for chaining
*/
public Builder registerCatalogCreator(String name, CatalogCreator creator);
/**
* Register an already-constructed module at the head of the module list.
* Use {@link #registerModuleCreatorAtHead(String, ModuleCreator)} instead when the
* module should be created lazily.
* @param name Module name
* @param module Module instance
* @return Builder instance for chaining
*/
public Builder registerModuleAtHead(String name, Module module);
/**
* Register a module creator at the head of the module list for lazy initialization.
* Use {@link #registerModuleAtHead(String, Module)} instead when a module instance
* is already available.
* @param name Module name
* @param creator ModuleCreator instance
* @return Builder instance for chaining
*/
public Builder registerModuleCreatorAtHead(String name, ModuleCreator creator);
/**
* Set the default catalog name; reported by the built environment through
* {@code getDefaultCatalog()} as an {@code Optional<String>}.
* @param catalogName Default catalog name
* @return Builder instance for chaining
*/
public Builder setDefaultCatalog(String catalogName);
/**
* Build the SessionEnvironment instance from the values collected by this builder.
* @return Configured SessionEnvironment
*/
public SessionEnvironment build();
}
/**
* Factory interface for creating catalogs lazily.
* It declares a single method, so it can be supplied as a lambda; instances are
* handed to {@code Builder.registerCatalogCreator(String, CatalogCreator)}.
*/
public interface CatalogCreator {
/**
* Create the catalog instance.
* @return Catalog instance
*/
Catalog create();
}
/**
* Factory interface for creating modules lazily.
* It declares a single method, so it can be supplied as a lambda; instances are
* handed to {@code Builder.registerModuleCreatorAtHead(String, ModuleCreator)}.
*/
public interface ModuleCreator {
/**
* Create the module instance.
* @return Module instance
*/
Module create();
}
}

Marker interface for endpoint version negotiation during session establishment.
/**
* Marker interface for endpoint versions
*/
public interface EndpointVersion {
// Marker interface - implementations define specific version behavior
}

Interface for managing session lifecycle and coordination.
/**
* Manages session lifecycle and coordination
*/
public interface SessionManager {
/**
* Start the session manager
*/
void start();
/**
* Stop the session manager and clean up all sessions
*/
void stop();
/**
* Create session manager with default context
* @param defaultContext Default context for sessions
* @return SessionManager instance
*/
static SessionManager create(DefaultContext defaultContext);
}

Represents an individual session instance with context and state.
/**
* Represents a single session instance
*/
public class Session {
/**
* Get session handle
* @return SessionHandle for this session
*/
public SessionHandle getSessionHandle();
/**
* Get session context
* @return SessionContext for this session
*/
public SessionContext getSessionContext();
/**
* Check if session is alive
* @return true if session is active
*/
public boolean isAlive();
/**
* Touch session to update last access time
*/
public void touch();
/**
* Close session and clean up resources
*/
public void close();
}

import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
// Create session environment
SessionEnvironment environment = SessionEnvironment.newBuilder()
.setSessionName("analytics-session")
.addSessionConfig("execution.target", "remote")
.addSessionConfig("parallelism.default", "4")
.build();
// Open session through service
SqlGatewayService service = // ... get service instance
SessionHandle session = service.openSession(environment);
// Use session for operations
// ...
// Close when done
service.closeSession(session);

import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
// Create session with custom catalogs and modules
SessionEnvironment environment = SessionEnvironment.newBuilder()
.setSessionName("data-processing")
.addSessionConfig(Map.of(
"execution.target", "yarn-per-job",
"execution.savepoint.path", "hdfs://cluster/savepoints",
"table.exec.resource.default-parallelism", "8"
))
.registerCatalogCreator("hive", () -> new HiveCatalog(
"hive",
"default",
"path/to/hive-conf"
))
.registerModuleCreatorAtHead("hive", () -> new HiveModule("2.3.6"))
.setDefaultCatalog("hive")
.build();
SessionHandle session = service.openSession(environment);
// Session now has Hive catalog and module available

// Get current session configuration
Map<String, String> config = service.getSessionConfig(session);
System.out.println("Current parallelism: " + config.get("parallelism.default"));
// Configure session with SQL statements
service.configureSession(session, "SET 'parallelism.default' = '16'", 10000L);
service.configureSession(session, "CREATE CATALOG my_catalog WITH (...)", 30000L);
service.configureSession(session, "USE CATALOG my_catalog", 5000L);
// Configuration is now updated for this session

// Create multiple isolated sessions
SessionHandle session1 = service.openSession(
SessionEnvironment.newBuilder()
.setSessionName("batch-processing")
.addSessionConfig("execution.runtime-mode", "BATCH")
.build()
);
SessionHandle session2 = service.openSession(
SessionEnvironment.newBuilder()
.setSessionName("stream-processing")
.addSessionConfig("execution.runtime-mode", "STREAMING")
.addSessionConfig("execution.checkpointing.interval", "60s")
.build()
);
// Each session has independent configuration and catalog context
// Operations in session1 don't affect session2

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-sql-gateway