0
# Utility Libraries
1
2
Comprehensive utility libraries for table operations, vector parsing, data conversion, and statistical operations. Essential support classes for common ML operations and data processing tasks.
3
4
## Capabilities
5
6
### TableUtil Class
7
8
Comprehensive utility class for table content and schema operations.
9
10
```java { .api }
11
/**
12
* Utility class for table content and schema operations
13
* Provides validation, column manipulation, and formatting functions
14
*/
15
public class TableUtil {
16
17
// Table and column utilities
18
19
/** Generate unique temporary table name */
20
public static String getTempTableName();
21
22
/** Find column index in string array */
23
public static int findColIndex(String[] tableCols, String targetCol);
24
25
/** Find column index in table schema */
26
public static int findColIndex(TableSchema tableSchema, String targetCol);
27
28
/** Find multiple column indices in string array */
29
public static int[] findColIndices(String[] tableCols, String[] targetCols);
30
31
/** Find multiple column indices in table schema */
32
public static int[] findColIndices(TableSchema tableSchema, String[] targetCols);
33
34
/** Find column types for specified columns */
35
public static TypeInformation<?>[] findColTypes(TableSchema tableSchema, String[] targetCols);
36
37
/** Find single column type */
38
public static TypeInformation<?> findColType(TableSchema tableSchema, String targetCol);
39
40
// Type checking utilities
41
42
/** Check if data type is supported numeric type */
43
public static boolean isSupportedNumericType(TypeInformation<?> dataType);
44
45
/** Check if data type is string type */
46
public static boolean isString(TypeInformation<?> dataType);
47
48
/** Check if data type is vector type */
49
public static boolean isVector(TypeInformation<?> dataType);
50
51
// Column validation utilities
52
53
/** Assert that selected columns exist in table */
54
public static void assertSelectedColExist(String[] tableCols, String... selectedCols);
55
56
/** Assert that selected columns are numerical */
57
public static void assertNumericalCols(TableSchema tableSchema, String... selectedCols);
58
59
/** Assert that selected columns are string type */
60
public static void assertStringCols(TableSchema tableSchema, String... selectedCols);
61
62
/** Assert that selected columns are vector type */
63
public static void assertVectorCols(TableSchema tableSchema, String... selectedCols);
64
65
// Column filtering utilities
66
67
/** Get all string columns from schema */
68
public static String[] getStringCols(TableSchema tableSchema);
69
70
/** Get string columns excluding specified columns */
71
public static String[] getStringCols(TableSchema tableSchema, String[] excludeCols);
72
73
/** Get all numeric columns from schema */
74
public static String[] getNumericCols(TableSchema tableSchema);
75
76
/** Get numeric columns excluding specified columns */
77
public static String[] getNumericCols(TableSchema tableSchema, String[] excludeCols);
78
79
/** Get categorical columns from feature columns */
80
public static String[] getCategoricalCols(TableSchema tableSchema,
81
String[] featureCols,
82
String[] categoricalCols);
83
84
// Formatting utilities
85
86
/** Format column names as markdown table header */
87
public static String formatTitle(String[] colNames);
88
89
/** Format table row as markdown */
90
public static String formatRows(Row row);
91
92
/** Format table data as markdown table */
93
public static String format(String[] colNames, List<Row> data);
94
95
/** Convert column names to SQL SELECT clause */
96
public static String columnsToSqlClause(String[] colNames);
97
}
98
```
99
100
**Usage Examples:**
101
102
```java
103
import org.apache.flink.ml.common.utils.TableUtil;
104
import org.apache.flink.table.api.TableSchema;
105
import org.apache.flink.types.Row;
106
107
// Schema validation
108
TableSchema schema = // ... your table schema
109
String targetCol = "features";
110
111
// Check if column exists and get its type
112
int colIndex = TableUtil.findColIndex(schema, targetCol);
113
TypeInformation<?> colType = TableUtil.findColType(schema, targetCol);
114
115
// Validate column types
116
TableUtil.assertNumericalCols(schema, "age", "salary", "score");
117
TableUtil.assertVectorCols(schema, "features", "embeddings");
118
119
// Get columns by type
120
String[] numericCols = TableUtil.getNumericCols(schema);
121
String[] stringCols = TableUtil.getStringCols(schema, new String[]{"id"}); // Exclude 'id'
122
123
// Type checking
124
boolean isNumeric = TableUtil.isSupportedNumericType(colType);
125
boolean isVector = TableUtil.isVector(colType);
126
127
// Format table data for display
128
String[] headers = {"Name", "Age", "Score"};
129
List<Row> data = Arrays.asList(
130
Row.of("Alice", 25, 95.0),
131
Row.of("Bob", 30, 87.5)
132
);
133
String markdown = TableUtil.format(headers, data);
134
System.out.println(markdown);
135
/*
136
| Name | Age | Score |
137
|------|-----|-------|
138
| Alice | 25 | 95.0 |
139
| Bob | 30 | 87.5 |
140
*/
141
```
142
143
### VectorTypes Class
144
145
Constants class providing built-in vector type information for Flink's type system.
146
147
```java { .api }
148
/**
149
* Built-in vector type information constants
150
* Used for table schema definition and type checking
151
*/
152
public class VectorTypes {
153
154
/** Type information for DenseVector */
155
public static final TypeInformation<DenseVector> DENSE_VECTOR;
156
157
/** Type information for SparseVector */
158
public static final TypeInformation<SparseVector> SPARSE_VECTOR;
159
160
/** Type information for general Vector (base class) */
161
public static final TypeInformation<Vector> VECTOR;
162
}
163
```
164
165
**Usage Examples:**
166
167
```java
168
import org.apache.flink.ml.common.utils.VectorTypes;
169
import org.apache.flink.table.api.DataTypes;
170
import org.apache.flink.table.api.TableSchema;
171
172
// Create table schema with vector columns
173
TableSchema schema = TableSchema.builder()
174
.field("id", DataTypes.BIGINT())
175
.field("dense_features", VectorTypes.DENSE_VECTOR)
176
.field("sparse_features", VectorTypes.SPARSE_VECTOR)
177
.field("generic_vector", VectorTypes.VECTOR)
178
.build();
179
180
// Type checking with vector types
181
TypeInformation<?> colType = schema.getFieldTypes()[1];
182
boolean isDenseVector = colType.equals(VectorTypes.DENSE_VECTOR);
183
```
184
185
### Data Conversion Utilities
186
187
Utilities for converting between different Flink data representations.
188
189
#### DataSetConversionUtil Class
190
191
```java { .api }
192
/**
193
* Utility class for DataSet conversion operations
194
* Provides methods for converting between DataSet and other formats
195
*/
196
public class DataSetConversionUtil {
197
// Methods for DataSet conversions
198
// Implementation details depend on specific conversion needs
199
}
200
```
201
202
#### DataStreamConversionUtil Class
203
204
```java { .api }
205
/**
206
* Utility class for DataStream conversion operations
207
* Provides methods for converting between DataStream and other formats
208
*/
209
public class DataStreamConversionUtil {
210
// Methods for DataStream conversions
211
// Implementation details depend on specific conversion needs
212
}
213
```
214
215
### OutputColsHelper Class
216
217
Helper class for managing output column configurations in ML operations.
218
219
```java { .api }
220
/**
221
* Helper class for output column management
222
* Assists with column naming and schema generation
223
*/
224
public class OutputColsHelper {
225
// Methods for managing output column configurations
226
// Used internally by ML operators for column management
227
}
228
```
229
230
### Mapper Framework Utilities
231
232
Utilities for row-wise data transformations and model applications.
233
234
#### Mapper Abstract Class
235
236
```java { .api }
237
/**
238
* Abstract class for row-wise transformations
239
* Base class for implementing custom row mappers
240
*/
241
public abstract class Mapper implements Serializable {
242
243
/** Input schema field names */
244
private String[] dataFieldNames;
245
246
/** Input schema field types */
247
private DataType[] dataFieldTypes;
248
249
/** Mapper parameters */
250
protected Params params;
251
252
/** Create mapper with schema and parameters */
253
public Mapper(TableSchema dataSchema, Params params);
254
255
/** Get input data schema */
256
public TableSchema getDataSchema();
257
258
/** Transform input row to output row (must implement) */
259
public abstract Row map(Row row);
260
261
/** Get output schema (must implement) */
262
public abstract TableSchema getOutputSchema();
263
}
264
```
265
266
**Implementation Example:**
267
268
```java
269
public class ScalingMapper extends Mapper {
270
271
public ScalingMapper(TableSchema dataSchema, Params params) {
272
super(dataSchema, params);
273
}
274
275
@Override
276
public Row map(Row row) {
277
// Extract scaling parameters
278
double scaleFactor = params.get(SCALE_FACTOR);
279
String inputCol = params.get(INPUT_COL);
280
281
// Apply scaling transformation
282
int colIndex = TableUtil.findColIndex(getDataSchema(), inputCol);
283
double originalValue = (Double) row.getField(colIndex); // getField returns Object
284
double scaledValue = originalValue * scaleFactor;
285
286
// Create output row
287
Row output = Row.copy(row);
288
output.setField(colIndex, scaledValue);
289
290
return output;
291
}
292
293
@Override
294
public TableSchema getOutputSchema() {
295
// Return schema with same structure as input
296
return getDataSchema();
297
}
298
}
299
```
300
301
#### MapperAdapter Class
302
303
```java { .api }
304
/**
305
* Adapter for integrating Mapper with operator framework
306
* Converts row-wise mappers to table operations
307
*/
308
public class MapperAdapter {
309
// Methods for adapter functionality
310
// Bridges between Mapper and BatchOperator/StreamOperator
311
}
312
```
313
314
#### ModelMapper Abstract Class
315
316
```java { .api }
317
/**
318
* Abstract class for model-based row transformations
319
* Extends Mapper with model data support
320
*/
321
public abstract class ModelMapper extends Mapper {
322
// Additional functionality for model-based row transformations
323
// Includes model data loading and management
324
}
325
```
326
327
#### ModelMapperAdapter Class
328
329
```java { .api }
330
/**
331
* Adapter for integrating ModelMapper with operator framework
332
* Converts model-based row mappers to table operations
333
*/
334
public class ModelMapperAdapter {
335
// Methods for model mapper adapter functionality
336
// Bridges between ModelMapper and prediction operators
337
}
338
```
339
340
### Model Source Utilities
341
342
Utilities for loading and managing model data from different sources.
343
344
#### ModelSource Interface
345
346
```java { .api }
347
/**
348
* Interface for loading models from different sources
349
* Abstracts model data retrieval mechanisms
350
*/
351
public interface ModelSource extends Serializable {
352
353
/** Get model rows from runtime context */
354
List<Row> getModelRows(RuntimeContext runtimeContext);
355
}
356
```
357
358
#### BroadcastVariableModelSource Class
359
360
```java { .api }
361
/**
362
* Load models from Flink broadcast variables
363
* Efficient for distributing small to medium-sized models
364
*/
365
public class BroadcastVariableModelSource implements ModelSource {
366
// Implementation for broadcast variable model loading
367
// Used when model data fits in memory and needs wide distribution
368
}
369
```
370
371
#### RowsModelSource Class
372
373
```java { .api }
374
/**
375
* Load models from row collections
376
* Direct model data access from in-memory collections
377
*/
378
public class RowsModelSource implements ModelSource {
379
// Implementation for direct row-based model loading
380
// Used for simple model data scenarios
381
}
382
```
383
384
**Usage Examples:**
385
386
```java
387
// Using model sources in custom operators
388
public class MyPredictionOp extends BatchOperator<MyPredictionOp> {
389
private ModelSource modelSource;
390
391
public MyPredictionOp setModelSource(ModelSource modelSource) {
392
this.modelSource = modelSource;
393
return this;
394
}
395
396
@Override
397
public MyPredictionOp linkFrom(BatchOperator<?>... inputs) {
398
// In the actual operator function:
399
// List<Row> modelRows = modelSource.getModelRows(runtimeContext);
400
// ... use model data for predictions
401
402
return this;
403
}
404
}
405
406
// Create model sources
407
ModelSource broadcastSource = new BroadcastVariableModelSource(/* broadcast name */);
408
ModelSource rowsSource = new RowsModelSource(/* model rows */);
409
410
// Use in operator
411
MyPredictionOp predictor = new MyPredictionOp()
412
.setModelSource(broadcastSource);
413
```
414
415
## Utility Integration Patterns
416
417
### Schema Validation Pipeline
418
419
Combine multiple utilities for robust schema validation:
420
421
```java
422
public class SchemaValidator {
423
424
public static void validateMLSchema(TableSchema schema,
425
String[] requiredCols,
426
String[] numericCols,
427
String[] vectorCols) {
428
// Check required columns exist
429
TableUtil.assertSelectedColExist(schema.getFieldNames(), requiredCols);
430
431
// Validate numeric columns
432
if (numericCols != null) {
433
TableUtil.assertNumericalCols(schema, numericCols);
434
}
435
436
// Validate vector columns
437
if (vectorCols != null) {
438
TableUtil.assertVectorCols(schema, vectorCols);
439
}
440
}
441
}
442
443
// Usage in ML component
444
@Override
445
protected MyModel fit(BatchOperator<?> input) {
446
// Validate input schema
447
SchemaValidator.validateMLSchema(
448
input.getSchema(),
449
new String[]{"features", "label"}, // Required columns
450
new String[]{"label"}, // Must be numeric
451
new String[]{"features"} // Must be vector
452
);
453
454
// Proceed with training
455
// ...
456
}
457
```
458
459
### Type-Safe Column Operations
460
461
Use utilities for type-safe column manipulation:
462
463
```java
464
public class TypeSafeColumnOps {
465
466
public static String[] getCompatibleColumns(TableSchema schema,
467
TypeInformation<?> targetType) {
468
return Arrays.stream(schema.getFieldNames())
469
.filter(col -> {
470
TypeInformation<?> colType = TableUtil.findColType(schema, col);
471
return colType.equals(targetType);
472
})
473
.toArray(String[]::new);
474
}
475
476
public static Map<String, TypeInformation<?>> getColumnTypeMap(TableSchema schema) {
477
String[] names = schema.getFieldNames();
478
TypeInformation<?>[] types = schema.getFieldTypes();
479
480
Map<String, TypeInformation<?>> typeMap = new HashMap<>();
481
for (int i = 0; i < names.length; i++) {
482
typeMap.put(names[i], types[i]);
483
}
484
return typeMap;
485
}
486
}
487
488
// Usage
489
String[] vectorCols = TypeSafeColumnOps.getCompatibleColumns(schema, VectorTypes.VECTOR);
490
String[] denseCols = TypeSafeColumnOps.getCompatibleColumns(schema, VectorTypes.DENSE_VECTOR);
491
```
492
493
### Data Formatting and Inspection
494
495
Utilities for debugging and data inspection:
496
497
```java
498
public class DataInspector {
499
500
public static void printTableSummary(TableSchema schema, List<Row> sampleData) {
501
// Print schema information
502
System.out.println("Table Schema:");
503
for (int i = 0; i < schema.getFieldCount(); i++) {
504
String name = schema.getFieldNames()[i];
505
TypeInformation<?> type = schema.getFieldTypes()[i];
506
System.out.println(" " + name + ": " + type);
507
}
508
509
// Print sample data
510
if (!sampleData.isEmpty()) {
511
System.out.println("\nSample Data:");
512
String formatted = TableUtil.format(schema.getFieldNames(), sampleData);
513
System.out.println(formatted);
514
}
515
516
// Print column type summary
517
String[] numericCols = TableUtil.getNumericCols(schema);
518
String[] stringCols = TableUtil.getStringCols(schema);
519
520
System.out.println("\nColumn Types:");
521
System.out.println(" Numeric: " + Arrays.toString(numericCols));
522
System.out.println(" String: " + Arrays.toString(stringCols));
523
}
524
}
525
```
526
527
These utility libraries provide essential support for building robust ML applications with proper validation, type safety, and debugging capabilities.