Utility functions for table operations, column management, type checking, and data format conversion between Flink table types. These utilities provide essential functionality for working with Flink tables in ML contexts.
Core utility class providing comprehensive table operations including column discovery, type validation, and schema manipulation.
/**
* Utility methods for table operations and column management
*/
public class TableUtil {
/**
* Generate temporary table name
* @return Unique temporary table name
*/
public static String getTempTableName();
/**
* Find column index in array
* @param tableCols Array of column names
* @param targetCol Target column name to find
* @return Index of target column, -1 if not found
*/
public static int findColIndex(String[] tableCols, String targetCol);
/**
* Find column index in table schema
* @param tableSchema Table schema
* @param targetCol Target column name to find
* @return Index of target column, -1 if not found
*/
public static int findColIndex(TableSchema tableSchema, String targetCol);
/**
* Find multiple column indices in array
* @param tableCols Array of column names
* @param targetCols Target column names to find
* @return Array of indices for target columns
*/
public static int[] findColIndices(String[] tableCols, String[] targetCols);
/**
* Find multiple column indices in table schema
* @param tableSchema Table schema
* @param targetCols Target column names to find
* @return Array of indices for target columns
*/
public static int[] findColIndices(TableSchema tableSchema, String[] targetCols);
/**
* Find column types from table schema
* @param tableSchema Table schema
* @param targetCols Target column names
* @return Array of TypeInformation for target columns
*/
public static TypeInformation<?>[] findColTypes(TableSchema tableSchema, String[] targetCols);
/**
* Find single column type from table schema
* @param tableSchema Table schema
* @param targetCol Target column name
* @return TypeInformation for target column
*/
public static TypeInformation<?> findColType(TableSchema tableSchema, String targetCol);
/**
* Check if data type is supported numeric type
* @param dataType Type to check
* @return true if numeric type, false otherwise
*/
public static boolean isSupportedNumericType(TypeInformation<?> dataType);
/**
* Check if data type is string type
* @param dataType Type to check
* @return true if string type, false otherwise
*/
public static boolean isString(TypeInformation<?> dataType);
/**
* Check if data type is vector type
* @param dataType Type to check
* @return true if vector type, false otherwise
*/
public static boolean isVector(TypeInformation<?> dataType);
/**
* Assert that selected columns exist in table
* @param tableCols Array of table column names
* @param selectedCols Selected column names to verify
* @throws IllegalArgumentException if any column doesn't exist
*/
public static void assertSelectedColExist(String[] tableCols, String... selectedCols);
/**
* Assert that columns are numeric types
* @param tableSchema Table schema
* @param selectedCols Column names to check
* @throws IllegalArgumentException if any column is not numeric
*/
public static void assertNumericalCols(TableSchema tableSchema, String... selectedCols);
/**
* Assert that columns are string types
* @param tableSchema Table schema
* @param selectedCols Column names to check
* @throws IllegalArgumentException if any column is not string
*/
public static void assertStringCols(TableSchema tableSchema, String... selectedCols);
/**
* Assert that columns are vector types
* @param tableSchema Table schema
* @param selectedCols Column names to check
* @throws IllegalArgumentException if any column is not vector
*/
public static void assertVectorCols(TableSchema tableSchema, String... selectedCols);
/**
* Get all string column names from schema
* @param tableSchema Table schema
* @return Array of string column names
*/
public static String[] getStringCols(TableSchema tableSchema);
/**
* Get string column names excluding specified columns
* @param tableSchema Table schema
* @param excludeCols Columns to exclude
* @return Array of string column names (excluding specified)
*/
public static String[] getStringCols(TableSchema tableSchema, String[] excludeCols);
/**
* Get all numeric column names from schema
* @param tableSchema Table schema
* @return Array of numeric column names
*/
public static String[] getNumericCols(TableSchema tableSchema);
/**
* Get numeric column names excluding specified columns
* @param tableSchema Table schema
* @param excludeCols Columns to exclude
* @return Array of numeric column names (excluding specified)
*/
public static String[] getNumericCols(TableSchema tableSchema, String[] excludeCols);
/**
* Get categorical columns from feature columns
* @param tableSchema Table schema
* @param featureCols Feature column names
* @param categoricalCols Explicitly specified categorical columns
* @return Array of categorical column names
*/
public static String[] getCategoricalCols(TableSchema tableSchema, String[] featureCols,
String[] categoricalCols);
/**
* Format column names as markdown table header
* @param colNames Column names
* @return Markdown formatted header string
*/
public static String formatTitle(String[] colNames);
/**
* Format row as markdown table row
* @param row Row data
* @return Markdown formatted row string
*/
public static String formatRows(Row row);
/**
* Format table data as markdown table
* @param colNames Column names
* @param data List of row data
* @return Complete markdown formatted table
*/
public static String format(String[] colNames, List<Row> data);
/**
* Convert column names to SQL SELECT clause
* @param colNames Column names
* @return SQL clause string
*/
public static String columnsToSqlClause(String[] colNames);
}
Usage Examples:
import org.apache.flink.ml.common.utils.TableUtil;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
// Column discovery and validation
TableSchema schema = table.getSchema();
String[] colNames = schema.getFieldNames();
// Find specific columns
int ageIndex = TableUtil.findColIndex(colNames, "age");
int[] featureIndices = TableUtil.findColIndices(schema, new String[]{"feature1", "feature2"});
// Type checking
TypeInformation<?> ageType = TableUtil.findColType(schema, "age");
boolean isNumeric = TableUtil.isSupportedNumericType(ageType);
// Column validation
TableUtil.assertSelectedColExist(colNames, "age", "name", "salary");
TableUtil.assertNumericalCols(schema, "age", "salary", "score");
TableUtil.assertStringCols(schema, "name", "category");
// Get columns by type
String[] stringCols = TableUtil.getStringCols(schema);
String[] numericCols = TableUtil.getNumericCols(schema, new String[]{"id"}); // Exclude id
Built-in type information constants for vector types used throughout the Flink ML library.
/**
* Built-in vector type information
*/
public class VectorTypes {
/** TypeInformation for DenseVector */
public static final TypeInformation<DenseVector> DENSE_VECTOR;
/** TypeInformation for SparseVector */
public static final TypeInformation<SparseVector> SPARSE_VECTOR;
/** TypeInformation for base Vector class */
public static final TypeInformation<Vector> VECTOR;
}
Usage Examples:
import org.apache.flink.ml.common.utils.VectorTypes;
import org.apache.flink.api.common.typeinfo.TypeInformation;
// Use vector type information
TypeInformation<DenseVector> denseVectorType = VectorTypes.DENSE_VECTOR;
TypeInformation<SparseVector> sparseVectorType = VectorTypes.SPARSE_VECTOR;
TypeInformation<Vector> vectorType = VectorTypes.VECTOR;
// Type checking
boolean isDenseVector = TableUtil.findColType(schema, "features").equals(VectorTypes.DENSE_VECTOR);
boolean isAnyVector = TableUtil.isVector(TableUtil.findColType(schema, "features"));
// Schema creation with vector columns
TableSchema.Builder schemaBuilder = TableSchema.builder()
.field("id", Types.LONG)
.field("features", VectorTypes.DENSE_VECTOR)
.field("sparse_features", VectorTypes.SPARSE_VECTOR);
Additional utilities for data conversion between different Flink data structures.
/**
* DataStream conversion utilities
*/
public class DataStreamConversionUtil {
// Utility methods for converting between DataStream and Table
// Implementation details available in source code
}
/**
* DataSet conversion utilities (for batch processing)
*/
public class DataSetConversionUtil {
// Utility methods for converting between DataSet and Table
// Implementation details available in source code
}
Utilities for formatting table data for display and debugging purposes.
Usage Examples:
import org.apache.flink.types.Row;
import java.util.List;
import java.util.Arrays;
// Sample data for formatting
String[] columnNames = {"Name", "Age", "Salary"};
List<Row> data = Arrays.asList(
Row.of("Alice", 25, 50000),
Row.of("Bob", 30, 60000),
Row.of("Charlie", 35, 70000)
);
// Format as markdown table
String markdownTable = TableUtil.format(columnNames, data);
System.out.println(markdownTable);
/* Output:
| Name | Age | Salary |
|------|-----|--------|
| Alice | 25 | 50000 |
| Bob | 30 | 60000 |
| Charlie | 35 | 70000 |
*/
// Format individual components
String header = TableUtil.formatTitle(columnNames);
String firstRow = TableUtil.formatRows(data.get(0));
// SQL clause generation
String sqlClause = TableUtil.columnsToSqlClause(columnNames);
// Result: "Name, Age, Salary"
Advanced patterns for working with table schemas and column management.
Usage Examples:
// Column filtering and selection patterns
/**
 * Example helpers that narrow a set of column names down to those whose
 * types make them usable as numeric or vector features.
 */
public class ColumnManager {

    /**
     * Keeps the candidates whose column type is a supported numeric type.
     *
     * <p>The relative order of {@code candidates} is preserved. A candidate
     * whose type lookup or numeric check throws is dropped rather than
     * failing the whole call.
     *
     * @param schema schema used to look up each candidate's type
     * @param candidates column names to test
     * @return the candidates that passed the numeric check, in input order
     */
    public static String[] selectNumericColumns(TableSchema schema, String[] candidates) {
        // At most every candidate is kept, so size the buffer for the worst case
        // and trim it once the scan is complete.
        String[] kept = new String[candidates.length];
        int keptCount = 0;
        for (String name : candidates) {
            if (isNumericColumn(schema, name)) {
                kept[keptCount++] = name;
            }
        }
        return Arrays.copyOf(kept, keptCount);
    }

    /**
     * Collects every schema column, other than the label column, whose type
     * is either a supported numeric type or a vector type.
     *
     * <p>Unlike {@link #selectNumericColumns}, failures from the type checks
     * are not caught here and propagate to the caller.
     *
     * @param schema schema whose field names are scanned
     * @param labelCol name of the column to leave out of the result
     * @return matching column names, in schema order
     */
    public static String[] findFeatureColumns(TableSchema schema, String labelCol) {
        String[] fieldNames = schema.getFieldNames();
        String[] features = new String[fieldNames.length];
        int featureCount = 0;
        for (String name : fieldNames) {
            if (name.equals(labelCol)) {
                continue;
            }
            TypeInformation<?> columnType = TableUtil.findColType(schema, name);
            if (TableUtil.isSupportedNumericType(columnType) || TableUtil.isVector(columnType)) {
                features[featureCount++] = name;
            }
        }
        return Arrays.copyOf(features, featureCount);
    }

    /** Best-effort numeric check: any failure while resolving the type counts as "not numeric". */
    private static boolean isNumericColumn(TableSchema schema, String name) {
        try {
            TypeInformation<?> columnType = TableUtil.findColType(schema, name);
            return TableUtil.isSupportedNumericType(columnType);
        } catch (Exception ignored) {
            // Deliberately lenient: an unresolvable candidate is filtered out.
            return false;
        }
    }
}
// Usage
TableSchema schema = table.getSchema();
String[] candidates = {"age", "salary", "name", "category"};
String[] numericFeatures = ColumnManager.selectNumericColumns(schema, candidates);
String[] allFeatures = ColumnManager.findFeatureColumns(schema, "label");
// Validation with error handling
try {
TableUtil.assertNumericalCols(schema, numericFeatures);
System.out.println("All selected columns are numeric");
} catch (IllegalArgumentException e) {
System.err.println("Non-numeric column found: " + e.getMessage());
}
// Safe column access
/**
 * Conceptual example of guarded column access: validates the requested
 * column before any data would be read from it.
 *
 * <p>The column is first checked for existence and then for a numeric type,
 * using the {@code TableUtil} assertions. The extraction step itself is a
 * placeholder, so an empty array is always returned once validation passes.
 *
 * @param table table to read the schema from
 * @param columnName name of the column to validate
 * @return an empty array (placeholder for the extracted values)
 * @throws IllegalArgumentException if the column is missing or not numeric,
 *         per the documented contract of the {@code TableUtil} assertions
 */
public static double[] getNumericColumnData(Table table, String columnName) {
    final TableSchema tableSchema = table.getSchema();
    final String[] availableColumns = tableSchema.getFieldNames();

    // Existence is checked before type so a missing column is reported as such.
    TableUtil.assertSelectedColExist(availableColumns, columnName);
    TableUtil.assertNumericalCols(tableSchema, columnName);

    // Placeholder: the real extraction depends on the specific use case.
    final double[] extracted = new double[0];
    return extracted;
}