A service that enables multiple clients from the remote to execute SQL in concurrency, providing an easy way to submit Flink Jobs, look up metadata, and analyze data online.
—
The endpoint framework provides a pluggable architecture for creating different types of gateway endpoints (REST, HiveServer2, custom) with SPI-based discovery and configuration management.
Base interface for all gateway endpoints with lifecycle management.
/**
* Base interface for gateway endpoints
*/
public interface SqlGatewayEndpoint {
/**
* Start the endpoint and begin accepting connections
* @throws Exception if startup fails
*/
void start() throws Exception;
/**
* Stop the endpoint and clean up resources
* @throws Exception if shutdown fails
*/
void stop() throws Exception;
}Factory interface for creating endpoint instances using SPI discovery.
/**
* Factory for creating SqlGatewayEndpoint instances
*/
public interface SqlGatewayEndpointFactory {
/**
* Create endpoint instance with context
* @param context Factory context with service and configuration
* @return SqlGatewayEndpoint instance
*/
SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);
/**
* Factory context providing service and configuration access
*/
interface Context {
/**
* Get SQL Gateway service instance
* @return SqlGatewayService for endpoint use
*/
SqlGatewayService getSqlGatewayService();
/**
* Get Flink configuration
* @return Configuration instance
*/
Configuration getFlinkConfiguration();
/**
* Get endpoint-specific configuration options
* @return Array of ConfigOption for this endpoint type
*/
ConfigOption<?>[] getEndpointOptions();
}
}Utility class for discovering and creating endpoints from configuration.
/**
* Utilities for endpoint discovery and creation
*/
public class SqlGatewayEndpointFactoryUtils {
/**
* Create endpoints from configuration using SPI discovery
* @param service SqlGatewayService instance
* @param configuration Flink configuration
* @return List of created endpoints
* @throws Exception if endpoint creation fails
*/
public static List<SqlGatewayEndpoint> createSqlGatewayEndpoint(
SqlGatewayService service,
Configuration configuration
) throws Exception;
/**
* Create endpoint factory helper for validation
* @param factoryClass Factory class to validate
* @param configuration Configuration for validation
* @return FactoryHelper instance
*/
public static FactoryHelper createEndpointFactoryHelper(
Class<? extends SqlGatewayEndpointFactory> factoryClass,
Configuration configuration
);
}import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory;
// Custom endpoint implementation
public class CustomSqlGatewayEndpoint implements SqlGatewayEndpoint {
private final SqlGatewayService service;
private final Configuration config;
private volatile boolean running = false;
public CustomSqlGatewayEndpoint(SqlGatewayService service, Configuration config) {
this.service = service;
this.config = config;
}
@Override
public void start() throws Exception {
// Start custom endpoint (e.g., gRPC server, custom protocol)
System.out.println("Starting custom endpoint...");
running = true;
// Custom startup logic here
}
@Override
public void stop() throws Exception {
System.out.println("Stopping custom endpoint...");
running = false;
// Custom shutdown logic here
}
public boolean isRunning() {
return running;
}
}
// Custom endpoint factory
public class CustomSqlGatewayEndpointFactory implements SqlGatewayEndpointFactory {
@Override
public SqlGatewayEndpoint createSqlGatewayEndpoint(Context context) {
return new CustomSqlGatewayEndpoint(
context.getSqlGatewayService(),
context.getFlinkConfiguration()
);
}
}import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
// Create endpoints from configuration
Configuration config = new Configuration();
config.setString("sql-gateway.endpoint.type", "rest");
config.setString("sql-gateway.endpoint.rest.port", "8083");
SqlGatewayService service = new SqlGatewayServiceImpl(sessionManager);
// Create endpoints using factory utils
List<SqlGatewayEndpoint> endpoints = SqlGatewayEndpointFactoryUtils
.createSqlGatewayEndpoint(service, config);
// Start all endpoints
for (SqlGatewayEndpoint endpoint : endpoints) {
endpoint.start();
}
// Endpoints are now running and accepting connections
// Stop all endpoints when done
for (SqlGatewayEndpoint endpoint : endpoints) {
endpoint.stop();
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-gateway