or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-options.md core-service-interface.md endpoint-framework.md index.md operation-management.md rest-implementation.md result-data-models.md session-management.md workflow-management.md

docs/index.md

0

# Apache Flink SQL Gateway

1

2

Apache Flink SQL Gateway is a service component that provides a multi-client SQL execution interface for remote connections. It acts as a gateway service enabling concurrent SQL execution from multiple clients through REST and HiveServer2 endpoints, with session management, operation tracking, and comprehensive catalog integration.

3

4

## Package Information

5

6

- **Package Name**: flink-sql-gateway

7

- **Package Type**: Maven

8

- **Group ID**: org.apache.flink

9

- **Artifact ID**: flink-sql-gateway

10

- **Language**: Java

11

- **Installation**: Add to Maven dependencies:

12

13

```xml

14

<dependency>

15

<groupId>org.apache.flink</groupId>

16

<artifactId>flink-sql-gateway</artifactId>

17

<version>2.1.0</version>

18

</dependency>

19

```

20

21

## Core Imports

22

23

```java

24

// Main Gateway Classes

25

import org.apache.flink.table.gateway.SqlGateway;

26

import org.apache.flink.table.gateway.api.SqlGatewayService;

27

import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;

28

29

// Session Management

30

import org.apache.flink.table.gateway.api.session.SessionHandle;

31

import org.apache.flink.table.gateway.api.session.SessionEnvironment;

32

import org.apache.flink.table.gateway.service.session.SessionManager;

33

import org.apache.flink.table.gateway.service.context.DefaultContext;

34

35

// Operation Management

36

import org.apache.flink.table.gateway.api.operation.OperationHandle;

37

import org.apache.flink.table.gateway.api.operation.OperationStatus;

38

import org.apache.flink.table.gateway.api.results.OperationInfo;

39

40

// Results and Data

41

import org.apache.flink.table.gateway.api.results.ResultSet;

42

import org.apache.flink.table.gateway.api.results.TableInfo;

43

import org.apache.flink.table.gateway.api.results.FunctionInfo;

44

import org.apache.flink.table.gateway.api.results.FetchOrientation;

45

46

// Endpoints

47

import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;

48

import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory;

49

import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;

50

51

// Configuration

52

import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;

53

import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;

54

55

// Workflow Management

56

import org.apache.flink.table.workflow.WorkflowScheduler;

57

import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowScheduler;

58

import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowSchedulerFactory;

59

```

60

61

## Basic Usage

62

63

```java

64

import org.apache.flink.configuration.Configuration;

65

import org.apache.flink.table.gateway.SqlGateway;

66

import org.apache.flink.table.gateway.service.session.SessionManager;

67

import org.apache.flink.table.gateway.service.context.DefaultContext;

68

import org.apache.flink.table.gateway.api.SqlGatewayService;

69

import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;

70

import org.apache.flink.table.gateway.api.session.SessionEnvironment;

71

import org.apache.flink.table.gateway.api.session.SessionHandle;

72

import org.apache.flink.table.gateway.api.operation.OperationHandle;

73

import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;

74

75

import java.util.Collections;

76

77

// Start SQL Gateway programmatically

78

Configuration config = new Configuration();

79

DefaultContext defaultContext = DefaultContext.load(config, Collections.emptyList(), true);

80

SessionManager sessionManager = SessionManager.create(defaultContext);

81

82

SqlGateway gateway = new SqlGateway(defaultContext.getFlinkConfig(), sessionManager);

83

gateway.start();

84

85

// Use the service directly for SQL operations

86

SqlGatewayService service = new SqlGatewayServiceImpl(sessionManager);

87

88

// Open a session

89

SessionEnvironment environment = SessionEnvironment.newBuilder()

90

.setSessionEndpointVersion(EndpointVersion.V1)

91

.build();

92

93

SessionHandle session = service.openSession(environment);

94

95

// Execute SQL

96

OperationHandle operation = service.executeStatement(

97

session,

98

"SELECT 1",

99

30000L, // 30 second timeout

100

new Configuration()

101

);

102

103

// Clean up

104

service.closeOperation(session, operation);

105

service.closeSession(session);

106

107

// Stop gateway when done

108

gateway.stop();

109

```

110

111

## Architecture

112

113

The Flink SQL Gateway is built around several key components:

114

115

- **Core Service Interface**: `SqlGatewayService` provides the main API for session, operation, and catalog management

116

- **Endpoint Framework**: Pluggable endpoint architecture with REST and HiveServer2 implementations

117

- **Session Management**: Multi-client session handling with isolation and configuration

118

- **Operation Management**: Asynchronous SQL operation execution with status tracking and result pagination

119

- **REST Implementation**: Complete REST API with comprehensive endpoints for all operations

120

- **Workflow Management**: Materialized table scheduling integration with the Quartz scheduler

121

- **Result Management**: Efficient result storage and fetching with token-based pagination

122

123

## Capabilities

124

125

### Core Service Interface

126

127

The main service interface, providing session management, SQL execution, and catalog operations. It is the heart of the SQL Gateway system.

128

129

```java { .api }

130

public interface SqlGatewayService {

131

// Session Management

132

SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;

133

void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;

134

void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;

135

Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;

136

137

// Statement Execution

138

OperationHandle executeStatement(SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) throws SqlGatewayException;

139

ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows) throws SqlGatewayException;

140

141

// Operation Management

142

OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException;

143

void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

144

OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

145

}

146

```

147

148

[Core Service Interface](./core-service-interface.md)

149

150

### Session Management

151

152

Session lifecycle management with environment configuration, handle creation, and session isolation for multi-client scenarios.

153

154

```java { .api }

155

public class SessionHandle {

156

public static SessionHandle create();

157

public UUID getIdentifier();

158

}

159

160

public class SessionEnvironment {

161

public static Builder newBuilder();

162

public Optional<String> getSessionName();

163

public Map<String, String> getSessionConfig();

164

public EndpointVersion getSessionEndpointVersion();

165

}

166

```

167

168

[Session Management](./session-management.md)

169

170

### Operation Management

171

172

Asynchronous operation execution with comprehensive status tracking, cancellation support, and resource management.

173

174

```java { .api }

175

public class OperationHandle {

176

public static OperationHandle create();

177

public UUID getIdentifier();

178

}

179

180

public enum OperationStatus {

181

INITIALIZED, PENDING, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, TIMEOUT;

182

public boolean isTerminalStatus();

183

public static boolean isValidStatusTransition(OperationStatus from, OperationStatus to);

184

}

185

186

public class OperationInfo {

187

public OperationStatus getStatus();

188

public Optional<String> getException();

189

}

190

```

191

192

[Operation Management](./operation-management.md)

193

194

### Result Data Models

195

196

Rich result types with metadata, pagination support, and schema information for handling query results and operation outcomes.

197

198

```java { .api }

199

public interface ResultSet {

200

ResultType getResultType();

201

Long getNextToken();

202

ResolvedSchema getResultSchema();

203

List<RowData> getData();

204

boolean isQueryResult();

205

Optional<JobID> getJobID();

206

}

207

208

public class TableInfo {

209

public ObjectIdentifier getIdentifier();

210

public TableKind getTableKind();

211

}

212

213

public class FunctionInfo {

214

public UnresolvedIdentifier getIdentifier();

215

public Optional<FunctionKind> getKind();

216

}

217

```

218

219

[Result Data Models](./result-data-models.md)

220

221

### Endpoint Framework

222

223

Pluggable endpoint architecture enabling REST, HiveServer2, and custom endpoint implementations with SPI-based discovery.

224

225

```java { .api }

226

public interface SqlGatewayEndpoint {

227

void start() throws Exception;

228

void stop() throws Exception;

229

}

230

231

public interface SqlGatewayEndpointFactory {

232

SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);

233

234

interface Context {

235

SqlGatewayService getSqlGatewayService();

236

Configuration getFlinkConfiguration();

237

ConfigOption<?>[] getEndpointOptions();

238

}

239

}

240

```

241

242

[Endpoint Framework](./endpoint-framework.md)

243

244

### REST Implementation

245

246

Complete REST API implementation with comprehensive endpoints for session, statement, operation, and catalog management.

247

248

```java { .api }

249

public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGatewayEndpoint {

250

// REST endpoints for:

251

// - Session: POST /sessions, DELETE /sessions/{sessionId}

252

// - Statement: POST /sessions/{sessionId}/statements

253

// - Operation: GET /sessions/{sessionId}/operations/{operationId}/status

254

// - Results: GET /sessions/{sessionId}/operations/{operationId}/result/{token}

255

// - Catalog: Integrated through statement execution

256

}

257

258

public enum RowFormat {

259

JSON, PLAIN_TEXT

260

}

261

```

262

263

[REST Implementation](./rest-implementation.md)

264

265

### Configuration Options

266

267

Comprehensive configuration system for service behavior, session management, worker threads, and REST endpoint settings.

268

269

```java { .api }

270

public class SqlGatewayServiceConfigOptions {

271

public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT;

272

public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM;

273

public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX;

274

public static final ConfigOption<Boolean> SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED;

275

}

276

277

public class SqlGatewayRestOptions {

278

public static final ConfigOption<String> ADDRESS;

279

public static final ConfigOption<String> BIND_ADDRESS;

280

public static final ConfigOption<Integer> PORT;

281

}

282

```

283

284

[Configuration Options](./configuration-options.md)

285

286

### Workflow Management

287

288

Materialized table scheduling system with Quartz integration for periodic refresh operations and workflow lifecycle management.

289

290

```java { .api }

291

public interface WorkflowScheduler<T extends RefreshHandler> {

292

void open() throws WorkflowException;

293

void close() throws WorkflowException;

294

RefreshHandlerSerializer<T> getRefreshHandlerSerializer();

295

T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException;

296

void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow) throws WorkflowException;

297

void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow) throws WorkflowException;

298

}

299

300

public class EmbeddedWorkflowScheduler implements WorkflowScheduler<EmbeddedRefreshHandler> {

301

public EmbeddedWorkflowScheduler(Configuration configuration);

302

}

303

304

public class EmbeddedWorkflowSchedulerFactory implements WorkflowSchedulerFactory {

305

public static final String IDENTIFIER = "embedded";

306

public WorkflowScheduler<?> createWorkflowScheduler(Context context);

307

}

308

```

309

310

[Workflow Management](./workflow-management.md)

311

312

## Types

313

314

### Core Types

315

316

```java { .api }

317

// Session and Operation Identifiers

318

public class SessionHandle {

319

private final UUID identifier;

320

public SessionHandle(UUID identifier);

321

public UUID getIdentifier();

322

}

323

324

public class OperationHandle {

325

private final UUID identifier;

326

public OperationHandle(UUID identifier);

327

public UUID getIdentifier();

328

}

329

330

// Exception Types

331

public class SqlGatewayException extends Exception {

332

public SqlGatewayException(String message);

333

public SqlGatewayException(String message, Throwable cause);

334

}

335

336

// Result Enums

337

public enum FetchOrientation {

338

FETCH_NEXT, FETCH_PRIOR

339

}

340

341

public enum ResultType {

342

NOT_READY, PAYLOAD, EOS

343

}

344

```