0
# Result Data Models
1
2
Result data models provide rich result types with metadata, pagination support, and schema information for handling query results and operation outcomes. These models support efficient data transfer and comprehensive result metadata.
3
4
## Capabilities
5
6
### ResultSet
7
8
Collection of query results with metadata, pagination tokens, and schema information.
9
10
```java { .api }
11
/**
12
* Collection of query results with metadata and pagination support
13
*/
14
public interface ResultSet {
15
/**
16
* Get the type of result (NOT_READY, PAYLOAD, EOS)
17
* @return ResultType indicating the nature of this result
18
*/
19
ResultType getResultType();
20
21
/**
22
* Get token for fetching next batch of results
23
* @return Long token for pagination, null if no more results
24
*/
25
Long getNextToken();
26
27
/**
28
* Get schema of the result data
29
* @return ResolvedSchema describing the structure of results
30
*/
31
ResolvedSchema getResultSchema();
32
33
/**
34
* Get actual row data
35
* @return List of RowData containing the query results
36
*/
37
List<RowData> getData();
38
39
/**
40
* Check if this is a query result (vs. status/metadata result)
41
* @return true if contains query data
42
*/
43
boolean isQueryResult();
44
45
/**
46
* Get associated Flink job ID if available
47
* @return Optional JobID for the operation
48
*/
49
Optional<JobID> getJobID();
50
51
/**
52
* Get result kind indicating the type of SQL operation
53
* @return ResultKind (SUCCESS, SUCCESS_WITH_CONTENT, etc.)
54
*/
55
ResultKind getResultKind();
56
}
57
58
/**
59
* Result type enumeration
60
*/
61
public enum ResultType {
62
/** Data is not ready yet, client should retry */
63
NOT_READY,
64
65
/** Contains actual data payload */
66
PAYLOAD,
67
68
/** End of stream, no more data available */
69
EOS
70
}
71
```
72
73
### ResultSetImpl
74
75
Default implementation of ResultSet with builder support.
76
77
```java { .api }
78
/**
79
* Default implementation of ResultSet
80
*/
81
public class ResultSetImpl implements ResultSet {
82
/**
83
* Create builder for constructing ResultSet
84
* @return Builder instance
85
*/
86
public static Builder builder();
87
88
/**
89
* Builder for constructing ResultSet instances
90
*/
91
public static class Builder {
92
/**
93
* Set result type
94
* @param resultType Type of result
95
* @return Builder for chaining
96
*/
97
public Builder resultType(ResultType resultType);
98
99
/**
100
* Set next token for pagination
101
* @param nextToken Token for next batch
102
* @return Builder for chaining
103
*/
104
public Builder nextToken(Long nextToken);
105
106
/**
107
* Set result schema
108
* @param schema Schema describing the data structure
109
* @return Builder for chaining
110
*/
111
public Builder resultSchema(ResolvedSchema schema);
112
113
/**
114
* Set result data
115
* @param data List of row data
116
* @return Builder for chaining
117
*/
118
public Builder data(List<RowData> data);
119
120
/**
121
* Set job ID
122
* @param jobID Associated job ID
123
* @return Builder for chaining
124
*/
125
public Builder jobID(JobID jobID);
126
127
/**
128
* Set result kind
129
* @param resultKind Kind of result
130
* @return Builder for chaining
131
*/
132
public Builder resultKind(ResultKind resultKind);
133
134
/**
135
* Build the ResultSet instance
136
* @return Constructed ResultSet
137
*/
138
public ResultSet build();
139
}
140
}
141
```
142
143
### TableInfo
144
145
Basic information about tables and views in the catalog.
146
147
```java { .api }
148
/**
149
* Basic information about tables/views
150
*/
151
public class TableInfo {
152
/**
153
* Get table identifier (catalog.database.table)
154
* @return ObjectIdentifier for the table
155
*/
156
public ObjectIdentifier getIdentifier();
157
158
/**
159
* Get table kind (TABLE, VIEW, MATERIALIZED_TABLE, etc.)
160
* @return TableKind indicating the type of table
161
*/
162
public TableKind getTableKind();
163
164
/**
165
* Create TableInfo instance
166
* @param identifier Table identifier
167
* @param tableKind Kind of table
168
* @return TableInfo instance
169
*/
170
public static TableInfo of(ObjectIdentifier identifier, TableKind tableKind);
171
172
@Override
173
public boolean equals(Object o);
174
175
@Override
176
public int hashCode();
177
178
@Override
179
public String toString();
180
}
181
```
182
183
### FunctionInfo
184
185
Information about functions without loading their implementation.
186
187
```java { .api }
188
/**
189
* Information about functions without implementation loading
190
*/
191
public class FunctionInfo {
192
/**
193
* Get function identifier
194
* @return UnresolvedIdentifier for the function
195
*/
196
public UnresolvedIdentifier getIdentifier();
197
198
/**
199
* Get function kind if available
200
* @return Optional FunctionKind (SCALAR, TABLE, AGGREGATE, etc.)
201
*/
202
public Optional<FunctionKind> getKind();
203
204
/**
205
* Create FunctionInfo with identifier only
206
* @param identifier Function identifier
207
* @return FunctionInfo instance
208
*/
209
public static FunctionInfo of(UnresolvedIdentifier identifier);
210
211
/**
212
* Create FunctionInfo with identifier and kind
213
* @param identifier Function identifier
214
* @param kind Function kind
215
* @return FunctionInfo instance
216
*/
217
public static FunctionInfo of(UnresolvedIdentifier identifier, FunctionKind kind);
218
219
@Override
220
public boolean equals(Object o);
221
222
@Override
223
public int hashCode();
224
225
@Override
226
public String toString();
227
}
228
```
229
230
### OperationInfo
231
232
Status and error information for operations.
233
234
```java { .api }
235
/**
236
* Status and error information for operations
237
*/
238
public class OperationInfo {
239
/**
240
* Get current operation status
241
* @return OperationStatus representing current state
242
*/
243
public OperationStatus getStatus();
244
245
/**
246
* Get exception information if operation failed
247
* @return Optional exception message and stack trace
248
*/
249
public Optional<String> getException();
250
251
/**
252
* Create OperationInfo with status only
253
* @param status Current operation status
254
* @return OperationInfo instance
255
*/
256
public static OperationInfo of(OperationStatus status);
257
258
/**
259
* Create OperationInfo with status and exception
260
* @param status Current operation status
261
* @param exception Exception details
262
* @return OperationInfo instance
263
*/
264
public static OperationInfo of(OperationStatus status, String exception);
265
266
@Override
267
public boolean equals(Object o);
268
269
@Override
270
public int hashCode();
271
272
@Override
273
public String toString();
274
}
275
```
276
277
### GatewayInfo
278
279
Information about the SQL Gateway service.
280
281
```java { .api }
282
/**
283
* Information about the SQL Gateway service
284
*/
285
public class GatewayInfo {
286
/**
287
* Get product name (always "Apache Flink")
288
* @return Product name string
289
*/
290
public String getProductName();
291
292
/**
293
* Get current Flink version
294
* @return Version string
295
*/
296
public String getVersion();
297
298
/**
299
* Create GatewayInfo instance
300
* @param productName Product name
301
* @param version Version string
302
* @return GatewayInfo instance
303
*/
304
public static GatewayInfo of(String productName, String version);
305
306
@Override
307
public boolean equals(Object o);
308
309
@Override
310
public int hashCode();
311
312
@Override
313
public String toString();
314
}
315
```
316
317
### FetchOrientation
318
319
Enumeration for result fetching direction.
320
321
```java { .api }
322
/**
323
* Enumeration for result fetching direction
324
*/
325
public enum FetchOrientation {
326
/** Fetch next results in forward direction */
327
FETCH_NEXT,
328
329
/** Fetch prior results in backward direction */
330
FETCH_PRIOR
331
}
332
```
333
334
### NotReadyResult
335
336
Special result indicating data is not ready yet.
337
338
```java { .api }
339
/**
340
* Special result indicating data not ready yet
341
*/
342
public class NotReadyResult implements ResultSet {
343
/**
344
* Create not ready result
345
* @return NotReadyResult instance
346
*/
347
public static NotReadyResult INSTANCE;
348
349
@Override
350
public ResultType getResultType();
351
// Always returns ResultType.NOT_READY
352
353
@Override
354
public Long getNextToken();
355
// Returns null
356
357
@Override
358
public ResolvedSchema getResultSchema();
359
// Returns null
360
361
@Override
362
public List<RowData> getData();
363
// Returns empty list
364
365
@Override
366
public boolean isQueryResult();
367
// Returns false
368
369
@Override
370
public Optional<JobID> getJobID();
371
// Returns empty optional
372
373
@Override
374
public ResultKind getResultKind();
375
// Returns null
376
}
377
```
378
379
## Usage Examples
380
381
### Processing Query Results
382
383
```java
384
import org.apache.flink.table.gateway.api.results.ResultSet;
385
import org.apache.flink.table.gateway.api.results.ResultType;
386
387
// Execute query and process results
388
OperationHandle operation = service.executeStatement(
389
sessionHandle,
390
"SELECT id, name, salary FROM employees WHERE department = 'Engineering'",
391
30000L,
392
new Configuration()
393
);
394
395
// Wait for completion and fetch results
396
long token = 0L;
397
int batchSize = 100;
398
399
while (true) {
400
ResultSet resultSet = service.fetchResults(sessionHandle, operation, token, batchSize);
401
402
switch (resultSet.getResultType()) {
403
case NOT_READY:
404
// Data not ready, wait and retry
405
Thread.sleep(1000);
406
continue;
407
408
case PAYLOAD:
409
// Process data
410
processResultData(resultSet);
411
412
// Check for more data
413
Long nextToken = resultSet.getNextToken();
414
if (nextToken != null) {
415
token = nextToken;
416
continue; // Fetch next batch
417
} else {
418
// No more data
419
break;
420
}
421
422
case EOS:
423
// End of stream
424
System.out.println("All results processed");
425
break;
426
}
427
break;
428
}
429
430
private void processResultData(ResultSet resultSet) {
431
ResolvedSchema schema = resultSet.getResultSchema();
432
List<RowData> data = resultSet.getData();
433
434
System.out.println("Schema: " + schema);
435
System.out.println("Batch size: " + data.size());
436
437
for (RowData row : data) {
438
// Process each row
439
System.out.println("Row: " + row);
440
}
441
}
442
```
443
444
### Building Custom Results
445
446
```java
447
import org.apache.flink.table.gateway.api.results.ResultSetImpl;
448
import org.apache.flink.table.types.logical.VarCharType;
449
import org.apache.flink.table.types.logical.IntType;
450
451
// Create custom result set
452
List<RowData> customData = createCustomData();
453
ResolvedSchema customSchema = ResolvedSchema.of(
454
Column.physical("id", new IntType()),
455
Column.physical("name", new VarCharType(255)),
456
Column.physical("status", new VarCharType(50))
457
);
458
459
ResultSet customResult = ResultSetImpl.builder()
460
.resultType(ResultType.PAYLOAD)
461
.resultSchema(customSchema)
462
.data(customData)
463
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
464
.nextToken(null) // No more data
465
.build();
466
467
// Use in custom operation
468
Callable<ResultSet> customOperation = () -> customResult;
469
OperationHandle operation = service.submitOperation(sessionHandle, customOperation);
470
```
471
472
### Catalog Information Processing
473
474
```java
475
import org.apache.flink.table.gateway.api.results.TableInfo;
476
import org.apache.flink.table.gateway.api.results.FunctionInfo;
477
478
// List tables and process information
479
Set<TableInfo> tables = service.listTables(
480
sessionHandle,
481
"my_catalog",
482
"my_database",
483
Set.of(TableKind.TABLE, TableKind.VIEW)
484
);
485
486
System.out.println("Found " + tables.size() + " tables/views:");
487
for (TableInfo table : tables) {
488
System.out.println("- " + table.getIdentifier() + " (" + table.getTableKind() + ")");
489
490
// Get detailed table information
491
if (table.getTableKind() == TableKind.TABLE) {
492
ResolvedCatalogBaseTable<?> tableDetails = service.getTable(sessionHandle, table.getIdentifier());
493
System.out.println(" Schema: " + tableDetails.getResolvedSchema());
494
System.out.println(" Options: " + tableDetails.getOptions());
495
}
496
}
497
498
// List functions
499
Set<FunctionInfo> functions = service.listUserDefinedFunctions(sessionHandle, "my_catalog", "my_database");
500
System.out.println("\nFound " + functions.size() + " user-defined functions:");
501
for (FunctionInfo function : functions) {
502
System.out.println("- " + function.getIdentifier() +
503
(function.getKind().isPresent() ? " (" + function.getKind().get() + ")" : ""));
504
}
505
```
506
507
### Result Set Pagination
508
509
```java
510
// Efficient pagination through large result sets
511
public class ResultSetPaginator {
512
private final SqlGatewayService service;
513
private final SessionHandle sessionHandle;
514
private final OperationHandle operationHandle;
515
private final int pageSize;
516
517
public ResultSetPaginator(SqlGatewayService service, SessionHandle sessionHandle,
518
OperationHandle operationHandle, int pageSize) {
519
this.service = service;
520
this.sessionHandle = sessionHandle;
521
this.operationHandle = operationHandle;
522
this.pageSize = pageSize;
523
}
524
525
public Iterator<List<RowData>> iterator() {
526
return new Iterator<List<RowData>>() {
527
private long currentToken = 0L;
528
private boolean hasNext = true;
529
private List<RowData> nextBatch = null;
530
531
@Override
532
public boolean hasNext() {
533
if (nextBatch == null && hasNext) {
534
fetchNextBatch();
535
}
536
return nextBatch != null && !nextBatch.isEmpty();
537
}
538
539
@Override
540
public List<RowData> next() {
541
if (!hasNext()) {
542
throw new NoSuchElementException();
543
}
544
List<RowData> result = nextBatch;
545
nextBatch = null;
546
return result;
547
}
548
549
private void fetchNextBatch() {
550
try {
551
ResultSet resultSet = service.fetchResults(sessionHandle, operationHandle, currentToken, pageSize);
552
553
if (resultSet.getResultType() == ResultType.PAYLOAD) {
554
nextBatch = resultSet.getData();
555
Long nextToken = resultSet.getNextToken();
556
if (nextToken != null) {
557
currentToken = nextToken;
558
} else {
559
hasNext = false;
560
}
561
} else if (resultSet.getResultType() == ResultType.EOS) {
562
nextBatch = Collections.emptyList();
563
hasNext = false;
564
} else {
565
// NOT_READY - could implement retry logic here
566
nextBatch = Collections.emptyList();
567
hasNext = false;
568
}
569
} catch (Exception e) {
570
throw new RuntimeException("Failed to fetch next batch", e);
571
}
572
}
573
};
574
}
575
}
576
577
// Usage
578
ResultSetPaginator paginator = new ResultSetPaginator(service, sessionHandle, operation, 1000);
579
for (List<RowData> batch : paginator) {
580
System.out.println("Processing batch of " + batch.size() + " rows");
581
// Process batch...
582
}
583
```
584
585
### Error Information Handling
586
587
```java
588
// Comprehensive error handling with operation info
589
public class OperationErrorHandler {
590
591
public void handleOperationResult(SessionHandle session, OperationHandle operation) {
592
OperationInfo info = service.getOperationInfo(session, operation);
593
594
switch (info.getStatus()) {
595
case FINISHED:
596
System.out.println("Operation completed successfully");
597
processResults(session, operation);
598
break;
599
600
case ERROR:
601
String error = info.getException().orElse("Unknown error occurred");
602
System.err.println("Operation failed: " + error);
603
604
// Parse error for specific handling
605
if (error.contains("TableNotExistException")) {
606
handleTableNotFound(error);
607
} else if (error.contains("ValidationException")) {
608
handleValidationError(error);
609
} else if (error.contains("OutOfMemoryError")) {
610
handleMemoryError(error);
611
} else {
612
handleGenericError(error);
613
}
614
break;
615
616
case TIMEOUT:
617
System.err.println("Operation timed out");
618
// Could implement retry with longer timeout
619
break;
620
621
case CANCELED:
622
System.out.println("Operation was canceled");
623
break;
624
625
default:
626
System.out.println("Operation in unexpected status: " + info.getStatus());
627
}
628
}
629
630
private void handleTableNotFound(String error) {
631
System.err.println("Table not found - check catalog and database names");
632
}
633
634
private void handleValidationError(String error) {
635
System.err.println("SQL validation failed - check syntax and schema");
636
}
637
638
private void handleMemoryError(String error) {
639
System.err.println("Memory error - consider reducing batch size or increasing resources");
640
}
641
642
private void handleGenericError(String error) {
643
System.err.println("Generic error occurred: " + error);
644
}
645
646
private void processResults(SessionHandle session, OperationHandle operation) {
647
// Process successful results...
648
}
649
}
650
```