or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-options.md · core-service-interface.md · endpoint-framework.md · index.md · operation-management.md · rest-implementation.md · result-data-models.md · session-management.md · workflow-management.md

docs/rest-implementation.md

0

# REST Implementation

1

2

The REST implementation provides a comprehensive HTTP API for all SQL Gateway operations including session management, statement execution, operation control, and catalog browsing. The REST endpoint supports JSON and plain text response formats with full error handling.

3

4

## Capabilities

5

6

### REST Endpoint URLs

7

8

Complete set of REST endpoints for SQL Gateway operations:

9

10

```java { .api }

11

// Session Management

12

POST /sessions // Open session

13

DELETE /sessions/{sessionId} // Close session

14

POST /sessions/{sessionId}/configure-session // Configure session

15

GET /sessions/{sessionId}/config // Get session config

16

POST /sessions/{sessionId}/heartbeat // Session heartbeat

17

18

// Statement Execution

19

POST /sessions/{sessionId}/statements // Execute statement

20

POST /sessions/{sessionId}/complete-statement // Statement completion

21

22

// Operation Management

23

GET /sessions/{sessionId}/operations/{operationId}/status // Get status

24

DELETE /sessions/{sessionId}/operations/{operationId}/cancel // Cancel operation

25

DELETE /sessions/{sessionId}/operations/{operationId} // Close operation

26

27

// Result Fetching

28

GET /sessions/{sessionId}/operations/{operationId}/result/{token} // Fetch results

29

30

// Utility

31

GET /info // Get gateway info

32

GET /api_version // Get API version

33

34

// Materialized Table

35

POST /materialized-tables/{materializedTableId}/refresh // Refresh table

36

37

// Application Deployment

38

POST /scripts // Deploy script

39

```

40

41

### SqlGatewayRestEndpoint

42

43

Main REST endpoint implementation extending Flink's RestServerEndpoint.

44

45

```java { .api }

46

/**

47

* REST endpoint implementation for SQL Gateway

48

*/

49

public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGatewayEndpoint {

50

/**

51

* Create REST endpoint with service and configuration

52

* @param sqlGatewayService Service implementation

53

* @param configuration Flink configuration

54

*/

55

public SqlGatewayRestEndpoint(SqlGatewayService sqlGatewayService, Configuration configuration);

56

57

@Override

58

public void start() throws Exception;

59

60

@Override

61

public void stop() throws Exception;

62

63

/**

64

* Get bound port of the REST server

65

* @return Port number the server is listening on

66

*/

67

public int getPort();

68

69

/**

70

* Get REST server address

71

* @return Server address string

72

*/

73

public String getRestAddress();

74

}

75

```

76

77

### RowFormat

78

79

Enumeration for result serialization formats.

80

81

```java { .api }

82

/**

83

* Row serialization format enumeration

84

*/

85

public enum RowFormat {

86

/** JSON format with type information */

87

JSON,

88

89

/** SQL-compliant plain text format */

90

PLAIN_TEXT;

91

92

/**

93

* Get default row format

94

* @return Default format (JSON)

95

*/

96

public static RowFormat getDefaultFormat();

97

98

/**

99

* Parse row format from string

100

* @param format Format string

101

* @return RowFormat enum value

102

* @throws IllegalArgumentException if format not recognized

103

*/

104

public static RowFormat fromString(String format);

105

}

106

```

107

108

### REST Message Types

109

110

#### Session Messages

111

112

```java { .api }

113

/**

114

* Request body for opening sessions

115

*/

116

public class OpenSessionRequestBody implements RequestBody {

117

/**

118

* Get session name

119

* @return Optional session name

120

*/

121

public Optional<String> getSessionName();

122

123

/**

124

* Get session properties

125

* @return Map of session configuration

126

*/

127

public Map<String, String> getProperties();

128

}

129

130

/**

131

* Response body for session opening

132

*/

133

public class OpenSessionResponseBody implements ResponseBody {

134

/**

135

* Get session handle

136

* @return String representation of the session handle for the new session

137

*/

138

public String getSessionHandle();

139

}

140

141

/**

142

* Request body for session configuration

143

*/

144

public class ConfigureSessionRequestBody implements RequestBody {

145

/**

146

* Get SQL statement for configuration

147

* @return SQL statement string

148

*/

149

public String getStatement();

150

151

/**

152

* Get execution timeout

153

* @return Timeout in milliseconds

154

*/

155

public Long getExecutionTimeoutMs();

156

}

157

158

/**

159

* Response body for session configuration

160

*/

161

public class GetSessionConfigResponseBody implements ResponseBody {

162

/**

163

* Get session properties

164

* @return Map of current session configuration

165

*/

166

public Map<String, String> getProperties();

167

}

168

```

169

170

#### Statement Messages

171

172

```java { .api }

173

/**

174

* Request body for statement execution

175

*/

176

public class ExecuteStatementRequestBody implements RequestBody {

177

/**

178

* Get SQL statement to execute

179

* @return SQL statement string

180

*/

181

public String getStatement();

182

183

/**

184

* Get execution timeout

185

* @return Optional execution timeout in milliseconds

186

*/

187

public Optional<Long> getExecutionTimeoutMs();

188

189

/**

190

* Get execution configuration

191

* @return Map of execution properties

192

*/

193

public Map<String, String> getExecutionConfig();

194

}

195

196

/**

197

* Response body for statement execution

198

*/

199

public class ExecuteStatementResponseBody implements ResponseBody {

200

/**

201

* Get operation handle

202

* @return String representation of operation handle

203

*/

204

public String getOperationHandle();

205

}

206

207

/**

208

* Response body for result fetching

209

*/

210

public interface FetchResultsResponseBody extends ResponseBody {

211

/**

212

* Get result type

213

* @return ResultType (NOT_READY, PAYLOAD, EOS)

214

*/

215

ResultType getResultType();

216

217

/**

218

* Get next token for pagination

219

* @return Optional next token

220

*/

221

Optional<Long> getNextToken();

222

223

/**

224

* Get results data

225

* @return Results in requested format

226

*/

227

Object getResults();

228

}

229

230

/**

231

* Request body for statement completion

232

*/

233

public class CompleteStatementRequestBody implements RequestBody {

234

/**

235

* Get statement to complete

236

* @return SQL statement string

237

*/

238

public String getStatement();

239

240

/**

241

* Get cursor position

242

* @return Position in statement for completion

243

*/

244

public Integer getPosition();

245

}

246

247

/**

248

* Response body for statement completion

249

*/

250

public class CompleteStatementResponseBody implements ResponseBody {

251

/**

252

* Get completion candidates

253

* @return List of completion suggestions

254

*/

255

public List<String> getCandidates();

256

}

257

```

258

259

#### Operation Messages

260

261

```java { .api }

262

/**

263

* Response body for operation status

264

*/

265

public class OperationStatusResponseBody implements ResponseBody {

266

/**

267

* Get operation status

268

* @return Current OperationStatus

269

*/

270

public OperationStatus getStatus();

271

}

272

```

273

274

#### Utility Messages

275

276

```java { .api }

277

/**

278

* Response body for gateway information

279

*/

280

public class GetInfoResponseBody implements ResponseBody {

281

/**

282

* Get product name

283

* @return Product name ("Apache Flink")

284

*/

285

public String getProductName();

286

287

/**

288

* Get version

289

* @return Flink version string

290

*/

291

public String getVersion();

292

}

293

294

/**

295

* Response body for API version

296

*/

297

public class GetApiVersionResponseBody implements ResponseBody {

298

/**

299

* Get API versions

300

* @return List of supported API versions

301

*/

302

public List<String> getVersions();

303

}

304

```

305

306

### Path Parameters

307

308

```java { .api }

309

/**

310

* Path parameter for session handle

311

*/

312

public class SessionHandleIdPathParameter extends MessagePathParameter<String> {

313

public static final String KEY = "sessionHandle";

314

}

315

316

/**

317

* Path parameter for operation handle

318

*/

319

public class OperationHandleIdPathParameter extends MessagePathParameter<String> {

320

public static final String KEY = "operationHandle";

321

}

322

323

/**

324

* Path parameter for result token

325

*/

326

public class FetchResultsTokenPathParameter extends MessagePathParameter<Long> {

327

public static final String KEY = "token";

328

}

329

```

330

331

### Query Parameters

332

333

```java { .api }

334

/**

335

* Query parameter for row format

336

*/

337

public class FetchResultsRowFormatQueryParameter extends MessageQueryParameter<RowFormat> {

338

public static final String KEY = "rowFormat";

339

340

/**

341

* Get default row format

342

* @return Default format (JSON)

343

*/

344

@Override

345

public RowFormat getDefaultValue();

346

}

347

```

348

349

## Usage Examples

350

351

### HTTP Client Usage

352

353

```bash

354

# Open session

355

curl -X POST http://localhost:8083/sessions \

356

-H "Content-Type: application/json" \

357

-d '{

358

"sessionName": "my-session",

359

"properties": {

360

"execution.target": "remote",

361

"parallelism.default": "4"

362

}

363

}'

364

365

# Response: {"sessionHandle": "550e8400-e29b-41d4-a716-446655440000"}

366

367

# Execute statement

368

curl -X POST http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/statements \

369

-H "Content-Type: application/json" \

370

-d '{

371

"statement": "SELECT COUNT(*) FROM users",

372

"executionTimeoutMs": 30000

373

}'

374

375

# Response: {"operationHandle": "660e8400-e29b-41d4-a716-446655440001"}

376

377

# Check operation status

378

curl -X GET http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/operations/660e8400-e29b-41d4-a716-446655440001/status

379

380

# Response: {"status": "RUNNING"}

381

382

# Fetch results (when FINISHED)

383

curl -X GET "http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/operations/660e8400-e29b-41d4-a716-446655440001/result/0?rowFormat=JSON"

384

385

# Close operation

386

curl -X DELETE http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/operations/660e8400-e29b-41d4-a716-446655440001

387

388

# Close session

389

curl -X DELETE http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000

390

```

391

392

### Java HTTP Client

393

394

```java

395

import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.HashMap;
import java.util.Map;

400

401

public class SqlGatewayRestClient {

402

private final HttpClient httpClient;

403

private final ObjectMapper objectMapper;

404

private final String baseUrl;

405

406

public SqlGatewayRestClient(String baseUrl) {

407

this.httpClient = HttpClient.newHttpClient();

408

this.objectMapper = new ObjectMapper();

409

this.baseUrl = baseUrl;

410

}

411

412

public String openSession(String sessionName, Map<String, String> properties) throws Exception {

413

Map<String, Object> requestBody = Map.of(

414

"sessionName", sessionName,

415

"properties", properties

416

);

417

418

HttpRequest request = HttpRequest.newBuilder()

419

.uri(URI.create(baseUrl + "/sessions"))

420

.header("Content-Type", "application/json")

421

.POST(HttpRequest.BodyPublishers.ofString(objectMapper.writeValueAsString(requestBody)))

422

.build();

423

424

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

425

426

if (response.statusCode() == 200) {

427

Map<String, String> responseBody = objectMapper.readValue(response.body(), Map.class);

428

return responseBody.get("sessionHandle");

429

} else {

430

throw new RuntimeException("Failed to open session: " + response.body());

431

}

432

}

433

434

public String executeStatement(String sessionHandle, String statement, Long timeoutMs) throws Exception {

435

Map<String, Object> requestBody = Map.of(

436

"statement", statement,

437

"executionTimeoutMs", timeoutMs != null ? timeoutMs : 30000L

438

);

439

440

HttpRequest request = HttpRequest.newBuilder()

441

.uri(URI.create(baseUrl + "/sessions/" + sessionHandle + "/statements"))

442

.header("Content-Type", "application/json")

443

.POST(HttpRequest.BodyPublishers.ofString(objectMapper.writeValueAsString(requestBody)))

444

.build();

445

446

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

447

448

if (response.statusCode() == 200) {

449

Map<String, String> responseBody = objectMapper.readValue(response.body(), Map.class);

450

return responseBody.get("operationHandle");

451

} else {

452

throw new RuntimeException("Failed to execute statement: " + response.body());

453

}

454

}

455

456

public String getOperationStatus(String sessionHandle, String operationHandle) throws Exception {

457

HttpRequest request = HttpRequest.newBuilder()

458

.uri(URI.create(baseUrl + "/sessions/" + sessionHandle + "/operations/" + operationHandle + "/status"))

459

.GET()

460

.build();

461

462

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

463

464

if (response.statusCode() == 200) {

465

Map<String, String> responseBody = objectMapper.readValue(response.body(), Map.class);

466

return responseBody.get("status");

467

} else {

468

throw new RuntimeException("Failed to get operation status: " + response.body());

469

}

470

}

471

472

public Map<String, Object> fetchResults(String sessionHandle, String operationHandle, long token, RowFormat format) throws Exception {

473

String url = String.format("%s/sessions/%s/operations/%s/result/%d?rowFormat=%s",

474

baseUrl, sessionHandle, operationHandle, token, format.name());

475

476

HttpRequest request = HttpRequest.newBuilder()

477

.uri(URI.create(url))

478

.GET()

479

.build();

480

481

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

482

483

if (response.statusCode() == 200) {

484

return objectMapper.readValue(response.body(), Map.class);

485

} else {

486

throw new RuntimeException("Failed to fetch results: " + response.body());

487

}

488

}

489

490

public void closeOperation(String sessionHandle, String operationHandle) throws Exception {

491

HttpRequest request = HttpRequest.newBuilder()

492

.uri(URI.create(baseUrl + "/sessions/" + sessionHandle + "/operations/" + operationHandle))

493

.DELETE()

494

.build();

495

496

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

497

498

if (response.statusCode() != 200) {

499

throw new RuntimeException("Failed to close operation: " + response.body());

500

}

501

}

502

503

public void closeSession(String sessionHandle) throws Exception {

504

HttpRequest request = HttpRequest.newBuilder()

505

.uri(URI.create(baseUrl + "/sessions/" + sessionHandle))

506

.DELETE()

507

.build();

508

509

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

510

511

if (response.statusCode() != 200) {

512

throw new RuntimeException("Failed to close session: " + response.body());

513

}

514

}

515

}

516

```

517

518

### Complete REST Workflow

519

520

```java

521

// Complete workflow using REST client

522

public class SqlGatewayRestExample {

523

public static void main(String[] args) throws Exception {

524

SqlGatewayRestClient client = new SqlGatewayRestClient("http://localhost:8083");

525

526

// Open session

527

String sessionHandle = client.openSession("example-session", Map.of(

528

"execution.target", "remote",

529

"parallelism.default", "2"

530

));

531

System.out.println("Opened session: " + sessionHandle);

532

533

try {

534

// Execute query

535

String operationHandle = client.executeStatement(

536

sessionHandle,

537

"SELECT id, name, COUNT(*) as cnt FROM users GROUP BY id, name",

538

30000L

539

);

540

System.out.println("Started operation: " + operationHandle);

541

542

// Wait for completion

543

String status;

544

do {

545

Thread.sleep(1000);

546

status = client.getOperationStatus(sessionHandle, operationHandle);

547

System.out.println("Operation status: " + status);

548

} while (!"FINISHED".equals(status) && !"ERROR".equals(status) && !"CANCELED".equals(status));

549

550

if ("FINISHED".equals(status)) {

551

// Fetch results

552

long token = 0L;

553

while (true) {

554

Map<String, Object> results = client.fetchResults(sessionHandle, operationHandle, token, RowFormat.JSON);

555

String resultType = (String) results.get("resultType");

556

557

if ("PAYLOAD".equals(resultType)) {

558

System.out.println("Results: " + results.get("results"));

559

560

Object nextTokenObj = results.get("nextToken");

561

if (nextTokenObj != null) {

562

token = ((Number) nextTokenObj).longValue();

563

} else {

564

break; // No more data

565

}

566

} else if ("EOS".equals(resultType)) {

567

System.out.println("End of results");

568

break;

569

} else {

570

System.out.println("Results not ready");

571

Thread.sleep(1000);

572

}

573

}

574

} else {

575

System.err.println("Operation failed with status: " + status);

576

}

577

578

// Clean up operation

579

client.closeOperation(sessionHandle, operationHandle);

580

581

} finally {

582

// Clean up session

583

client.closeSession(sessionHandle);

584

}

585

}

586

}

587

```

588

589

### Error Handling

590

591

```java

592

// Comprehensive error handling for REST operations

593

/**
 * Translates failed REST responses and failed operation statuses into exceptions.
 */
public class RestErrorHandler {

    /**
     * Throws an exception matching the HTTP status when the response is an error (status 400 or above);
     * returns normally otherwise.
     *
     * @param response response to inspect
     * @throws IllegalArgumentException for HTTP 400
     * @throws SecurityException for HTTP 401
     * @throws IllegalStateException for HTTP 404
     * @throws RuntimeException for any other status of 400 or above
     */
    public void handleRestErrors(HttpResponse<String> response) throws Exception {
        int statusCode = response.statusCode();
        if (statusCode < 400) {
            return;
        }

        String errorBody = response.body();
        switch (statusCode) {
            case 400:
                throw new IllegalArgumentException("Bad request: " + errorBody);
            case 401:
                throw new SecurityException("Unauthorized: " + errorBody);
            case 404:
                throw new IllegalStateException("Resource not found: " + errorBody);
            case 500:
                throw new RuntimeException("Internal server error: " + errorBody);
            case 503:
                throw new RuntimeException("Service unavailable: " + errorBody);
            default:
                throw new RuntimeException("HTTP error " + statusCode + ": " + errorBody);
        }
    }

    /**
     * Throws when the operation status is {@code ERROR}, {@code TIMEOUT} or {@code CANCELED};
     * returns normally for every other status, including {@code null}.
     *
     * @param status operation status name; may be {@code null}
     * @param operationInfo operation details, read only for the {@code "exception"} entry on
     *     {@code ERROR}; may be {@code null}
     * @throws RuntimeException if the status is one of the three failure statuses
     */
    public void handleOperationErrors(String status, Map<String, Object> operationInfo) {
        if (status == null) {
            // A switch on a null String throws NullPointerException; treat it like any unknown status.
            return;
        }

        switch (status) {
            case "ERROR":
                // No cast: the detail may be absent or a non-string JSON value.
                Object exception = operationInfo != null ? operationInfo.get("exception") : null;
                throw new RuntimeException("Operation failed: " + exception);
            case "TIMEOUT":
                throw new RuntimeException("Operation timed out");
            case "CANCELED":
                throw new RuntimeException("Operation was canceled");
            default:
                // Handle other statuses as needed
        }
    }
}

630

```