or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connectors.mddata-types.mddatastream-bridge.mdexpressions.mdfunctions.mdindex.mdsql-gateway.mdtable-operations.md

sql-gateway.mddocs/

0

# SQL Gateway API

1

2

Remote SQL execution capabilities for building client-server architectures and multi-tenant SQL services with Apache Flink's Table API.

3

4

## Capabilities

5

6

### SqlGatewayService

7

8

Core service interface for managing SQL sessions and executing SQL operations remotely.

9

10

```java { .api }

11

/**

12

* Open a new session for SQL execution

13

* @param environment Session configuration and initialization parameters

14

* @return SessionHandle for identifying the session

15

* @throws SqlGatewayException if session creation fails

16

*/

17

public SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;

18

19

/**

20

* Close an existing session and clean up resources

21

* @param sessionHandle Handle of the session to close

22

* @throws SqlGatewayException if session closure fails

23

*/

24

public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;

25

26

/**

27

* Configure session properties using SET statements

28

* @param sessionHandle Handle of the session to configure

29

* @param statement Configuration statement (e.g., "SET table.planner = blink")

30

* @param executionTimeoutMs Maximum execution time in milliseconds

31

* @throws SqlGatewayException if configuration fails

32

*/

33

public void configureSession(SessionHandle sessionHandle,

34

String statement,

35

long executionTimeoutMs) throws SqlGatewayException;

36

37

/**

38

* Execute a SQL statement (DDL, DML, or query)

39

* @param sessionHandle Handle of the session

40

* @param statement SQL statement to execute

41

* @param executionTimeoutMs Maximum execution time in milliseconds

42

* @param executionConfig Additional execution configuration

43

* @return OperationHandle for tracking the operation

44

* @throws SqlGatewayException if execution fails

45

*/

46

public OperationHandle executeStatement(SessionHandle sessionHandle,

47

String statement,

48

long executionTimeoutMs,

49

Configuration executionConfig) throws SqlGatewayException;

50

51

/**

52

* Submit a custom operation for execution

53

* @param sessionHandle Handle of the session

54

* @param executor Callable that produces the operation result

55

* @return OperationHandle for tracking the operation

56

* @throws SqlGatewayException if submission fails

57

*/

58

public OperationHandle submitOperation(SessionHandle sessionHandle,

59

Callable<ResultSet> executor) throws SqlGatewayException;

60

61

/**

62

* Cancel a running operation

63

* @param sessionHandle Handle of the session

64

* @param operationHandle Handle of the operation to cancel

65

* @throws SqlGatewayException if cancellation fails

66

*/

67

public void cancelOperation(SessionHandle sessionHandle,

68

OperationHandle operationHandle) throws SqlGatewayException;

69

70

/**

71

* Close an operation and clean up resources

72

* @param sessionHandle Handle of the session

73

* @param operationHandle Handle of the operation to close

74

* @throws SqlGatewayException if operation closure fails

75

*/

76

public void closeOperation(SessionHandle sessionHandle,

77

OperationHandle operationHandle) throws SqlGatewayException;

78

79

/**

80

* Get information about an operation

81

* @param sessionHandle Handle of the session

82

* @param operationHandle Handle of the operation

83

* @return OperationInfo containing status and metadata

84

* @throws SqlGatewayException if operation info retrieval fails

85

*/

86

public OperationInfo getOperationInfo(SessionHandle sessionHandle,

87

OperationHandle operationHandle) throws SqlGatewayException;

88

89

/**

90

* Get schema information for an operation result

91

* @param sessionHandle Handle of the session

92

* @param operationHandle Handle of the operation

93

* @return ResolvedSchema of the operation result

94

* @throws SqlGatewayException if schema retrieval fails

95

*/

96

public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle,

97

OperationHandle operationHandle) throws SqlGatewayException;

98

99

/**

100

* Fetch results from an operation

101

* @param sessionHandle Handle of the session

102

* @param operationHandle Handle of the operation

103

* @param orientation Direction for fetching results

104

* @param maxRows Maximum number of rows to fetch

105

* @return ResultSet containing the fetched results

106

* @throws SqlGatewayException if result fetching fails

107

*/

108

public ResultSet fetchResults(SessionHandle sessionHandle,

109

OperationHandle operationHandle,

110

FetchOrientation orientation,

111

int maxRows) throws SqlGatewayException;

112

```

113

114

**Basic SQL Gateway Usage:**

115

116

```java

117

// Create SQL Gateway service instance

118

SqlGatewayService gateway = SqlGatewayServiceImpl.create(gatewayConfig);

119

120

// Open session with configuration

121

Map<String, String> sessionProperties = new HashMap<>();

122

sessionProperties.put("execution.parallelism", "4");

123

sessionProperties.put("table.planner", "blink");

124

125

SessionEnvironment sessionEnv = SessionEnvironment.newBuilder()

126

.setSessionEndpointConfig(endpointConfig)

127

.addSessionConfig(sessionProperties)

128

.build();

129

130

SessionHandle session = gateway.openSession(sessionEnv);

131

132

try {

133

// Configure session

134

gateway.configureSession(session, "SET execution.checkpointing.interval = 10s", 5000);

135

136

// Execute DDL

137

OperationHandle createTable = gateway.executeStatement(session,

138

"CREATE TABLE orders (" +

139

" id BIGINT," +

140

" customer_id BIGINT," +

141

" amount DECIMAL(10,2)" +

142

") WITH ('connector' = 'kafka', ...)",

143

30000, new Configuration());

144

145

// Execute query

146

OperationHandle query = gateway.executeStatement(session,

147

"SELECT customer_id, SUM(amount) as total " +

148

"FROM orders GROUP BY customer_id",

149

60000, new Configuration());

150

151

// Fetch results

152

ResultSet results = gateway.fetchResults(session, query,

153

FetchOrientation.FETCH_NEXT, 100);

154

155

while (results.hasNext()) {

156

RowData row = results.next();

157

System.out.println("Customer: " + row.getLong(0) + ", Total: " + row.getDecimal(1, 10, 2));

158

}

159

160

} finally {

161

gateway.closeSession(session);

162

}

163

```

164

165

### Session Management

166

167

Handle SQL Gateway sessions with proper lifecycle management.

168

169

```java { .api }

170

/**

171

* Session handle for identifying and tracking SQL sessions

172

*/

173

public final class SessionHandle {

174

/**

175

* Get unique session identifier

176

* @return UUID representing the session

177

*/

178

public UUID getIdentifier();

179

180

/**

181

* Get session creation timestamp

182

* @return Session creation time

183

*/

184

public Instant getCreationTime();

185

}

186

187

/**

188

* Session environment configuration

189

*/

190

public final class SessionEnvironment {

191

/**

192

* Create new session environment builder

193

* @return Builder for constructing session environment

194

*/

195

public static Builder newBuilder();

196

197

public static final class Builder {

198

/**

199

* Set session endpoint configuration

200

* @param config Endpoint-specific configuration

201

* @return Builder for method chaining

202

*/

203

public Builder setSessionEndpointConfig(Map<String, String> config);

204

205

/**

206

* Add session configuration properties

207

* @param config Map of configuration key-value pairs

208

* @return Builder for method chaining

209

*/

210

public Builder addSessionConfig(Map<String, String> config);

211

212

/**

213

* Set default catalog name

214

* @param catalogName Name of the default catalog

215

* @return Builder for method chaining

216

*/

217

public Builder setDefaultCatalog(String catalogName);

218

219

/**

220

* Set default database name

221

* @param databaseName Name of the default database

222

* @return Builder for method chaining

223

*/

224

public Builder setDefaultDatabase(String databaseName);

225

226

/**

227

* Build the session environment

228

* @return Constructed SessionEnvironment

229

*/

230

public SessionEnvironment build();

231

}

232

}

233

```

234

235

### Operation Management

236

237

Track and manage long-running SQL operations.

238

239

```java { .api }

240

/**

241

* Operation handle for identifying and tracking SQL operations

242

*/

243

public final class OperationHandle {

244

/**

245

* Get unique operation identifier

246

* @return UUID representing the operation

247

*/

248

public UUID getIdentifier();

249

250

/**

251

* Get operation type

252

* @return Type of the operation (EXECUTE_STATEMENT, etc.)

253

*/

254

public OperationType getOperationType();

255

}

256

257

/**

258

* Operation information and status

259

*/

260

public final class OperationInfo {

261

/**

262

* Get operation status

263

* @return Current status of the operation

264

*/

265

public OperationStatus getStatus();

266

267

/**

268

* Get operation exception if failed

269

* @return Optional exception that caused operation failure

270

*/

271

public Optional<Throwable> getException();

272

273

/**

274

* Check if operation has results available

275

* @return true if operation has results to fetch

276

*/

277

public boolean hasResults();

278

279

/**

280

* Get operation creation timestamp

281

* @return When the operation was created

282

*/

283

public Instant getCreateTime();

284

285

/**

286

* Get operation end timestamp

287

* @return When the operation completed (if finished)

288

*/

289

public Optional<Instant> getEndTime();

290

}

291

292

/**

293

* Operation status enumeration

294

*/

295

public enum OperationStatus {

296

INITIALIZED,

297

PENDING,

298

RUNNING,

299

FINISHED,

300

CANCELED,

301

FAILED,

302

TIMEOUT

303

}

304

305

/**

306

* Operation type enumeration

307

*/

308

public enum OperationType {

309

EXECUTE_STATEMENT,

310

SUBMIT_PLAN

311

}

312

```

313

314

### Result Management

315

316

Handle result sets and data fetching from SQL operations.

317

318

```java { .api }

319

/**

320

* Result set for accessing query results

321

*/

322

public interface ResultSet extends AutoCloseable {

323

/**

324

* Get the result type

325

* @return Type of results (PAYLOAD, NOT_READY, EOS)

326

*/

327

public ResultType getResultType();

328

329

/**

330

* Get next token for pagination

331

* @return Optional token for fetching next page of results

332

*/

333

public Optional<String> getNextToken();

334

335

/**

336

* Get result data as list of RowData

337

* @return List containing the fetched rows

338

*/

339

public List<RowData> getData();

340

341

/**

342

* Check if more results are available

343

* @return true if hasNext() would return more data

344

*/

345

public boolean hasNext();

346

347

/**

348

* Get next row of data

349

* @return Next RowData instance

350

*/

351

public RowData next();

352

353

/**

354

* Get result schema

355

* @return Schema of the result rows

356

*/

357

public ResolvedSchema getResultSchema();

358

}

359

360

/**

361

* Result type enumeration

362

*/

363

public enum ResultType {

364

PAYLOAD, // Contains actual data

365

NOT_READY, // Operation not ready yet

366

EOS // End of stream

367

}

368

369

/**

370

* Fetch orientation for result pagination

371

*/

372

public enum FetchOrientation {

373

FETCH_NEXT, // Fetch next rows

374

FETCH_PRIOR, // Fetch previous rows

375

FETCH_FIRST, // Fetch from beginning

376

FETCH_LAST // Fetch from end

377

}

378

```

379

380

**Result Handling Examples:**

381

382

```java

383

// Execute query and handle results

384

OperationHandle queryOp = gateway.executeStatement(session,

385

"SELECT * FROM large_table ORDER BY id",

386

120000, new Configuration());

387

388

// Wait for operation to complete

389

OperationInfo opInfo;

390

do {

391

Thread.sleep(1000);

392

opInfo = gateway.getOperationInfo(session, queryOp);

393

} while (opInfo.getStatus() == OperationStatus.RUNNING);

394

395

if (opInfo.getStatus() == OperationStatus.FINISHED) {

396

// Fetch results in batches

397

String nextToken = null;

398

int batchSize = 1000;

399

400

do {

401

ResultSet batch = gateway.fetchResults(session, queryOp,

402

FetchOrientation.FETCH_NEXT, batchSize);

403

404

if (batch.getResultType() == ResultType.PAYLOAD) {

405

List<RowData> rows = batch.getData();

406

for (RowData row : rows) {

407

processRow(row);

408

}

409

nextToken = batch.getNextToken().orElse(null);

410

}

411

} while (nextToken != null);

412

}

413

```

414

415

### Endpoint Management

416

417

Configure and manage SQL Gateway endpoints for different protocols.

418

419

```java { .api }

420

/**

421

* SQL Gateway endpoint interface

422

*/

423

public interface SqlGatewayEndpoint {

424

/**

425

* Start the endpoint and begin accepting connections

426

* @throws SqlGatewayException if startup fails

427

*/

428

public void start() throws SqlGatewayException;

429

430

/**

431

* Stop the endpoint and close all connections

432

* @throws SqlGatewayException if shutdown fails

433

*/

434

public void stop() throws SqlGatewayException;

435

436

/**

437

* Get endpoint information

438

* @return Information about this endpoint

439

*/

440

public EndpointInfo getInfo();

441

}

442

443

/**

444

* Endpoint information

445

*/

446

public final class EndpointInfo {

447

/**

448

* Get endpoint identifier

449

* @return Unique endpoint ID

450

*/

451

public String getEndpointId();

452

453

/**

454

* Get endpoint version

455

* @return Version string of the endpoint

456

*/

457

public String getEndpointVersion();

458

459

/**

460

* Get supported protocols

461

* @return Set of supported protocol names

462

*/

463

public Set<String> getSupportedProtocols();

464

}

465

```

466

467

### Gateway Configuration

468

469

Configure SQL Gateway service and endpoints.

470

471

```java { .api }

472

/**

473

* SQL Gateway configuration builder

474

*/

475

public class SqlGatewayConfig {

476

public static Builder newBuilder() {

477

return new Builder();

478

}

479

480

public static class Builder {

481

/**

482

* Set session timeout

483

* @param timeout Session timeout duration

484

* @return Builder for method chaining

485

*/

486

public Builder setSessionTimeout(Duration timeout);

487

488

/**

489

* Set maximum number of concurrent sessions

490

* @param maxSessions Maximum session count

491

* @return Builder for method chaining

492

*/

493

public Builder setMaxSessions(int maxSessions);

494

495

/**

496

* Set operation timeout

497

* @param timeout Default operation timeout

498

* @return Builder for method chaining

499

*/

500

public Builder setOperationTimeout(Duration timeout);

501

502

/**

503

* Set result fetch timeout

504

* @param timeout Result fetch timeout

505

* @return Builder for method chaining

506

*/

507

public Builder setResultFetchTimeout(Duration timeout);

508

509

/**

510

* Enable/disable session persistence

511

* @param persistent Whether to persist sessions

512

* @return Builder for method chaining

513

*/

514

public Builder setPersistentSessions(boolean persistent);

515

516

/**

517

* Build the configuration

518

* @return Constructed configuration

519

*/

520

public SqlGatewayConfig build();

521

}

522

}

523

```

524

525

### Error Handling

526

527

Handle SQL Gateway specific exceptions and error conditions.

528

529

```java { .api }

530

/**

531

* Base exception for SQL Gateway operations

532

*/

533

public class SqlGatewayException extends Exception {

534

/**

535

* Get error code

536

* @return Specific error code for the exception

537

*/

538

public String getErrorCode();

539

540

/**

541

* Get error details

542

* @return Additional error details and context

543

*/

544

public Map<String, String> getErrorDetails();

545

}

546

547

/**

548

* Exception for session-related errors

549

*/

550

public class SessionException extends SqlGatewayException {

551

/**

552

* Get session handle that caused the error

553

* @return Session handle associated with error

554

*/

555

public SessionHandle getSessionHandle();

556

}

557

558

/**

559

* Exception for operation-related errors

560

*/

561

public class OperationException extends SqlGatewayException {

562

/**

563

* Get operation handle that caused the error

564

* @return Operation handle associated with error

565

*/

566

public OperationHandle getOperationHandle();

567

}

568

```

569

570

### Advanced Usage Patterns

571

572

Common patterns for building applications with SQL Gateway.

573

574

```java { .api }

575

// Connection pooling for multiple clients

576

public class SqlGatewayConnectionPool {

577

private final SqlGatewayService gateway;

578

private final Queue<SessionHandle> availableSessions;

579

private final int maxSessions;

580

581

public SessionHandle acquireSession() throws SqlGatewayException {

582

SessionHandle session = availableSessions.poll();

583

if (session == null) {

584

if (activeSessions.size() < maxSessions) {

585

session = gateway.openSession(defaultSessionEnv);

586

} else {

587

throw new SqlGatewayException("No available sessions");

588

}

589

}

590

return session;

591

}

592

593

public void releaseSession(SessionHandle session) {

594

availableSessions.offer(session);

595

}

596

}

597

598

// Async query execution

599

public class AsyncQueryExecutor {

600

public CompletableFuture<List<RowData>> executeQueryAsync(

601

SessionHandle session, String query) {

602

603

return CompletableFuture.supplyAsync(() -> {

604

try {

605

OperationHandle op = gateway.executeStatement(session, query, 60000, new Configuration());

606

607

// Poll for completion

608

OperationInfo info;

609

do {

610

Thread.sleep(100);

611

info = gateway.getOperationInfo(session, op);

612

} while (info.getStatus() == OperationStatus.RUNNING);

613

614

if (info.getStatus() == OperationStatus.FINISHED) {

615

ResultSet results = gateway.fetchResults(session, op,

616

FetchOrientation.FETCH_NEXT, Integer.MAX_VALUE);

617

return results.getData();

618

} else {

619

throw new RuntimeException("Query failed: " + info.getException());

620

}

621

} catch (Exception e) {

622

throw new RuntimeException(e);

623

}

624

});

625

}

626

}

627

628

// Multi-tenant session management

629

public class MultiTenantGateway {

630

private final Map<String, SessionHandle> tenantSessions = new ConcurrentHashMap<>();

631

632

public SessionHandle getOrCreateTenantSession(String tenantId) throws SqlGatewayException {

633

return tenantSessions.computeIfAbsent(tenantId, id -> {

634

try {

635

SessionEnvironment env = SessionEnvironment.newBuilder()

636

.setDefaultCatalog("tenant_" + id)

637

.addSessionConfig(Map.of(

638

"execution.parallelism", "2",

639

"table.exec.resource.default-parallelism", "2"

640

))

641

.build();

642

return gateway.openSession(env);

643

} catch (SqlGatewayException e) {

644

throw new RuntimeException(e);

645

}

646

});

647

}

648

649

public void executeForTenant(String tenantId, String sql) throws SqlGatewayException {

650

SessionHandle session = getOrCreateTenantSession(tenantId);

651

gateway.executeStatement(session, sql, 30000, new Configuration());

652

}

653

}

654

```