or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-sql-gateway

A service that enables multiple remote clients to execute SQL concurrently, providing an easy way to submit Flink jobs, look up metadata, and analyze data online.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-sql-gateway@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-gateway@2.1.0

0

# Apache Flink SQL Gateway

1

2

Apache Flink SQL Gateway is a service component that provides a multi-client SQL execution interface for remote connections. It acts as a gateway service enabling concurrent SQL execution from multiple clients through REST and HiveServer2 endpoints, with session management, operation tracking, and comprehensive catalog integration.

3

4

## Package Information

5

6

- **Package Name**: flink-sql-gateway

7

- **Package Type**: Maven

8

- **Group ID**: org.apache.flink

9

- **Artifact ID**: flink-sql-gateway

10

- **Language**: Java

11

- **Installation**: Add to Maven dependencies:

12

13

```xml

14

<dependency>

15

<groupId>org.apache.flink</groupId>

16

<artifactId>flink-sql-gateway</artifactId>

17

<version>2.1.0</version>

18

</dependency>

19

```

20

21

## Core Imports

22

23

```java

24

// Main Gateway Classes

25

import org.apache.flink.table.gateway.SqlGateway;

26

import org.apache.flink.table.gateway.api.SqlGatewayService;

27

import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;

28

29

// Session Management

30

import org.apache.flink.table.gateway.api.session.SessionHandle;

31

import org.apache.flink.table.gateway.api.session.SessionEnvironment;

32

import org.apache.flink.table.gateway.service.session.SessionManager;

33

import org.apache.flink.table.gateway.service.context.DefaultContext;

34

35

// Operation Management

36

import org.apache.flink.table.gateway.api.operation.OperationHandle;

37

import org.apache.flink.table.gateway.api.operation.OperationStatus;

38

import org.apache.flink.table.gateway.api.results.OperationInfo;

39

40

// Results and Data

41

import org.apache.flink.table.gateway.api.results.ResultSet;

42

import org.apache.flink.table.gateway.api.results.TableInfo;

43

import org.apache.flink.table.gateway.api.results.FunctionInfo;

44

import org.apache.flink.table.gateway.api.results.FetchOrientation;

45

46

// Endpoints

47

import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;

48

import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory;

49

import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;

50

51

// Configuration

52

import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;

53

import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;

54

55

// Workflow Management

56

import org.apache.flink.table.workflow.WorkflowScheduler;

57

import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowScheduler;

58

import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowSchedulerFactory;

59

```

60

61

## Basic Usage

62

63

```java

64

import org.apache.flink.configuration.Configuration;

65

import org.apache.flink.table.gateway.SqlGateway;

66

import org.apache.flink.table.gateway.service.session.SessionManager;

67

import org.apache.flink.table.gateway.service.context.DefaultContext;

68

import org.apache.flink.table.gateway.api.SqlGatewayService;

69

import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;

70

import org.apache.flink.table.gateway.api.session.SessionEnvironment;

71

import org.apache.flink.table.gateway.api.session.SessionHandle;

72

import org.apache.flink.table.gateway.api.operation.OperationHandle;

73

import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;

74

75

import java.util.Collections;

76

77

// Start SQL Gateway programmatically

78

Configuration config = new Configuration();

79

DefaultContext defaultContext = DefaultContext.load(config, Collections.emptyList(), true);

80

SessionManager sessionManager = SessionManager.create(defaultContext);

81

82

SqlGateway gateway = new SqlGateway(defaultContext.getFlinkConfig(), sessionManager);

83

gateway.start();

84

85

// Use the service directly for SQL operations

86

SqlGatewayService service = new SqlGatewayServiceImpl(sessionManager);

87

88

// Open a session

89

SessionEnvironment environment = SessionEnvironment.newBuilder()

90

.setSessionEndpointVersion(EndpointVersion.V1)

91

.build();

92

93

SessionHandle session = service.openSession(environment);

94

95

// Execute SQL

96

OperationHandle operation = service.executeStatement(

97

session,

98

"SELECT 1",

99

30000L, // 30 second timeout

100

new Configuration()

101

);

102

103

// Clean up

104

service.closeOperation(session, operation);

105

service.closeSession(session);

106

107

// Stop gateway when done

108

gateway.stop();

109

```

110

111

## Architecture

112

113

The Flink SQL Gateway is built around several key components:

114

115

- **Core Service Interface**: `SqlGatewayService` provides the main API for session, operation, and catalog management

116

- **Endpoint Framework**: Pluggable endpoint architecture with REST and HiveServer2 implementations

117

- **Session Management**: Multi-client session handling with isolation and configuration

118

- **Operation Management**: Asynchronous SQL operation execution with status tracking and result pagination

119

- **REST Implementation**: Complete REST API with comprehensive endpoints for all operations

120

- **Workflow Management**: Materialized table scheduling integration with the Quartz scheduler

121

- **Result Management**: Efficient result storage and fetching with token-based pagination

122

123

## Capabilities

124

125

### Core Service Interface

126

127

The main service interface, providing session management, SQL execution, and catalog operations. It is the heart of the SQL Gateway system.

128

129

```java { .api }

130

public interface SqlGatewayService {

131

// Session Management

132

SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;

133

void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;

134

void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;

135

Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;

136

137

// Statement Execution

138

OperationHandle executeStatement(SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) throws SqlGatewayException;

139

ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows) throws SqlGatewayException;

140

141

// Operation Management

142

OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException;

143

void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

144

OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

145

}

146

```

147

148

[Core Service Interface](./core-service-interface.md)

149

150

### Session Management

151

152

Session lifecycle management with environment configuration, handle creation, and session isolation for multi-client scenarios.

153

154

```java { .api }

155

public class SessionHandle {

156

public static SessionHandle create();

157

public UUID getIdentifier();

158

}

159

160

public class SessionEnvironment {

161

public static Builder newBuilder();

162

public Optional<String> getSessionName();

163

public Map<String, String> getSessionConfig();

164

public EndpointVersion getSessionEndpointVersion();

165

}

166

```

167

168

[Session Management](./session-management.md)

169

170

### Operation Management

171

172

Asynchronous operation execution with comprehensive status tracking, cancellation support, and resource management.

173

174

```java { .api }

175

public class OperationHandle {

176

public static OperationHandle create();

177

public UUID getIdentifier();

178

}

179

180

public enum OperationStatus {

181

INITIALIZED, PENDING, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, TIMEOUT;

182

public boolean isTerminalStatus();

183

public static boolean isValidStatusTransition(OperationStatus from, OperationStatus to);

184

}

185

186

public class OperationInfo {

187

public OperationStatus getStatus();

188

public Optional<String> getException();

189

}

190

```

191

192

[Operation Management](./operation-management.md)

193

194

### Result Data Models

195

196

Rich result types with metadata, pagination support, and schema information for handling query results and operation outcomes.

197

198

```java { .api }

199

public interface ResultSet {

200

ResultType getResultType();

201

Long getNextToken();

202

ResolvedSchema getResultSchema();

203

List<RowData> getData();

204

boolean isQueryResult();

205

Optional<JobID> getJobID();

206

}

207

208

public class TableInfo {

209

public ObjectIdentifier getIdentifier();

210

public TableKind getTableKind();

211

}

212

213

public class FunctionInfo {

214

public UnresolvedIdentifier getIdentifier();

215

public Optional<FunctionKind> getKind();

216

}

217

```

218

219

[Result Data Models](./result-data-models.md)

220

221

### Endpoint Framework

222

223

Pluggable endpoint architecture enabling REST, HiveServer2, and custom endpoint implementations with SPI-based discovery.

224

225

```java { .api }

226

public interface SqlGatewayEndpoint {

227

void start() throws Exception;

228

void stop() throws Exception;

229

}

230

231

public interface SqlGatewayEndpointFactory {

232

SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);

233

234

interface Context {

235

SqlGatewayService getSqlGatewayService();

236

Configuration getFlinkConfiguration();

237

ConfigOption<?>[] getEndpointOptions();

238

}

239

}

240

```

241

242

[Endpoint Framework](./endpoint-framework.md)

243

244

### REST Implementation

245

246

Complete REST API implementation with comprehensive endpoints for session, statement, operation, and catalog management.

247

248

```java { .api }

249

public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGatewayEndpoint {

250

// REST endpoints for:

251

// - Session: POST /sessions, DELETE /sessions/{sessionId}

252

// - Statement: POST /sessions/{sessionId}/statements

253

// - Operation: GET /sessions/{sessionId}/operations/{operationId}/status

254

// - Results: GET /sessions/{sessionId}/operations/{operationId}/result/{token}

255

// - Catalog: Integrated through statement execution

256

}

257

258

public enum RowFormat {

259

JSON, PLAIN_TEXT

260

}

261

```

262

263

[REST Implementation](./rest-implementation.md)

264

265

### Configuration Options

266

267

Comprehensive configuration system for service behavior, session management, worker threads, and REST endpoint settings.

268

269

```java { .api }

270

public class SqlGatewayServiceConfigOptions {

271

public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT;

272

public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM;

273

public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX;

274

public static final ConfigOption<Boolean> SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED;

275

}

276

277

public class SqlGatewayRestOptions {

278

public static final ConfigOption<String> ADDRESS;

279

public static final ConfigOption<String> BIND_ADDRESS;

280

public static final ConfigOption<Integer> PORT;

281

}

282

```

283

284

[Configuration Options](./configuration-options.md)

285

286

### Workflow Management

287

288

Materialized table scheduling system with Quartz integration for periodic refresh operations and workflow lifecycle management.

289

290

```java { .api }

291

public interface WorkflowScheduler<T extends RefreshHandler> {

292

void open() throws WorkflowException;

293

void close() throws WorkflowException;

294

RefreshHandlerSerializer<T> getRefreshHandlerSerializer();

295

T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException;

296

void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow) throws WorkflowException;

297

void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow) throws WorkflowException;

298

}

299

300

public class EmbeddedWorkflowScheduler implements WorkflowScheduler<EmbeddedRefreshHandler> {

301

public EmbeddedWorkflowScheduler(Configuration configuration);

302

}

303

304

public class EmbeddedWorkflowSchedulerFactory implements WorkflowSchedulerFactory {

305

public static final String IDENTIFIER = "embedded";

306

public WorkflowScheduler<?> createWorkflowScheduler(Context context);

307

}

308

```

309

310

[Workflow Management](./workflow-management.md)

311

312

## Types

313

314

### Core Types

315

316

```java { .api }

317

// Session and Operation Identifiers

318

public class SessionHandle {

319

private final UUID identifier;

320

public SessionHandle(UUID identifier);

321

public UUID getIdentifier();

322

}

323

324

public class OperationHandle {

325

private final UUID identifier;

326

public OperationHandle(UUID identifier);

327

public UUID getIdentifier();

328

}

329

330

// Exception Types

331

public class SqlGatewayException extends Exception {

332

public SqlGatewayException(String message);

333

public SqlGatewayException(String message, Throwable cause);

334

}

335

336

// Result Enums

337

public enum FetchOrientation {

338

FETCH_NEXT, FETCH_PRIOR

339

}

340

341

public enum ResultType {

342

NOT_READY, PAYLOAD, EOS

343

}

344

```