or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-options.mdcore-service-interface.mdendpoint-framework.mdindex.mdoperation-management.mdrest-implementation.mdresult-data-models.mdsession-management.mdworkflow-management.md

operation-management.mddocs/

0

# Operation Management

1

2

Operation management provides asynchronous execution of SQL statements and operations with comprehensive status tracking, cancellation support, and resource management. Operations are tracked throughout their lifecycle from submission to completion.

3

4

## Capabilities

5

6

### OperationHandle

7

8

Unique identifier for operations using UUID-based handles.

9

10

```java { .api }

11

/**

12

* Operation Handle that identifies a unique operation

13

*/

14

public class OperationHandle {

15

private final UUID identifier;

16

17

/**

18

* Create a new operation handle with random UUID

19

* @return New OperationHandle instance

20

*/

21

public static OperationHandle create();

22

23

/**

24

* Create operation handle with specific UUID

25

* @param identifier UUID to use for the operation

26

*/

27

public OperationHandle(UUID identifier);

28

29

/**

30

* Get the UUID identifier for this operation

31

* @return UUID identifier

32

*/

33

public UUID getIdentifier();

34

35

@Override

36

public boolean equals(Object o);

37

38

@Override

39

public int hashCode();

40

41

@Override

42

public String toString();

43

}

44

```

45

46

### OperationStatus

47

48

Enumeration representing the complete lifecycle of operations with terminal status checking and transition validation.

49

50

```java { .api }

51

/**

52

* Enumeration of operation states throughout lifecycle

53

*/

54

public enum OperationStatus {

55

/** Newly created operation, not yet started */

56

INITIALIZED,

57

58

/** Preparing resources for execution */

59

PENDING,

60

61

/** Operation currently executing */

62

RUNNING,

63

64

/** Completed successfully with results available */

65

FINISHED,

66

67

/** Operation was cancelled by user or system */

68

CANCELED,

69

70

/** Resources cleaned up, operation no longer accessible */

71

CLOSED,

72

73

/** Error occurred during execution */

74

ERROR,

75

76

/** Execution timed out */

77

TIMEOUT;

78

79

/**

80

* Check if this status represents a terminal state

81

* @return true if operation cannot transition to other states

82

*/

83

public boolean isTerminalStatus();

84

85

/**

86

* Validate if transition from one status to another is allowed

87

* @param from Source status

88

* @param to Target status

89

* @return true if transition is valid

90

*/

91

public static boolean isValidStatusTransition(OperationStatus from, OperationStatus to);

92

}

93

```

94

95

### OperationInfo

96

97

Information about operation status and any exceptions that occurred during execution.

98

99

```java { .api }

100

/**

101

* Status and error information for operations

102

*/

103

public class OperationInfo {

104

/**

105

* Get current operation status

106

* @return OperationStatus representing current state

107

*/

108

public OperationStatus getStatus();

109

110

/**

111

* Get exception information if operation failed

112

* @return Optional exception message and details

113

*/

114

public Optional<String> getException();

115

116

/**

117

* Create OperationInfo with status

118

* @param status Current operation status

119

* @return OperationInfo instance

120

*/

121

public static OperationInfo of(OperationStatus status);

122

123

/**

124

* Create OperationInfo with status and exception

125

* @param status Current operation status

126

* @param exception Exception that occurred

127

* @return OperationInfo instance

128

*/

129

public static OperationInfo of(OperationStatus status, String exception);

130

}

131

```

132

133

### OperationManager

134

135

Manages operation lifecycle, execution, and resource cleanup.

136

137

```java { .api }

138

/**

139

* Manages operation lifecycle and execution

140

*/

141

public class OperationManager {

142

/**

143

* Submit operation for execution

144

* @param sessionHandle Session for the operation

145

* @param executor Callable that produces results

146

* @return OperationHandle for tracking

147

*/

148

public OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor);

149

150

/**

151

* Cancel running operation

152

* @param sessionHandle Session handle

153

* @param operationHandle Operation to cancel

154

*/

155

public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle);

156

157

/**

158

* Close operation and clean up resources

159

* @param sessionHandle Session handle

160

* @param operationHandle Operation to close

161

*/

162

public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle);

163

164

/**

165

* Get operation information

166

* @param sessionHandle Session handle

167

* @param operationHandle Operation handle

168

* @return OperationInfo with current status

169

*/

170

public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle);

171

172

/**

173

* Get operation result schema when available

174

* @param sessionHandle Session handle

175

* @param operationHandle Operation handle

176

* @return ResolvedSchema of operation results

177

*/

178

public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle);

179

}

180

```

181

182

### OperationExecutor

183

184

Executes operations in background threads with timeout and cancellation support.

185

186

```java { .api }

187

/**

188

* Executes operations in background threads

189

*/

190

public class OperationExecutor {

191

/**

192

* Start the executor with configured thread pool

193

*/

194

public void start();

195

196

/**

197

* Stop the executor and shutdown thread pool

198

*/

199

public void stop();

200

201

/**

202

* Submit operation for asynchronous execution

203

* @param operation Operation to execute

204

* @return Future representing the execution

205

*/

206

public Future<ResultSet> submitOperation(Operation operation);

207

208

/**

209

* Get number of active operations

210

* @return Count of currently executing operations

211

*/

212

public int getActiveOperationCount();

213

}

214

```

215

216

## Usage Examples

217

218

### Basic Operation Submission

219

220

```java

221

import org.apache.flink.table.gateway.api.operation.OperationHandle;

222

import org.apache.flink.table.gateway.api.operation.OperationStatus;

223

import java.util.concurrent.Callable;

224

225

// Submit SQL statement as operation

226

OperationHandle operation = service.executeStatement(

227

sessionHandle,

228

"SELECT COUNT(*) FROM orders WHERE status = 'COMPLETED'",

229

30000L, // 30 second timeout

230

new Configuration()

231

);

232

233

// Check operation status

234

OperationInfo info = service.getOperationInfo(sessionHandle, operation);

235

System.out.println("Operation status: " + info.getStatus());

236

237

// Wait for completion

238

while (!info.getStatus().isTerminalStatus()) {

239

Thread.sleep(1000);

240

info = service.getOperationInfo(sessionHandle, operation);

241

}

242

243

if (info.getStatus() == OperationStatus.FINISHED) {

244

// Fetch results

245

ResultSet results = service.fetchResults(sessionHandle, operation, 0L, 100);

246

// Process results...

247

} else if (info.getStatus() == OperationStatus.ERROR) {

248

System.err.println("Operation failed: " + info.getException().orElse("Unknown error"));

249

}

250

251

// Clean up

252

service.closeOperation(sessionHandle, operation);

253

```

254

255

### Custom Operation Submission

256

257

```java

258

import java.util.concurrent.Callable;

259

260

// Submit custom operation with Callable

261

Callable<ResultSet> customLogic = () -> {

262

// Custom processing logic

263

List<RowData> data = processData();

264

ResolvedSchema schema = createSchema();

265

return ResultSet.builder()

266

.resultType(ResultType.PAYLOAD)

267

.data(data)

268

.resultSchema(schema)

269

.build();

270

};

271

272

OperationHandle customOp = service.submitOperation(sessionHandle, customLogic);

273

274

// Monitor execution

275

OperationInfo info = service.getOperationInfo(sessionHandle, customOp);

276

while (info.getStatus() == OperationStatus.RUNNING) {

277

Thread.sleep(500);

278

info = service.getOperationInfo(sessionHandle, customOp);

279

}

280

```

281

282

### Operation Cancellation

283

284

```java

285

// Start long-running operation

286

OperationHandle longOp = service.executeStatement(

287

sessionHandle,

288

"SELECT * FROM large_table ORDER BY timestamp",

289

0L, // No timeout

290

new Configuration()

291

);

292

293

// Check if operation is running

294

OperationInfo info = service.getOperationInfo(sessionHandle, longOp);

295

if (info.getStatus() == OperationStatus.RUNNING) {

296

// Cancel the operation

297

service.cancelOperation(sessionHandle, longOp);

298

299

// Verify cancellation

300

info = service.getOperationInfo(sessionHandle, longOp);

301

System.out.println("Operation status after cancel: " + info.getStatus());

302

}

303

304

// Clean up

305

service.closeOperation(sessionHandle, longOp);

306

```

307

308

### Batch Operation Management

309

310

```java

311

import java.util.List;

312

import java.util.Map;

313

import java.util.concurrent.CompletableFuture;

314

315

// Submit multiple operations

316

List<String> queries = List.of(

317

"SELECT COUNT(*) FROM users",

318

"SELECT AVG(price) FROM products",

319

"SELECT MAX(order_date) FROM orders"

320

);

321

322

Map<String, OperationHandle> operations = new HashMap<>();

323

for (int i = 0; i < queries.size(); i++) {

324

String query = queries.get(i);

325

OperationHandle op = service.executeStatement(sessionHandle, query, 60000L, new Configuration());

326

operations.put("query_" + i, op);

327

}

328

329

// Monitor all operations

330

Map<String, OperationInfo> results = new HashMap<>();

331

while (operations.size() > results.size()) {

332

for (Map.Entry<String, OperationHandle> entry : operations.entrySet()) {

333

String key = entry.getKey();

334

OperationHandle op = entry.getValue();

335

336

if (!results.containsKey(key)) {

337

OperationInfo info = service.getOperationInfo(sessionHandle, op);

338

if (info.getStatus().isTerminalStatus()) {

339

results.put(key, info);

340

System.out.println(key + " completed with status: " + info.getStatus());

341

342

if (info.getStatus() == OperationStatus.FINISHED) {

343

ResultSet resultSet = service.fetchResults(sessionHandle, op, 0L, 10);

344

// Process results...

345

}

346

347

// Clean up completed operation

348

service.closeOperation(sessionHandle, op);

349

}

350

}

351

}

352

Thread.sleep(1000);

353

}

354

```

355

356

### Operation Status Monitoring

357

358

```java

359

// Advanced status monitoring with timeout

360

public class OperationMonitor {

361

private final SqlGatewayService service;

362

363

public OperationResult waitForCompletion(

364

SessionHandle session,

365

OperationHandle operation,

366

long timeoutMs) {

367

368

long startTime = System.currentTimeMillis();

369

long endTime = startTime + timeoutMs;

370

371

while (System.currentTimeMillis() < endTime) {

372

OperationInfo info = service.getOperationInfo(session, operation);

373

374

switch (info.getStatus()) {

375

case FINISHED:

376

ResultSet results = service.fetchResults(session, operation, 0L, Integer.MAX_VALUE);

377

return OperationResult.success(results);

378

379

case ERROR:

380

return OperationResult.error(info.getException().orElse("Unknown error"));

381

382

case CANCELED:

383

return OperationResult.cancelled();

384

385

case TIMEOUT:

386

return OperationResult.timeout();

387

388

case RUNNING:

389

case PENDING:

390

case INITIALIZED:

391

// Continue waiting

392

try {

393

Thread.sleep(100);

394

} catch (InterruptedException e) {

395

Thread.currentThread().interrupt();

396

return OperationResult.interrupted();

397

}

398

break;

399

400

case CLOSED:

401

return OperationResult.error("Operation was closed");

402

}

403

}

404

405

// Timeout reached

406

service.cancelOperation(session, operation);

407

return OperationResult.timeout();

408

}

409

}

410

```

411

412

### Error Handling and Recovery

413

414

```java

415

// Robust operation execution with error handling

416

public ResultSet executeWithRetry(String sql, int maxRetries) {

417

for (int attempt = 1; attempt <= maxRetries; attempt++) {

418

OperationHandle operation = null;

419

try {

420

operation = service.executeStatement(

421

sessionHandle,

422

sql,

423

30000L,

424

new Configuration()

425

);

426

427

// Wait for completion

428

OperationInfo info;

429

do {

430

Thread.sleep(1000);

431

info = service.getOperationInfo(sessionHandle, operation);

432

} while (!info.getStatus().isTerminalStatus());

433

434

if (info.getStatus() == OperationStatus.FINISHED) {

435

return service.fetchResults(sessionHandle, operation, 0L, Integer.MAX_VALUE);

436

} else if (info.getStatus() == OperationStatus.ERROR) {

437

String error = info.getException().orElse("Unknown error");

438

if (attempt < maxRetries && isRetryableError(error)) {

439

System.out.println("Attempt " + attempt + " failed, retrying: " + error);

440

continue;

441

} else {

442

throw new RuntimeException("Operation failed: " + error);

443

}

444

} else {

445

throw new RuntimeException("Operation ended with status: " + info.getStatus());

446

}

447

448

} catch (Exception e) {

449

if (attempt == maxRetries) {

450

throw new RuntimeException("All retry attempts failed", e);

451

}

452

System.out.println("Attempt " + attempt + " failed with exception, retrying: " + e.getMessage());

453

} finally {

454

if (operation != null) {

455

try {

456

service.closeOperation(sessionHandle, operation);

457

} catch (Exception e) {

458

System.err.println("Failed to close operation: " + e.getMessage());

459

}

460

}

461

}

462

}

463

throw new RuntimeException("Should not reach here");

464

}

465

466

private boolean isRetryableError(String error) {

467

return error.contains("timeout") ||

468

error.contains("connection") ||

469

error.contains("temporary");

470

}

471

```