0
# Operation Management
1
2
Operation management provides asynchronous execution of SQL statements and operations with comprehensive status tracking, cancellation support, and resource management. Operations are tracked throughout their lifecycle from submission to completion.
3
4
## Capabilities
5
6
### OperationHandle
7
8
Unique identifier for operations using UUID-based handles.
9
10
```java { .api }
11
/**
12
* Operation Handle that identifies a unique operation
13
*/
14
public class OperationHandle {
15
private final UUID identifier;
16
17
/**
18
* Create a new operation handle with random UUID
19
* @return New OperationHandle instance
20
*/
21
public static OperationHandle create();
22
23
/**
24
* Create operation handle with specific UUID
25
* @param identifier UUID to use for the operation
26
*/
27
public OperationHandle(UUID identifier);
28
29
/**
30
* Get the UUID identifier for this operation
31
* @return UUID identifier
32
*/
33
public UUID getIdentifier();
34
35
@Override
36
public boolean equals(Object o);
37
38
@Override
39
public int hashCode();
40
41
@Override
42
public String toString();
43
}
44
```
45
46
### OperationStatus
47
48
Enumeration representing the complete lifecycle of operations with terminal status checking and transition validation.
49
50
```java { .api }
51
/**
52
* Enumeration of operation states throughout lifecycle
53
*/
54
public enum OperationStatus {
55
/** Newly created operation, not yet started */
56
INITIALIZED,
57
58
/** Preparing resources for execution */
59
PENDING,
60
61
/** Operation currently executing */
62
RUNNING,
63
64
/** Completed successfully with results available */
65
FINISHED,
66
67
/** Operation was cancelled by user or system */
68
CANCELED,
69
70
/** Resources cleaned up, operation no longer accessible */
71
CLOSED,
72
73
/** Error occurred during execution */
74
ERROR,
75
76
/** Execution timed out */
77
TIMEOUT;
78
79
/**
80
* Check if this status represents a terminal state
81
* @return true if operation cannot transition to other states
82
*/
83
public boolean isTerminalStatus();
84
85
/**
86
* Validate if transition from one status to another is allowed
87
* @param from Source status
88
* @param to Target status
89
* @return true if transition is valid
90
*/
91
public static boolean isValidStatusTransition(OperationStatus from, OperationStatus to);
92
}
93
```
94
95
### OperationInfo
96
97
Information about operation status and any exceptions that occurred during execution.
98
99
```java { .api }
100
/**
101
* Status and error information for operations
102
*/
103
public class OperationInfo {
104
/**
105
* Get current operation status
106
* @return OperationStatus representing current state
107
*/
108
public OperationStatus getStatus();
109
110
/**
111
* Get exception information if operation failed
112
* @return Optional exception message and details
113
*/
114
public Optional<String> getException();
115
116
/**
117
* Create OperationInfo with status
118
* @param status Current operation status
119
* @return OperationInfo instance
120
*/
121
public static OperationInfo of(OperationStatus status);
122
123
/**
124
* Create OperationInfo with status and exception
125
* @param status Current operation status
126
* @param exception Exception that occurred
127
* @return OperationInfo instance
128
*/
129
public static OperationInfo of(OperationStatus status, String exception);
130
}
131
```
132
133
### OperationManager
134
135
Manages operation lifecycle, execution, and resource cleanup.
136
137
```java { .api }
138
/**
139
* Manages operation lifecycle and execution
140
*/
141
public class OperationManager {
142
/**
143
* Submit operation for execution
144
* @param sessionHandle Session for the operation
145
* @param executor Callable that produces results
146
* @return OperationHandle for tracking
147
*/
148
public OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor);
149
150
/**
151
* Cancel running operation
152
* @param sessionHandle Session handle
153
* @param operationHandle Operation to cancel
154
*/
155
public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle);
156
157
/**
158
* Close operation and clean up resources
159
* @param sessionHandle Session handle
160
* @param operationHandle Operation to close
161
*/
162
public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle);
163
164
/**
165
* Get operation information
166
* @param sessionHandle Session handle
167
* @param operationHandle Operation handle
168
* @return OperationInfo with current status
169
*/
170
public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle);
171
172
/**
173
* Get operation result schema when available
174
* @param sessionHandle Session handle
175
* @param operationHandle Operation handle
176
* @return ResolvedSchema of operation results
177
*/
178
public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle);
179
}
180
```
181
182
### OperationExecutor
183
184
Executes operations in background threads with timeout and cancellation support.
185
186
```java { .api }
187
/**
188
* Executes operations in background threads
189
*/
190
public class OperationExecutor {
191
/**
192
* Start the executor with configured thread pool
193
*/
194
public void start();
195
196
/**
197
* Stop the executor and shutdown thread pool
198
*/
199
public void stop();
200
201
/**
202
* Submit operation for asynchronous execution
203
* @param operation Operation to execute
204
* @return Future representing the execution
205
*/
206
public Future<ResultSet> submitOperation(Operation operation);
207
208
/**
209
* Get number of active operations
210
* @return Count of currently executing operations
211
*/
212
public int getActiveOperationCount();
213
}
214
```
215
216
## Usage Examples
217
218
### Basic Operation Submission
219
220
```java
221
import org.apache.flink.table.gateway.api.operation.OperationHandle;
222
import org.apache.flink.table.gateway.api.operation.OperationStatus;
223
import java.util.concurrent.Callable;
224
225
// Submit SQL statement as operation
226
OperationHandle operation = service.executeStatement(
227
sessionHandle,
228
"SELECT COUNT(*) FROM orders WHERE status = 'COMPLETED'",
229
30000L, // 30 second timeout
230
new Configuration()
231
);
232
233
// Check operation status
234
OperationInfo info = service.getOperationInfo(sessionHandle, operation);
235
System.out.println("Operation status: " + info.getStatus());
236
237
// Wait for completion
238
while (!info.getStatus().isTerminalStatus()) {
239
Thread.sleep(1000);
240
info = service.getOperationInfo(sessionHandle, operation);
241
}
242
243
if (info.getStatus() == OperationStatus.FINISHED) {
244
// Fetch results
245
ResultSet results = service.fetchResults(sessionHandle, operation, 0L, 100);
246
// Process results...
247
} else if (info.getStatus() == OperationStatus.ERROR) {
248
System.err.println("Operation failed: " + info.getException().orElse("Unknown error"));
249
}
250
251
// Clean up
252
service.closeOperation(sessionHandle, operation);
253
```
254
255
### Custom Operation Submission
256
257
```java
258
import java.util.concurrent.Callable;
259
260
// Submit custom operation with Callable
261
Callable<ResultSet> customLogic = () -> {
262
// Custom processing logic
263
List<RowData> data = processData();
264
ResolvedSchema schema = createSchema();
265
return ResultSet.builder()
266
.resultType(ResultType.PAYLOAD)
267
.data(data)
268
.resultSchema(schema)
269
.build();
270
};
271
272
OperationHandle customOp = service.submitOperation(sessionHandle, customLogic);
273
274
// Monitor execution
275
OperationInfo info = service.getOperationInfo(sessionHandle, customOp);
276
while (info.getStatus() == OperationStatus.RUNNING) {
277
Thread.sleep(500);
278
info = service.getOperationInfo(sessionHandle, customOp);
279
}
280
```
281
282
### Operation Cancellation
283
284
```java
285
// Start long-running operation
286
OperationHandle longOp = service.executeStatement(
287
sessionHandle,
288
"SELECT * FROM large_table ORDER BY timestamp",
289
0L, // No timeout
290
new Configuration()
291
);
292
293
// Check if operation is running
294
OperationInfo info = service.getOperationInfo(sessionHandle, longOp);
295
if (info.getStatus() == OperationStatus.RUNNING) {
296
// Cancel the operation
297
service.cancelOperation(sessionHandle, longOp);
298
299
// Verify cancellation
300
info = service.getOperationInfo(sessionHandle, longOp);
301
System.out.println("Operation status after cancel: " + info.getStatus());
302
}
303
304
// Clean up
305
service.closeOperation(sessionHandle, longOp);
306
```
307
308
### Batch Operation Management
309
310
```java
311
import java.util.List;
312
import java.util.Map;
313
import java.util.concurrent.CompletableFuture;
314
315
// Submit multiple operations
316
List<String> queries = List.of(
317
"SELECT COUNT(*) FROM users",
318
"SELECT AVG(price) FROM products",
319
"SELECT MAX(order_date) FROM orders"
320
);
321
322
Map<String, OperationHandle> operations = new HashMap<>();
323
for (int i = 0; i < queries.size(); i++) {
324
String query = queries.get(i);
325
OperationHandle op = service.executeStatement(sessionHandle, query, 60000L, new Configuration());
326
operations.put("query_" + i, op);
327
}
328
329
// Monitor all operations
330
Map<String, OperationInfo> results = new HashMap<>();
331
while (operations.size() > results.size()) {
332
for (Map.Entry<String, OperationHandle> entry : operations.entrySet()) {
333
String key = entry.getKey();
334
OperationHandle op = entry.getValue();
335
336
if (!results.containsKey(key)) {
337
OperationInfo info = service.getOperationInfo(sessionHandle, op);
338
if (info.getStatus().isTerminalStatus()) {
339
results.put(key, info);
340
System.out.println(key + " completed with status: " + info.getStatus());
341
342
if (info.getStatus() == OperationStatus.FINISHED) {
343
ResultSet resultSet = service.fetchResults(sessionHandle, op, 0L, 10);
344
// Process results...
345
}
346
347
// Clean up completed operation
348
service.closeOperation(sessionHandle, op);
349
}
350
}
351
}
352
Thread.sleep(1000);
353
}
354
```
355
356
### Operation Status Monitoring
357
358
```java
359
// Advanced status monitoring with timeout
360
public class OperationMonitor {
361
private final SqlGatewayService service;
362
363
public OperationResult waitForCompletion(
364
SessionHandle session,
365
OperationHandle operation,
366
long timeoutMs) {
367
368
long startTime = System.currentTimeMillis();
369
long endTime = startTime + timeoutMs;
370
371
while (System.currentTimeMillis() < endTime) {
372
OperationInfo info = service.getOperationInfo(session, operation);
373
374
switch (info.getStatus()) {
375
case FINISHED:
376
ResultSet results = service.fetchResults(session, operation, 0L, Integer.MAX_VALUE);
377
return OperationResult.success(results);
378
379
case ERROR:
380
return OperationResult.error(info.getException().orElse("Unknown error"));
381
382
case CANCELED:
383
return OperationResult.cancelled();
384
385
case TIMEOUT:
386
return OperationResult.timeout();
387
388
case RUNNING:
389
case PENDING:
390
case INITIALIZED:
391
// Continue waiting
392
try {
393
Thread.sleep(100);
394
} catch (InterruptedException e) {
395
Thread.currentThread().interrupt();
396
return OperationResult.interrupted();
397
}
398
break;
399
400
case CLOSED:
401
return OperationResult.error("Operation was closed");
402
}
403
}
404
405
// Timeout reached
406
service.cancelOperation(session, operation);
407
return OperationResult.timeout();
408
}
409
}
410
```
411
412
### Error Handling and Recovery
413
414
```java
415
// Robust operation execution with error handling
416
public ResultSet executeWithRetry(String sql, int maxRetries) {
417
for (int attempt = 1; attempt <= maxRetries; attempt++) {
418
OperationHandle operation = null;
419
try {
420
operation = service.executeStatement(
421
sessionHandle,
422
sql,
423
30000L,
424
new Configuration()
425
);
426
427
// Wait for completion
428
OperationInfo info;
429
do {
430
Thread.sleep(1000);
431
info = service.getOperationInfo(sessionHandle, operation);
432
} while (!info.getStatus().isTerminalStatus());
433
434
if (info.getStatus() == OperationStatus.FINISHED) {
435
return service.fetchResults(sessionHandle, operation, 0L, Integer.MAX_VALUE);
436
} else if (info.getStatus() == OperationStatus.ERROR) {
437
String error = info.getException().orElse("Unknown error");
438
if (attempt < maxRetries && isRetryableError(error)) {
439
System.out.println("Attempt " + attempt + " failed, retrying: " + error);
440
continue;
441
} else {
442
throw new RuntimeException("Operation failed: " + error);
443
}
444
} else {
445
throw new RuntimeException("Operation ended with status: " + info.getStatus());
446
}
447
448
} catch (Exception e) {
449
if (attempt == maxRetries) {
450
throw new RuntimeException("All retry attempts failed", e);
451
}
452
System.out.println("Attempt " + attempt + " failed with exception, retrying: " + e.getMessage());
453
} finally {
454
if (operation != null) {
455
try {
456
service.closeOperation(sessionHandle, operation);
457
} catch (Exception e) {
458
System.err.println("Failed to close operation: " + e.getMessage());
459
}
460
}
461
}
462
}
463
throw new RuntimeException("Should not reach here");
464
}
465
466
private boolean isRetryableError(String error) {
467
return error.contains("timeout") ||
468
error.contains("connection") ||
469
error.contains("temporary");
470
}
471
```