Comprehensive machine learning library for Apache Flink that enables scalable ML pipelines on distributed stream processing platform.
—
Comprehensive utility libraries for table operations, vector parsing, data conversion, and statistical operations. Essential support classes for common ML operations and data processing tasks.
Comprehensive utility class for table content and schema operations.
/**
 * Utility class for table content and schema operations.
 *
 * <p>Groups four families of helpers: column lookup (index/type resolution),
 * type predicates, validation asserts that fail fast on bad schemas, and
 * formatting helpers (markdown tables, SQL clauses).
 *
 * <p>NOTE(review): only signatures are listed here. Per-method failure
 * behavior (e.g. whether a failed column lookup returns -1 or throws) is not
 * visible in this listing — confirm against the implementation before relying
 * on it.
 */
public class TableUtil {
// Table and column utilities
/** Generate unique temporary table name */
public static String getTempTableName();
/** Find column index in string array */
public static int findColIndex(String[] tableCols, String targetCol);
/** Find column index in table schema */
public static int findColIndex(TableSchema tableSchema, String targetCol);
/** Find multiple column indices in string array */
public static int[] findColIndices(String[] tableCols, String[] targetCols);
/** Find multiple column indices in table schema */
public static int[] findColIndices(TableSchema tableSchema, String[] targetCols);
/** Find column types for specified columns */
public static TypeInformation<?>[] findColTypes(TableSchema tableSchema, String[] targetCols);
/** Find single column type */
public static TypeInformation<?> findColType(TableSchema tableSchema, String targetCol);
// Type checking utilities
/** Check if data type is supported numeric type */
public static boolean isSupportedNumericType(TypeInformation<?> dataType);
/** Check if data type is string type */
public static boolean isString(TypeInformation<?> dataType);
/** Check if data type is vector type */
public static boolean isVector(TypeInformation<?> dataType);
// Column validation utilities
/** Assert that selected columns exist in table */
public static void assertSelectedColExist(String[] tableCols, String... selectedCols);
/** Assert that selected columns are numerical */
public static void assertNumericalCols(TableSchema tableSchema, String... selectedCols);
/** Assert that selected columns are string type */
public static void assertStringCols(TableSchema tableSchema, String... selectedCols);
/** Assert that selected columns are vector type */
public static void assertVectorCols(TableSchema tableSchema, String... selectedCols);
// Column filtering utilities
/** Get all string columns from schema */
public static String[] getStringCols(TableSchema tableSchema);
/** Get string columns excluding specified columns */
public static String[] getStringCols(TableSchema tableSchema, String[] excludeCols);
/** Get all numeric columns from schema */
public static String[] getNumericCols(TableSchema tableSchema);
/** Get numeric columns excluding specified columns */
public static String[] getNumericCols(TableSchema tableSchema, String[] excludeCols);
/** Get categorical columns from feature columns */
public static String[] getCategoricalCols(TableSchema tableSchema,
String[] featureCols,
String[] categoricalCols);
// Formatting utilities
/** Format column names as markdown table header */
public static String formatTitle(String[] colNames);
/** Format table row as markdown */
public static String formatRows(Row row);
/** Format table data as markdown table */
public static String format(String[] colNames, List<Row> data);
/** Convert column names to SQL SELECT clause */
public static String columnsToSqlClause(String[] colNames);
}

Usage Examples:
import org.apache.flink.ml.common.utils.TableUtil;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
// Schema validation
TableSchema schema = // ... your table schema
String targetCol = "features";
// Check if column exists and get its type
int colIndex = TableUtil.findColIndex(schema, targetCol);
TypeInformation<?> colType = TableUtil.findColType(schema, targetCol);
// Validate column types
TableUtil.assertNumericalCols(schema, "age", "salary", "score");
TableUtil.assertVectorCols(schema, "features", "embeddings");
// Get columns by type
String[] numericCols = TableUtil.getNumericCols(schema);
String[] stringCols = TableUtil.getStringCols(schema, new String[]{"id"}); // Exclude 'id'
// Type checking
boolean isNumeric = TableUtil.isSupportedNumericType(colType);
boolean isVector = TableUtil.isVector(colType);
// Format table data for display
String[] headers = {"Name", "Age", "Score"};
List<Row> data = Arrays.asList(
Row.of("Alice", 25, 95.0),
Row.of("Bob", 30, 87.5)
);
String markdown = TableUtil.format(headers, data);
System.out.println(markdown);
/*
| Name | Age | Score |
|------|-----|-------|
| Alice | 25 | 95.0 |
| Bob | 30 | 87.5 |
*/

Constants class providing built-in vector type information for Flink's type system.
/**
 * Built-in vector type information constants.
 * Used for table schema definition and type checking.
 *
 * <p>NOTE(review): initialization of these constants is not shown in this
 * listing — confirm how each TypeInformation is constructed in the source.
 */
public class VectorTypes {
/** Type information for DenseVector */
public static final TypeInformation<DenseVector> DENSE_VECTOR;
/** Type information for SparseVector */
public static final TypeInformation<SparseVector> SPARSE_VECTOR;
/** Type information for general Vector (base class of dense and sparse) */
public static final TypeInformation<Vector> VECTOR;
}

Usage Examples:
import org.apache.flink.ml.common.utils.VectorTypes;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
// Create table schema with vector columns
TableSchema schema = TableSchema.builder()
.field("id", DataTypes.BIGINT())
.field("dense_features", VectorTypes.DENSE_VECTOR)
.field("sparse_features", VectorTypes.SPARSE_VECTOR)
.field("generic_vector", VectorTypes.VECTOR)
.build();
// Type checking with vector types
// Type checking with vector types
TypeInformation<?> colType = schema.getFieldTypes()[1];
boolean isDenseVector = colType.equals(VectorTypes.DENSE_VECTOR);

Utilities for converting between different Flink data representations.
/**
* Utility class for DataSet conversion operations
* Provides methods for converting between DataSet and other formats
*/
public class DataSetConversionUtil {
// Methods for DataSet conversions
// Implementation details depend on specific conversion needs
}

/**
* Utility class for DataStream conversion operations
* Provides methods for converting between DataStream and other formats
*/
public class DataStreamConversionUtil {
// Methods for DataStream conversions
// Implementation details depend on specific conversion needs
}

Helper class for managing output column configurations in ML operations.
/**
* Helper class for output column management
* Assists with column naming and schema generation
*/
public class OutputColsHelper {
// Methods for managing output column configurations
// Used internally by ML operators for column management
}

Utilities for row-wise data transformations and model applications.
/**
 * Abstract class for row-wise transformations.
 *
 * <p>Base class for implementing custom row mappers: subclasses receive the
 * input schema and parameters at construction time and implement
 * {@link #map(Row)} plus {@link #getOutputSchema()}. Implements Serializable
 * so instances can be shipped to Flink workers.
 */
public abstract class Mapper implements Serializable {
/** Input schema field names */
private String[] dataFieldNames;
/** Input schema field types */
private DataType[] dataFieldTypes;
/** Mapper parameters, visible to subclasses */
protected Params params;
/** Create mapper with schema and parameters */
public Mapper(TableSchema dataSchema, Params params);
/** Get input data schema */
public TableSchema getDataSchema();
/** Transform input row to output row (must implement); invoked once per input row */
public abstract Row map(Row row);
/** Get output schema (must implement); must describe the rows produced by map */
public abstract TableSchema getOutputSchema();
}

Implementation Example:
public class ScalingMapper extends Mapper {
public ScalingMapper(TableSchema dataSchema, Params params) {
super(dataSchema, params);
}
@Override
public Row map(Row row) {
// Extract scaling parameters
double scaleFactor = params.get(SCALE_FACTOR);
String inputCol = params.get(INPUT_COL);
// Apply scaling transformation
int colIndex = // ... find column index
double originalValue = row.getField(colIndex);
double scaledValue = originalValue * scaleFactor;
// Create output row
Row output = Row.copy(row);
output.setField(colIndex, scaledValue);
return output;
}
@Override
public TableSchema getOutputSchema() {
// Return schema with same structure as input
return getDataSchema();
}
}

/**
* Adapter for integrating Mapper with operator framework
* Converts row-wise mappers to table operations
*/
public class MapperAdapter {
// Methods for adapter functionality
// Bridges between Mapper and BatchOperator/StreamOperator
}

/**
* Abstract class for model-based row transformations
* Extends Mapper with model data support
*/
public abstract class ModelMapper extends Mapper {
// Additional functionality for model-based row transformations
// Includes model data loading and management
}

/**
* Adapter for integrating ModelMapper with operator framework
* Converts model-based row mappers to table operations
*/
public class ModelMapperAdapter {
// Methods for model mapper adapter functionality
// Bridges between ModelMapper and prediction operators
}

Utilities for loading and managing model data from different sources.
/**
 * Interface for loading models from different sources.
 *
 * <p>Abstracts model data retrieval so prediction operators do not depend on
 * where model rows physically come from (broadcast variable, in-memory rows,
 * etc.). Extends Serializable so a source can travel with the operator.
 */
public interface ModelSource extends Serializable {
/** Get model rows from runtime context; called on the worker at execution time */
List<Row> getModelRows(RuntimeContext runtimeContext);
}

/**
* Load models from Flink broadcast variables
* Efficient for distributing small to medium-sized models
*/
public class BroadcastVariableModelSource implements ModelSource {
// Implementation for broadcast variable model loading
// Used when model data fits in memory and needs wide distribution
}

/**
* Load models from row collections
* Direct model data access from in-memory collections
*/
public class RowsModelSource implements ModelSource {
// Implementation for direct row-based model loading
// Used for simple model data scenarios
}

Usage Examples:
// Using model sources in custom operators
/** Example operator showing how a ModelSource is injected and later consumed. */
public class MyPredictionOp extends BatchOperator<MyPredictionOp> {
// Where model rows will be fetched from at execution time.
private ModelSource modelSource;
/** Fluent setter; returns this so configuration calls can be chained. */
public MyPredictionOp setModelSource(ModelSource modelSource) {
this.modelSource = modelSource;
return this;
}
@Override
public MyPredictionOp linkFrom(BatchOperator<?>... inputs) {
// In the actual operator function:
// List<Row> modelRows = modelSource.getModelRows(runtimeContext);
// ... use model data for predictions
return this;
}
}
// Create model sources
ModelSource broadcastSource = new BroadcastVariableModelSource(/* broadcast name */);
ModelSource rowsSource = new RowsModelSource(/* model rows */);
// Use in operator
MyPredictionOp predictor = new MyPredictionOp()
.setModelSource(broadcastSource);

Combine multiple utilities for robust schema validation:
/**
 * Schema pre-flight checks for ML components.
 * Delegates all verification to TableUtil; any violation surfaces as the
 * exception thrown by the corresponding assert helper.
 */
public class SchemaValidator {

    /**
     * Verifies that a schema contains the required columns and that the given
     * column groups have the expected types.
     *
     * @param schema       schema under validation
     * @param requiredCols columns that must be present
     * @param numericCols  columns that must be numeric, or null to skip the check
     * @param vectorCols   columns that must be vector-typed, or null to skip the check
     */
    public static void validateMLSchema(TableSchema schema,
                                        String[] requiredCols,
                                        String[] numericCols,
                                        String[] vectorCols) {
        // Presence first: type checks are meaningless for missing columns.
        TableUtil.assertSelectedColExist(schema.getFieldNames(), requiredCols);

        if (numericCols != null) {
            TableUtil.assertNumericalCols(schema, numericCols);
        }
        if (vectorCols != null) {
            TableUtil.assertVectorCols(schema, vectorCols);
        }
    }
}
// Usage in ML component
@Override
protected MyModel fit(BatchOperator input) {
// Validate input schema
SchemaValidator.validateMLSchema(
input.getSchema(),
new String[]{"features", "label"}, // Required columns
new String[]{"label"}, // Must be numeric
new String[]{"features"} // Must be vector
);
// Proceed with training
// ...
}

Use utilities for type-safe column manipulation:
/**
 * Type-safe helpers for selecting and indexing columns by their declared type
 * information.
 */
public class TypeSafeColumnOps {

    /**
     * Returns the names of all columns whose type equals {@code targetType},
     * in schema order.
     *
     * <p>Walks the schema's parallel name/type arrays directly. The original
     * implementation called TableUtil.findColType once per column, re-scanning
     * the schema each time (O(n^2) for n columns).
     *
     * @param schema     schema to inspect
     * @param targetType exact type to match (compared via equals)
     * @return names of matching columns; empty array when none match
     */
    public static String[] getCompatibleColumns(TableSchema schema,
                                                TypeInformation<?> targetType) {
        String[] names = schema.getFieldNames();
        TypeInformation<?>[] types = schema.getFieldTypes();

        // Count matches first so the result array is allocated exactly once.
        int matches = 0;
        for (TypeInformation<?> type : types) {
            if (type.equals(targetType)) {
                matches++;
            }
        }

        String[] result = new String[matches];
        int next = 0;
        for (int i = 0; i < names.length; i++) {
            if (types[i].equals(targetType)) {
                result[next++] = names[i];
            }
        }
        return result;
    }

    /**
     * Builds a map from column name to its type information.
     *
     * @param schema schema to index
     * @return mutable map of column name to type (iteration order unspecified)
     */
    public static Map<String, TypeInformation<?>> getColumnTypeMap(TableSchema schema) {
        String[] names = schema.getFieldNames();
        TypeInformation<?>[] types = schema.getFieldTypes();
        // Presize: the parallel arrays yield exactly names.length entries.
        Map<String, TypeInformation<?>> typeMap = new HashMap<>(names.length * 2);
        for (int i = 0; i < names.length; i++) {
            typeMap.put(names[i], types[i]);
        }
        return typeMap;
    }
}
// Usage
String[] vectorCols = TypeSafeColumnOps.getCompatibleColumns(schema, VectorTypes.VECTOR);
String[] denseCols = TypeSafeColumnOps.getCompatibleColumns(schema, VectorTypes.DENSE_VECTOR);

Utilities for debugging and data inspection:
public class DataInspector {
public static void printTableSummary(TableSchema schema, List<Row> sampleData) {
// Print schema information
System.out.println("Table Schema:");
for (int i = 0; i < schema.getFieldCount(); i++) {
String name = schema.getFieldNames()[i];
TypeInformation<?> type = schema.getFieldTypes()[i];
System.out.println(" " + name + ": " + type);
}
// Print sample data
if (!sampleData.isEmpty()) {
System.out.println("\nSample Data:");
String formatted = TableUtil.format(schema.getFieldNames(), sampleData);
System.out.println(formatted);
}
// Print column type summary
String[] numericCols = TableUtil.getNumericCols(schema);
String[] stringCols = TableUtil.getStringCols(schema);
System.out.println("\nColumn Types:");
System.out.println(" Numeric: " + Arrays.toString(numericCols));
System.out.println(" String: " + Arrays.toString(stringCols));
}
}

These utility libraries provide essential support for building robust ML applications with proper validation, type safety, and debugging capabilities.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-ml-uber-2-11