0
# Table Utilities
1
2
Utility functions for table operations, column management, type checking, and data conversion between Flink data structures. These utilities provide essential functionality for working with Flink tables in ML contexts.
3
4
## Capabilities
5
6
### TableUtil Class
7
8
Core utility class providing table operations including column lookup, type checks and assertions, column selection by type, and markdown/SQL formatting.
9
10
```java { .api }
11
/**
12
* Utility methods for table operations and column management
13
*/
14
public class TableUtil {
15
16
/**
17
* Generate temporary table name
18
* @return Unique temporary table name
19
*/
20
public static String getTempTableName();
21
22
/**
23
* Find column index in array
24
* @param tableCols Array of column names
25
* @param targetCol Target column name to find
26
* @return Index of target column, -1 if not found
27
*/
28
public static int findColIndex(String[] tableCols, String targetCol);
29
30
/**
31
* Find column index in table schema
32
* @param tableSchema Table schema
33
* @param targetCol Target column name to find
34
* @return Index of target column, -1 if not found
35
*/
36
public static int findColIndex(TableSchema tableSchema, String targetCol);
37
38
/**
39
* Find multiple column indices in array
40
* @param tableCols Array of column names
41
* @param targetCols Target column names to find
42
* @return Array of indices for target columns
43
*/
44
public static int[] findColIndices(String[] tableCols, String[] targetCols);
45
46
/**
47
* Find multiple column indices in table schema
48
* @param tableSchema Table schema
49
* @param targetCols Target column names to find
50
* @return Array of indices for target columns
51
*/
52
public static int[] findColIndices(TableSchema tableSchema, String[] targetCols);
53
54
/**
55
* Find column types from table schema
56
* @param tableSchema Table schema
57
* @param targetCols Target column names
58
* @return Array of TypeInformation for target columns
59
*/
60
public static TypeInformation<?>[] findColTypes(TableSchema tableSchema, String[] targetCols);
61
62
/**
63
* Find single column type from table schema
64
* @param tableSchema Table schema
65
* @param targetCol Target column name
66
* @return TypeInformation for target column
67
*/
68
public static TypeInformation<?> findColType(TableSchema tableSchema, String targetCol);
69
70
/**
71
* Check if data type is supported numeric type
72
* @param dataType Type to check
73
* @return true if numeric type, false otherwise
74
*/
75
public static boolean isSupportedNumericType(TypeInformation<?> dataType);
76
77
/**
78
* Check if data type is string type
79
* @param dataType Type to check
80
* @return true if string type, false otherwise
81
*/
82
public static boolean isString(TypeInformation<?> dataType);
83
84
/**
85
* Check if data type is vector type
86
* @param dataType Type to check
87
* @return true if vector type, false otherwise
88
*/
89
public static boolean isVector(TypeInformation<?> dataType);
90
91
/**
92
* Assert that selected columns exist in table
93
* @param tableCols Array of table column names
94
* @param selectedCols Selected column names to verify
95
* @throws IllegalArgumentException if any column doesn't exist
96
*/
97
public static void assertSelectedColExist(String[] tableCols, String... selectedCols);
98
99
/**
100
* Assert that columns are numeric types
101
* @param tableSchema Table schema
102
* @param selectedCols Column names to check
103
* @throws IllegalArgumentException if any column is not numeric
104
*/
105
public static void assertNumericalCols(TableSchema tableSchema, String... selectedCols);
106
107
/**
108
* Assert that columns are string types
109
* @param tableSchema Table schema
110
* @param selectedCols Column names to check
111
* @throws IllegalArgumentException if any column is not string
112
*/
113
public static void assertStringCols(TableSchema tableSchema, String... selectedCols);
114
115
/**
116
* Assert that columns are vector types
117
* @param tableSchema Table schema
118
* @param selectedCols Column names to check
119
* @throws IllegalArgumentException if any column is not vector
120
*/
121
public static void assertVectorCols(TableSchema tableSchema, String... selectedCols);
122
123
/**
124
* Get all string column names from schema
125
* @param tableSchema Table schema
126
* @return Array of string column names
127
*/
128
public static String[] getStringCols(TableSchema tableSchema);
129
130
/**
131
* Get string column names excluding specified columns
132
* @param tableSchema Table schema
133
* @param excludeCols Columns to exclude
134
* @return Array of string column names (excluding specified)
135
*/
136
public static String[] getStringCols(TableSchema tableSchema, String[] excludeCols);
137
138
/**
139
* Get all numeric column names from schema
140
* @param tableSchema Table schema
141
* @return Array of numeric column names
142
*/
143
public static String[] getNumericCols(TableSchema tableSchema);
144
145
/**
146
* Get numeric column names excluding specified columns
147
* @param tableSchema Table schema
148
* @param excludeCols Columns to exclude
149
* @return Array of numeric column names (excluding specified)
150
*/
151
public static String[] getNumericCols(TableSchema tableSchema, String[] excludeCols);
152
153
/**
154
* Get categorical columns from feature columns
155
* @param tableSchema Table schema
156
* @param featureCols Feature column names
157
* @param categoricalCols Explicitly specified categorical columns
158
* @return Array of categorical column names
159
*/
160
public static String[] getCategoricalCols(TableSchema tableSchema, String[] featureCols,
161
String[] categoricalCols);
162
163
/**
164
* Format column names as markdown table header
165
* @param colNames Column names
166
* @return Markdown formatted header string
167
*/
168
public static String formatTitle(String[] colNames);
169
170
/**
171
* Format row as markdown table row
172
* @param row Row data
173
* @return Markdown formatted row string
174
*/
175
public static String formatRows(Row row);
176
177
/**
178
* Format table data as markdown table
179
* @param colNames Column names
180
* @param data List of row data
181
* @return Complete markdown formatted table
182
*/
183
public static String format(String[] colNames, List<Row> data);
184
185
/**
186
* Convert column names to a column list for use in a SQL SELECT clause
187
* @param colNames Column names
188
* @return SQL clause string
189
*/
190
public static String columnsToSqlClause(String[] colNames);
191
}
192
```
193
194
**Usage Examples:**
195
196
```java
197
import org.apache.flink.ml.common.utils.TableUtil;
198
import org.apache.flink.table.api.TableSchema;
199
import org.apache.flink.types.Row;
200
201
// Column discovery and validation
202
TableSchema schema = table.getSchema();
203
String[] colNames = schema.getFieldNames();
204
205
// Find specific columns
206
int ageIndex = TableUtil.findColIndex(colNames, "age");
207
int[] featureIndices = TableUtil.findColIndices(schema, new String[]{"feature1", "feature2"});
208
209
// Type checking
210
TypeInformation<?> ageType = TableUtil.findColType(schema, "age");
211
boolean isNumeric = TableUtil.isSupportedNumericType(ageType);
212
213
// Column validation
214
TableUtil.assertSelectedColExist(colNames, "age", "name", "salary");
215
TableUtil.assertNumericalCols(schema, "age", "salary", "score");
216
TableUtil.assertStringCols(schema, "name", "category");
217
218
// Get columns by type
219
String[] stringCols = TableUtil.getStringCols(schema);
220
String[] numericCols = TableUtil.getNumericCols(schema, new String[]{"id"}); // Exclude id
221
```
222
223
### VectorTypes Class
224
225
Built-in type information constants for vector types used throughout the Flink ML library.
226
227
```java { .api }
228
/**
229
* Built-in vector type information
230
*/
231
public class VectorTypes {
232
/** TypeInformation for DenseVector */
233
public static final TypeInformation<DenseVector> DENSE_VECTOR;
234
235
/** TypeInformation for SparseVector */
236
public static final TypeInformation<SparseVector> SPARSE_VECTOR;
237
238
/** TypeInformation for base Vector class */
239
public static final TypeInformation<Vector> VECTOR;
240
}
241
```
242
243
**Usage Examples:**
244
245
```java
246
import org.apache.flink.ml.common.utils.VectorTypes;
247
import org.apache.flink.api.common.typeinfo.TypeInformation;
248
249
// Use vector type information
250
TypeInformation<DenseVector> denseVectorType = VectorTypes.DENSE_VECTOR;
251
TypeInformation<SparseVector> sparseVectorType = VectorTypes.SPARSE_VECTOR;
252
TypeInformation<Vector> vectorType = VectorTypes.VECTOR;
253
254
// Type checking
255
boolean isDenseVector = TableUtil.findColType(schema, "features").equals(VectorTypes.DENSE_VECTOR);
256
boolean isAnyVector = TableUtil.isVector(TableUtil.findColType(schema, "features"));
257
258
// Schema creation with vector columns
259
TableSchema.Builder schemaBuilder = TableSchema.builder()
260
.field("id", Types.LONG)
261
.field("features", VectorTypes.DENSE_VECTOR)
262
.field("sparse_features", VectorTypes.SPARSE_VECTOR);
263
```
264
265
### Data Conversion Utilities
266
267
Additional utilities for converting between Flink `Table` and `DataStream`/`DataSet` data structures.
268
269
```java { .api }
270
/**
271
* DataStream conversion utilities
272
*/
273
public class DataStreamConversionUtil {
274
// Utility methods for converting between DataStream and Table
275
// Implementation details available in source code
276
}
277
278
/**
279
* DataSet conversion utilities (for batch processing)
280
*/
281
public class DataSetConversionUtil {
282
// Utility methods for converting between DataSet and Table
283
// Implementation details available in source code
284
}
285
```
286
287
### Table Formatting and Output
288
289
Utilities for formatting table data for display and debugging purposes.
290
291
**Usage Examples:**
292
293
```java
294
import org.apache.flink.types.Row;
295
import java.util.List;
296
import java.util.Arrays;
297
298
// Sample data for formatting
299
String[] columnNames = {"Name", "Age", "Salary"};
300
List<Row> data = Arrays.asList(
301
Row.of("Alice", 25, 50000),
302
Row.of("Bob", 30, 60000),
303
Row.of("Charlie", 35, 70000)
304
);
305
306
// Format as markdown table
307
String markdownTable = TableUtil.format(columnNames, data);
308
System.out.println(markdownTable);
309
/* Output:
310
| Name | Age | Salary |
311
|------|-----|--------|
312
| Alice | 25 | 50000 |
313
| Bob | 30 | 60000 |
314
| Charlie | 35 | 70000 |
315
*/
316
317
// Format individual components
318
String header = TableUtil.formatTitle(columnNames);
319
String firstRow = TableUtil.formatRows(data.get(0));
320
321
// SQL clause generation
322
String sqlClause = TableUtil.columnsToSqlClause(columnNames);
323
// Result: the column names joined by commas, each wrapped in backticks, e.g. "`Name`,`Age`,`Salary`"
324
```
325
326
### Advanced Column Operations
327
328
Advanced patterns for working with table schemas and column management.
329
330
**Usage Examples:**
331
332
```java
333
// Column filtering and selection patterns
334
public class ColumnManager {
335
336
public static String[] selectNumericColumns(TableSchema schema, String[] candidates) {
337
return Arrays.stream(candidates)
338
.filter(col -> {
339
try {
340
TypeInformation<?> type = TableUtil.findColType(schema, col);
341
return TableUtil.isSupportedNumericType(type);
342
} catch (Exception e) {
343
return false;
344
}
345
})
346
.toArray(String[]::new);
347
}
348
349
public static String[] findFeatureColumns(TableSchema schema, String labelCol) {
350
String[] allCols = schema.getFieldNames();
351
return Arrays.stream(allCols)
352
.filter(col -> !col.equals(labelCol))
353
.filter(col -> {
354
TypeInformation<?> type = TableUtil.findColType(schema, col);
355
return TableUtil.isSupportedNumericType(type) || TableUtil.isVector(type);
356
})
357
.toArray(String[]::new);
358
}
359
}
360
361
// Usage
362
TableSchema schema = table.getSchema();
363
String[] candidates = {"age", "salary", "name", "category"};
364
String[] numericFeatures = ColumnManager.selectNumericColumns(schema, candidates);
365
String[] allFeatures = ColumnManager.findFeatureColumns(schema, "label");
366
367
// Validation with error handling
368
try {
369
TableUtil.assertNumericalCols(schema, numericFeatures);
370
System.out.println("All selected columns are numeric");
371
} catch (IllegalArgumentException e) {
372
System.err.println("Non-numeric column found: " + e.getMessage());
373
}
374
375
// Safe column access
376
public static double[] getNumericColumnData(Table table, String columnName) {
377
TableSchema schema = table.getSchema();
378
379
// Validate column exists and is numeric
380
TableUtil.assertSelectedColExist(schema.getFieldNames(), columnName);
381
TableUtil.assertNumericalCols(schema, columnName);
382
383
// Extract data (implementation would depend on specific use case)
384
// This is a conceptual example
385
return new double[0]; // Placeholder
386
}
387
```