CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-examples-streaming-2-10

Apache Flink streaming examples demonstrating various stream processing patterns and use cases

Pending
Overview
Eval results
Files

docs/machine-learning.md

Machine Learning Examples

Incremental learning patterns and online algorithm implementations for streaming ML workflows. The examples demonstrate how to update a model in real time as training data arrives on a stream.

Capabilities

IncrementalLearningSkeleton

Skeleton framework for incremental machine learning algorithms with online model updates.

/**
 * Skeleton for incremental machine learning algorithms.
 * Demonstrates online learning patterns with streaming model updates.
 */
public class IncrementalLearningSkeleton {
    /**
     * Entry point of the example.
     *
     * @param args Command line arguments (--input path, --output path)
     * @throws Exception declared by the signature; the failure conditions are not shown here
     */
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Run with default sample data
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton

# Run with file input
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton \
  --input /path/to/training-data.txt --output /path/to/model-updates.txt

Online Learning Patterns

Streaming Model Updates

The incremental learning skeleton demonstrates patterns for:

  1. Online Training Data Processing: Continuous ingestion of training examples
  2. Model State Management: Maintaining and updating model parameters in stream state
  3. Incremental Updates: Applying mini-batch or single-example updates to models
  4. Model Versioning: Tracking model versions and update history
  5. Prediction Integration: Using updated models for real-time predictions

Key Components

Training Data Stream

// Stream of training examples
DataStream<TrainingExample> trainingData;

// Training example format (implementation specific)
// NOTE(review): the fuller definition under "Data Structures" also carries an `id` field,
// which the ensemble example reads (example.id) — confirm which form is intended.
class TrainingExample {
    // Input values; combined element-wise with the model weights when predicting.
    public double[] features;
    // Target value; the update step computes error = label - prediction.
    public double label;
    // Forwarded into the emitted ModelUpdate; unit/epoch not shown here — TODO confirm.
    public long timestamp;
}

Model State Management

// Maintain model parameters in keyed state
ValueState<ModelParameters> modelState;

// Model parameters (implementation specific)
class ModelParameters {
    // One coefficient per feature; prediction = bias + sum(weights[i] * features[i]).
    public double[] weights;
    // Intercept term added to every prediction.
    public double bias;
    // Incremented once per applied update in the incremental update function.
    public long version;
    // Incremented once per training example consumed by the incremental update function.
    public int sampleCount;
}

Incremental Update Function

/**
 * Map function that keeps one linear model per key in Flink {@link ValueState} and applies a
 * single stochastic-gradient-descent (SGD) step for every training example it receives.
 *
 * <p>The model is read from and written back to keyed state, so this function has to be
 * applied to a keyed stream (i.e. after {@code keyBy}).
 */
public static class IncrementalLearningFunction
    extends RichMapFunction<TrainingExample, ModelUpdate> {

    /** Per-key model parameters; empty until the first example for a key arrives. */
    private ValueState<ModelParameters> modelState;

    /** Step size of each SGD update. */
    private final double learningRate = 0.01;

    /** Registers the keyed state handle that stores the model. */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<ModelParameters> descriptor =
            new ValueStateDescriptor<>("model", ModelParameters.class);
        modelState = getRuntimeContext().getState(descriptor);
    }

    /**
     * Applies one incremental update with {@code example} and emits the resulting model.
     *
     * @param example training example to learn from
     * @return update event carrying the model after this example was applied
     * @throws IllegalArgumentException if the example's feature count differs from the
     *         model's weight count
     */
    @Override
    public ModelUpdate map(TrainingExample example) throws Exception {
        ModelParameters currentModel = modelState.value();
        if (currentModel == null) {
            // First example for this key: the feature count is only known from the data,
            // so the model is sized from the example itself.
            // Assumes ModelParameters(int featureCount) allocates a zeroed weight vector of
            // that length — TODO confirm against the ModelParameters implementation.
            currentModel = new ModelParameters(example.features.length);
        }

        // Validate before touching the model: updateModel mutates the weights in place, so
        // a mismatch detected halfway through would leave a partially updated model behind.
        requireMatchingDimensions(currentModel, example.features);

        // Apply incremental update (e.g., SGD)
        ModelParameters updatedModel = updateModel(currentModel, example);
        modelState.update(updatedModel);

        // NOTE(review): the ModelUpdate constructor listed under "Data Structures" takes
        // (modelId, parameters, timestamp); this call supplies no model ID. A map function
        // has no access to the current key — confirm how modelId is meant to be populated.
        return new ModelUpdate(updatedModel, example.timestamp);
    }

    /** Rejects examples whose feature vector does not line up with the weight vector. */
    private static void requireMatchingDimensions(ModelParameters model, double[] features) {
        if (model.weights.length != features.length) {
            throw new IllegalArgumentException(
                "Feature count " + features.length
                    + " does not match model weight count " + model.weights.length);
        }
    }

    /**
     * Performs one SGD step on {@code model} in place and returns the same instance.
     * Also bumps the model's version and sample counters.
     */
    private ModelParameters updateModel(ModelParameters model, TrainingExample example) {
        // Stochastic Gradient Descent update
        double prediction = predict(model, example.features);
        double error = example.label - prediction;

        // Update weights
        for (int i = 0; i < model.weights.length; i++) {
            model.weights[i] += learningRate * error * example.features[i];
        }
        model.bias += learningRate * error;
        model.version++;
        model.sampleCount++;

        return model;
    }

    /** Linear prediction: bias plus the dot product of weights and features. */
    private double predict(ModelParameters model, double[] features) {
        double result = model.bias;
        for (int i = 0; i < features.length; i++) {
            result += model.weights[i] * features[i];
        }
        return result;
    }
}

ML Pipeline Patterns

Feature Engineering Stream

// Feature extraction and preprocessing, one stage per statement.
// Stage 1: map raw records through the feature extraction function.
DataStream<TrainingExample> extractedExamples =
    rawDataStream.map(new FeatureExtractionFunction());

// Stage 2: keep only the examples accepted by the data quality filter.
DataStream<TrainingExample> cleanExamples =
    extractedExamples.filter(new DataQualityFilter());

// Stage 3: key by feature group or model ID.
DataStream<TrainingExample> preprocessedData =
    cleanExamples.keyBy(new FeatureKeySelector());

Model Training Pipeline

// Complete incremental learning pipeline.
// Train: partition the examples by model ID and apply one incremental update per example.
DataStream<ModelUpdate> trainedUpdates = trainingData
    .keyBy(new ModelKeySelector())
    .map(new IncrementalLearningFunction());

// Publish: only emit significant updates.
DataStream<ModelUpdate> modelUpdates =
    trainedUpdates.filter(new SignificantUpdateFilter());

// Broadcast the published model updates so the prediction operator can read them.
BroadcastStream<ModelUpdate> modelBroadcast =
    modelUpdates.broadcast(MODEL_UPDATE_DESCRIPTOR);

Real-time Prediction

// Use updated models for predictions.
// The request stream is connected with the broadcast stream of model updates; the
// ModelPredictionFunction below handles requests in processElement and model updates
// in processBroadcastElement.
DataStream<Prediction> predictions = predictionRequests
    .connect(modelBroadcast)
    .process(new ModelPredictionFunction());

/**
 * Answers prediction requests using the model parameters most recently broadcast for the
 * requested model ID. Broadcast state is keyed by model ID.
 */
public static class ModelPredictionFunction
    extends BroadcastProcessFunction<PredictionRequest, ModelUpdate, Prediction> {

    /**
     * Broadcast side: stores the parameters carried by {@code update} under its model ID,
     * replacing whatever was stored for that ID before.
     */
    @Override
    public void processBroadcastElement(
        ModelUpdate update,
        Context ctx,
        Collector<Prediction> out) throws Exception {

        ctx.getBroadcastState(MODEL_UPDATE_DESCRIPTOR).put(update.modelId, update.parameters);
    }

    /**
     * Request side: looks up the model for {@code request.modelId} and, if one is present,
     * emits a prediction tagged with that model's version.
     */
    @Override
    public void processElement(
        PredictionRequest request,
        ReadOnlyContext ctx,
        Collector<Prediction> out) throws Exception {

        ModelParameters latestModel =
            ctx.getBroadcastState(MODEL_UPDATE_DESCRIPTOR).get(request.modelId);

        if (latestModel == null) {
            // Nothing stored for this model ID: the request produces no output.
            return;
        }

        double score = predict(latestModel, request.features);
        out.collect(new Prediction(request.id, score, latestModel.version));
    }
}

Advanced ML Patterns

Model Ensembles

/**
 * Maintains multiple models per key for ensemble predictions.
 *
 * <p>For every training example, each stored model is updated with the example, the
 * per-model predictions are averaged, and the result is emitted as an
 * {@code EnsemblePrediction}.
 *
 * <p>{@code updateModel} and {@code predict} are not defined in this snippet and must be
 * provided by the surrounding code.
 */
public static class EnsembleLearningFunction
    extends KeyedProcessFunction<String, TrainingExample, EnsemblePrediction> {

    /** Per-key list of the ensemble's member models. */
    private ListState<ModelParameters> ensembleModels;

    /**
     * Registers the keyed list state. Without this the state handle stays {@code null} and
     * the first call to {@code processElement} fails.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<ModelParameters> descriptor =
            new ListStateDescriptor<>("ensembleModels", ModelParameters.class);
        ensembleModels = getRuntimeContext().getListState(descriptor);
    }

    /**
     * Updates every ensemble member with {@code example} and emits the averaged prediction.
     *
     * @param example training example to learn from and to predict for
     * @param ctx     process-function context (unused here)
     * @param out     collector receiving one {@code EnsemblePrediction} per example
     */
    @Override
    public void processElement(
        TrainingExample example,
        Context ctx,
        Collector<EnsemblePrediction> out) throws Exception {

        // Update each model in ensemble
        // NOTE(review): nothing in this snippet ever seeds the ensemble, so the list stays
        // empty unless models are added elsewhere — confirm where members are created.
        List<ModelParameters> models = new ArrayList<>();
        for (ModelParameters stored : ensembleModels.get()) {
            models.add(updateModel(stored, example));
        }

        // Make ensemble prediction
        double[] predictions = models.stream()
            .mapToDouble(model -> predict(model, example.features))
            .toArray();

        // An empty ensemble averages to 0.0.
        double ensemblePrediction = Arrays.stream(predictions).average().orElse(0.0);
        out.collect(new EnsemblePrediction(example.id, ensemblePrediction, predictions));

        // Update state: replace the stored list in a single call.
        ensembleModels.update(models);
    }
}

Concept Drift Detection

/**
 * Watches prediction results per key and emits a {@code DriftAlert} whenever the tracked
 * accuracy falls below {@code DRIFT_THRESHOLD}.
 *
 * <p>{@code updateAccuracy} and {@code DRIFT_THRESHOLD} are not defined in this snippet and
 * must be provided by the surrounding code.
 */
public static class DriftDetectionFunction
    extends KeyedProcessFunction<String, PredictionResult, DriftAlert> {

    /** Accuracy tracked for the current window; empty before the first result and after a reset. */
    private ValueState<Double> accuracyWindow;

    /** Timestamp recorded at the most recent reset; empty until the first reset. */
    private ValueState<Long> windowStartTime;

    /** Registers both keyed state handles; without this they stay {@code null}. */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Double> accuracyDescriptor =
            new ValueStateDescriptor<>("accuracyWindow", Double.class);
        accuracyWindow = getRuntimeContext().getState(accuracyDescriptor);

        ValueStateDescriptor<Long> windowStartDescriptor =
            new ValueStateDescriptor<>("windowStartTime", Long.class);
        windowStartTime = getRuntimeContext().getState(windowStartDescriptor);
    }

    /**
     * Folds {@code result} into the tracked accuracy and raises an alert on drift.
     *
     * @param result prediction result to account for
     * @param ctx    context supplying the element timestamp
     * @param out    collector receiving a {@code DriftAlert} when drift is detected
     */
    @Override
    public void processElement(
        PredictionResult result,
        Context ctx,
        Collector<DriftAlert> out) throws Exception {

        // Update accuracy statistics.
        // The state is empty for the first result of a key and after every reset, so the
        // value must not be unboxed directly. With no history, the window starts from the
        // accuracy of the current result.
        Double storedAccuracy = accuracyWindow.value();
        double currentAccuracy = storedAccuracy != null ? storedAccuracy : result.accuracy;

        // Calculate sliding window accuracy
        double newAccuracy = updateAccuracy(currentAccuracy, result);
        accuracyWindow.update(newAccuracy);

        // Check for concept drift
        if (newAccuracy < DRIFT_THRESHOLD) {
            out.collect(new DriftAlert(result.modelId, newAccuracy, ctx.timestamp()));

            // Reset window
            windowStartTime.update(ctx.timestamp());
            accuracyWindow.clear();
        }
    }
}

Model Performance Monitoring

/**
 * Feeds the accuracy and latency of every prediction result into two aggregating states and
 * emits the current aggregates as a {@code ModelMetrics} record.
 *
 * <p>Presumably {@code AverageAccumulator} is an aggregate function over {@code Double}
 * inputs with a {@code RunningAverage} accumulator and a {@code Double} result, as the
 * descriptor's type parameters suggest — neither type is defined in this snippet.
 */
public static class ModelMetricsFunction
    extends RichMapFunction<PredictionResult, ModelMetrics> {

    private AggregatingState<Double, Double> accuracyAggregator;
    private AggregatingState<Double, Double> latencyAggregator;

    /** Registers the accuracy and latency aggregating states. */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Configure accuracy aggregator
        accuracyAggregator = getRuntimeContext().getAggregatingState(averageDescriptor("accuracy"));

        // Configure latency aggregator
        latencyAggregator = getRuntimeContext().getAggregatingState(averageDescriptor("latency"));
    }

    /**
     * Builds the descriptor for one aggregating state.
     *
     * <p>The third constructor argument is the class of the accumulator
     * ({@code RunningAverage}), not of the aggregated result; passing {@code Double.class}
     * there does not match the descriptor's declared type parameters.
     */
    private static AggregatingStateDescriptor<Double, RunningAverage, Double> averageDescriptor(
        String stateName) {
        return new AggregatingStateDescriptor<>(
            stateName, new AverageAccumulator(), RunningAverage.class);
    }

    /**
     * Adds the result's accuracy and latency to the aggregates and reports their current values.
     *
     * @param result prediction result to account for
     * @return metrics for {@code result.modelId}, stamped with the current wall-clock time
     */
    @Override
    public ModelMetrics map(PredictionResult result) throws Exception {
        // Update metrics
        accuracyAggregator.add(result.accuracy);
        latencyAggregator.add(result.latency);

        return new ModelMetrics(
            result.modelId,
            accuracyAggregator.get(),
            latencyAggregator.get(),
            System.currentTimeMillis()
        );
    }
}

Data Structures

Training Example

/**
 * Training example for incremental learning.
 *
 * <p>A plain data holder with public fields.
 */
public class TrainingExample {
    /** Identifier of this example; the ensemble example passes it on to its prediction output. */
    public String id;
    /** Input values; combined element-wise with the model weights when predicting. */
    public double[] features;
    /** Target value; the SGD update computes error = label - prediction. */
    public double label;
    /** Forwarded into emitted model updates; unit/epoch not shown here — TODO confirm. */
    public long timestamp;
    
    /**
     * @param id        identifier of the example
     * @param features  input values
     * @param label     target value
     * @param timestamp timestamp of the example
     */
    public TrainingExample(String id, double[] features, double label, long timestamp);
}

Model Parameters

/**
 * Model parameters for linear models.
 *
 * <p>A prediction is computed as {@code bias + sum(weights[i] * features[i])}.
 */
public class ModelParameters {
    /** One coefficient per feature. */
    public double[] weights;
    /** Intercept term added to every prediction. */
    public double bias;
    /** Incremented once per applied update by the incremental update function. */
    public long version;
    /** Incremented once per consumed training example by the incremental update function. */
    public int sampleCount;
    /**
     * NOTE(review): the incremental update function shown earlier uses its own fixed
     * learning rate (0.01) and does not read this field — confirm intended use.
     */
    public double learningRate;
    
    /**
     * @param featureCount number of features; presumably the length of {@code weights} —
     *                     TODO confirm against the implementation
     */
    public ModelParameters(int featureCount);
}

Model Update

/**
 * Model update event.
 *
 * <p>Emitted by the training pipeline and broadcast to the prediction operator.
 */
public class ModelUpdate {
    /** Key under which the prediction operator stores {@link #parameters} in broadcast state. */
    public String modelId;
    /** Model parameters carried by this update. */
    public ModelParameters parameters;
    /** Timestamp of the update; the training function passes the example's timestamp. */
    public long timestamp;
    /** NOTE(review): not set by any code shown on this page — confirm meaning and producer. */
    public double performance;
    
    /**
     * @param modelId    ID of the model this update belongs to
     * @param parameters model parameters after the update
     * @param timestamp  timestamp of the update
     */
    public ModelUpdate(String modelId, ModelParameters parameters, long timestamp);
}

Prediction Request/Result

/**
 * Prediction request.
 *
 * <p>Consumed by the prediction operator, which looks up the model by {@link #modelId}.
 */
public class PredictionRequest {
    /** Request identifier; copied into the emitted prediction. */
    public String id;
    /** ID of the model to predict with; used as the broadcast-state lookup key. */
    public String modelId;
    /** Input values passed to the model's predict step. */
    public double[] features;
    /** Not read by any code shown on this page; unit/epoch unknown — TODO confirm. */
    public long timestamp;
}

/**
 * Prediction result.
 *
 * <p>Input to the drift-detection and metrics functions.
 */
public class PredictionResult {
    /** Identifier of the result; presumably the originating request ID — TODO confirm. */
    public String id;
    /** ID of the model that produced the prediction; reported in drift alerts and metrics. */
    public String modelId;
    /** Predicted value. */
    public double prediction;
    /** Not read by any code shown on this page; scale unknown — TODO confirm. */
    public double confidence;
    /** Model version; presumably the version used for this prediction — TODO confirm. */
    public long modelVersion;
    /** Fed into the accuracy aggregate by the metrics function. */
    public double accuracy;    // If ground truth available
    /** Fed into the latency aggregate by the metrics function; unit not shown — TODO confirm. */
    public double latency;     // Processing time
}

Sample Data Utilities

IncrementalLearningSkeletonData

Utility class providing sample training data for ML examples.

/**
 * Sample data generator for incremental learning examples.
 *
 * <p>Only signatures are listed here; the generation logic is not shown.
 */
public class IncrementalLearningSkeletonData {
    /** Built-in sample training examples. */
    public static final TrainingExample[] SAMPLE_DATA;
    
    /**
     * Generate synthetic training data for linear regression
     *
     * @param numSamples  presumably the number of examples the iterator yields — TODO confirm
     * @param numFeatures presumably the length of each example's feature vector — TODO confirm
     * @param noise       presumably the magnitude of noise added to labels; exact meaning
     *                    (e.g. standard deviation) not shown — TODO confirm
     * @return iterator over the generated training examples
     */
    public static Iterator<TrainingExample> createLinearRegressionData(
        int numSamples, int numFeatures, double noise);
    
    /**
     * Generate synthetic classification data
     *
     * @param numSamples  presumably the number of examples the iterator yields — TODO confirm
     * @param numFeatures presumably the length of each example's feature vector — TODO confirm
     * @param numClasses  presumably the number of distinct class labels — TODO confirm
     * @return iterator over the generated training examples
     */
    public static Iterator<TrainingExample> createClassificationData(
        int numSamples, int numFeatures, int numClasses);
}

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-ml_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Required Imports

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10

docs

async.md

external-systems.md

index.md

iteration.md

joins.md

machine-learning.md

side-output.md

socket.md

utilities.md

windowing.md

wordcount.md

tile.json