Apache Flink streaming examples demonstrating various stream processing patterns and use cases
—
Incremental learning patterns and online-algorithm implementations for streaming ML workflows, demonstrating how models are updated in real time as training data arrives.
Skeleton framework for incremental machine learning algorithms with online model updates.
/**
 * Skeleton for incremental machine learning algorithms.
 * Demonstrates online learning patterns with streaming model updates.
 */
public class IncrementalLearningSkeleton {
    /**
     * Entry point of the example job.
     *
     * @param args command line arguments: {@code --input <path>} and {@code --output <path>};
     *     per the usage example below, the job can also be run without them on sample data
     */
public static void main(String[] args) throws Exception;
}
Usage Example:
# Run with default sample data
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton
# Run with file input
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton \
--input /path/to/training-data.txt --output /path/to/model-updates.txt

The incremental learning skeleton demonstrates patterns for:
// Stream of training examples
DataStream<TrainingExample> trainingData;

/**
 * Training example format (implementation specific).
 *
 * <p>Declared {@code public} with public fields and an implicit public no-arg constructor so
 * that Flink can analyze it as a POJO type rather than falling back to generic (Kryo)
 * serialization.
 */
public class TrainingExample {
    /** Identifier of the example (also present in the full {@code TrainingExample} definition). */
    public String id;
    public double[] features;
    public double label;
    public long timestamp;
}
// Maintain model parameters in keyed state
ValueState<ModelParameters> modelState;

/**
 * Model parameters (implementation specific) of a linear model: one weight per feature plus a
 * bias, with bookkeeping counters.
 *
 * <p>Public with a public no-arg constructor so Flink can handle it as a POJO type.
 */
public class ModelParameters {
    public double[] weights;
    public double bias;
    public long version;
    public int sampleCount;

    /** Creates an empty model (no weights allocated); needed for POJO serialization. */
    public ModelParameters() {}

    /**
     * Creates an all-zero model sized for {@code featureCount} features.
     *
     * @param featureCount number of features, i.e. the length of {@link #weights}
     */
    public ModelParameters(int featureCount) {
        this.weights = new double[featureCount];
    }
}

/**
 * Applies one stochastic gradient descent (SGD) step per training example to a linear model and
 * emits the updated model.
 *
 * <p>The model is kept in keyed {@link ValueState}, so this function must run on a keyed stream;
 * every key maintains its own model. The first example seen for a key creates an all-zero model
 * whose size is taken from that example's feature vector.
 */
public static class IncrementalLearningFunction
        extends RichMapFunction<TrainingExample, ModelUpdate> {

    /** Model ID reported by the no-arg constructor for every update. */
    public static final String DEFAULT_MODEL_ID = "default";

    /** Learning rate used when none is supplied. */
    public static final double DEFAULT_LEARNING_RATE = 0.01;

    /** Derives the model ID reported in each {@link ModelUpdate}; should match the keyBy selector. */
    private final org.apache.flink.api.java.functions.KeySelector<TrainingExample, String> modelIdSelector;
    private final double learningRate;
    private transient ValueState<ModelParameters> modelState;

    /** Reports every update under {@link #DEFAULT_MODEL_ID} with the default learning rate. */
    public IncrementalLearningFunction() {
        this(example -> DEFAULT_MODEL_ID);
    }

    /**
     * @param modelIdSelector extracts the model ID of an example; pass the same selector that
     *     keys the stream so updates are labelled with the model they belong to
     */
    public IncrementalLearningFunction(
            org.apache.flink.api.java.functions.KeySelector<TrainingExample, String> modelIdSelector) {
        this(modelIdSelector, DEFAULT_LEARNING_RATE);
    }

    /**
     * @param modelIdSelector extracts the model ID of an example
     * @param learningRate SGD step size; must be positive and finite
     * @throws IllegalArgumentException if {@code learningRate} is not positive and finite
     */
    public IncrementalLearningFunction(
            org.apache.flink.api.java.functions.KeySelector<TrainingExample, String> modelIdSelector,
            double learningRate) {
        this.modelIdSelector = java.util.Objects.requireNonNull(modelIdSelector, "modelIdSelector");
        if (!(learningRate > 0.0) || Double.isInfinite(learningRate)) {
            throw new IllegalArgumentException(
                    "learningRate must be positive and finite, got " + learningRate);
        }
        this.learningRate = learningRate;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<ModelParameters> descriptor =
                new ValueStateDescriptor<>("model", ModelParameters.class);
        modelState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public ModelUpdate map(TrainingExample example) throws Exception {
        ModelParameters currentModel = modelState.value();
        if (currentModel == null) {
            // First example for this key: start from an all-zero model of matching size.
            currentModel = new ModelParameters(example.features.length);
        }
        // Apply incremental update (SGD)
        ModelParameters updatedModel = updateModel(currentModel, example);
        modelState.update(updatedModel);
        // NOTE(review): depending on the state backend, updatedModel may be the same object that
        // is held in state; copy it if downstream code mutates or retains it.
        return new ModelUpdate(modelIdSelector.getKey(example), updatedModel, example.timestamp);
    }

    /**
     * Performs one SGD step on {@code model} in place and returns it.
     *
     * @throws IllegalArgumentException if the example's feature count differs from the model's
     */
    private ModelParameters updateModel(ModelParameters model, TrainingExample example) {
        // predict() validates the dimensions before any weight is touched.
        double error = example.label - predict(model, example.features);
        for (int i = 0; i < model.weights.length; i++) {
            model.weights[i] += learningRate * error * example.features[i];
        }
        model.bias += learningRate * error;
        model.version++;
        model.sampleCount++;
        return model;
    }

    /**
     * Computes {@code bias + sum(weights[i] * features[i])}.
     *
     * @throws IllegalArgumentException if {@code features.length != model.weights.length}
     */
    private double predict(ModelParameters model, double[] features) {
        if (features.length != model.weights.length) {
            throw new IllegalArgumentException(
                    "Expected " + model.weights.length + " features but got " + features.length);
        }
        double result = model.bias;
        for (int i = 0; i < features.length; i++) {
            result += model.weights[i] * features[i];
        }
        return result;
    }
}
// Feature extraction and preprocessing
// Turn raw records into training examples, drop records rejected by the quality filter, and
// partition the result by feature group or model ID.
DataStream<TrainingExample> preprocessedData =
        rawDataStream
                .map(new FeatureExtractionFunction())
                .filter(new DataQualityFilter())
                .keyBy(new FeatureKeySelector());
// NOTE(review): preprocessedData is not consumed below; the pipeline reads trainingData.

// Complete incremental learning pipeline:
//   1. partition training examples by model ID,
//   2. apply incremental updates per model,
//   3. emit only significant updates.
DataStream<ModelUpdate> modelUpdates =
        trainingData
                .keyBy(new ModelKeySelector())
                .map(new IncrementalLearningFunction())
                .filter(new SignificantUpdateFilter());

// Broadcast model updates so every prediction task sees them.
BroadcastStream<ModelUpdate> modelBroadcast = modelUpdates.broadcast(MODEL_UPDATE_DESCRIPTOR);

// Serve prediction requests with the latest broadcast models.
DataStream<Prediction> predictions =
        predictionRequests.connect(modelBroadcast).process(new ModelPredictionFunction());
/**
 * Answers prediction requests using the latest model received on the broadcast side.
 *
 * <p>Each {@link ModelUpdate} is stored in broadcast state under its model ID. Each
 * {@link PredictionRequest} is scored with the model stored under {@code request.modelId}.
 * Requests whose model has not been received yet produce no output.
 *
 * <p>NOTE(review): assumes {@code MODEL_UPDATE_DESCRIPTOR} is a
 * {@code MapStateDescriptor<String, ModelParameters>}; it is not declared in this snippet.
 */
public static class ModelPredictionFunction
        extends BroadcastProcessFunction<PredictionRequest, ModelUpdate, Prediction> {

    @Override
    public void processElement(
            PredictionRequest request,
            ReadOnlyContext ctx,
            Collector<Prediction> out) throws Exception {
        // Get latest model from broadcast state
        ModelParameters model = ctx.getBroadcastState(MODEL_UPDATE_DESCRIPTOR)
                .get(request.modelId);
        if (model == null) {
            // No model for this ID has been broadcast yet: nothing to predict with.
            return;
        }
        double prediction = predict(model, request.features);
        out.collect(new Prediction(request.id, prediction, model.version));
    }

    @Override
    public void processBroadcastElement(
            ModelUpdate update,
            Context ctx,
            Collector<Prediction> out) throws Exception {
        // Replace the stored model for this ID with the new parameters.
        ctx.getBroadcastState(MODEL_UPDATE_DESCRIPTOR)
                .put(update.modelId, update.parameters);
    }

    /**
     * Computes {@code bias + sum(weights[i] * features[i])} for a linear model.
     *
     * @throws IllegalArgumentException if {@code features.length != model.weights.length}
     */
    private static double predict(ModelParameters model, double[] features) {
        if (features.length != model.weights.length) {
            throw new IllegalArgumentException(
                    "Expected " + model.weights.length + " features but got " + features.length);
        }
        double result = model.bias;
        for (int i = 0; i < features.length; i++) {
            result += model.weights[i] * features[i];
        }
        return result;
    }
}
// Maintain multiple models for ensemble predictions
/**
 * Maintains an ensemble of linear models per key and emits the averaged ensemble prediction for
 * every training example.
 *
 * <p>Each example first updates every member with one SGD step (using the member's own
 * {@code learningRate}). It is then scored by every member. When a key has no stored ensemble
 * yet, {@code ensembleSize} zero-initialized members are created with distinct learning rates, so
 * that the members actually diverge.
 */
public static class EnsembleLearningFunction
        extends KeyedProcessFunction<String, TrainingExample, EnsemblePrediction> {

    /** Ensemble size used by the no-arg constructor. */
    public static final int DEFAULT_ENSEMBLE_SIZE = 3;

    /** Learning rate of the first seeded member; member {@code i} uses {@code (i + 1)} times it. */
    public static final double BASE_LEARNING_RATE = 0.01;

    private final int ensembleSize;
    private transient ListState<ModelParameters> ensembleModels;

    public EnsembleLearningFunction() {
        this(DEFAULT_ENSEMBLE_SIZE);
    }

    /**
     * @param ensembleSize number of members seeded for a key without stored models; must be >= 1
     * @throws IllegalArgumentException if {@code ensembleSize < 1}
     */
    public EnsembleLearningFunction(int ensembleSize) {
        if (ensembleSize < 1) {
            throw new IllegalArgumentException("ensembleSize must be >= 1, got " + ensembleSize);
        }
        this.ensembleSize = ensembleSize;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // The state handle was never initialized before, so every access threw an NPE.
        ensembleModels = getRuntimeContext().getListState(
                new ListStateDescriptor<>("ensemble-models", ModelParameters.class));
    }

    @Override
    public void processElement(
            TrainingExample example,
            Context ctx,
            Collector<EnsemblePrediction> out) throws Exception {
        List<ModelParameters> models = new ArrayList<>();
        Iterable<ModelParameters> stored = ensembleModels.get();
        if (stored != null) { // some state backends return null for an empty list
            stored.forEach(models::add);
        }
        if (models.isEmpty()) {
            // Without members the "ensemble prediction" would silently be 0.0 forever.
            models = createEnsemble(example.features.length);
        }

        // Update each model in ensemble
        for (int i = 0; i < models.size(); i++) {
            models.set(i, updateModel(models.get(i), example));
        }

        // Make ensemble prediction
        double[] predictions = models.stream()
                .mapToDouble(model -> predict(model, example.features))
                .toArray();
        double ensemblePrediction = Arrays.stream(predictions).average().orElse(0.0);
        out.collect(new EnsemblePrediction(example.id, ensemblePrediction, predictions));

        // Replace the stored ensemble with the updated members.
        ensembleModels.update(models);
    }

    /** Creates {@code ensembleSize} zero-initialized members with increasing learning rates. */
    private List<ModelParameters> createEnsemble(int featureCount) {
        List<ModelParameters> members = new ArrayList<>(ensembleSize);
        for (int i = 0; i < ensembleSize; i++) {
            ModelParameters member = new ModelParameters(featureCount);
            member.learningRate = BASE_LEARNING_RATE * (i + 1);
            members.add(member);
        }
        return members;
    }

    /** One SGD step on {@code model} in place, using {@code model.learningRate}; returns it. */
    private static ModelParameters updateModel(ModelParameters model, TrainingExample example) {
        // predict() validates the dimensions before any weight is touched.
        double error = example.label - predict(model, example.features);
        for (int i = 0; i < model.weights.length; i++) {
            model.weights[i] += model.learningRate * error * example.features[i];
        }
        model.bias += model.learningRate * error;
        model.version++;
        model.sampleCount++;
        return model;
    }

    /**
     * Computes {@code bias + sum(weights[i] * features[i])}.
     *
     * @throws IllegalArgumentException if {@code features.length != model.weights.length}
     */
    private static double predict(ModelParameters model, double[] features) {
        if (features.length != model.weights.length) {
            throw new IllegalArgumentException(
                    "Expected " + model.weights.length + " features but got " + features.length);
        }
        double result = model.bias;
        for (int i = 0; i < features.length; i++) {
            result += model.weights[i] * features[i];
        }
        return result;
    }
}

/**
 * Raises a {@code DriftAlert} when the running accuracy of a model drops below
 * {@code DRIFT_THRESHOLD}, then starts a new window.
 *
 * <p>NOTE(review): {@code DRIFT_THRESHOLD} and {@code updateAccuracy(double, PredictionResult)}
 * are not defined in this snippet; they are assumed to be provided by the enclosing job.
 */
public static class DriftDetectionFunction
        extends KeyedProcessFunction<String, PredictionResult, DriftAlert> {

    private transient ValueState<Double> accuracyWindow;
    private transient ValueState<Long> windowStartTime;

    @Override
    public void open(Configuration parameters) throws Exception {
        accuracyWindow = getRuntimeContext().getState(
                new ValueStateDescriptor<>("accuracy-window", Double.class));
        windowStartTime = getRuntimeContext().getState(
                new ValueStateDescriptor<>("window-start-time", Long.class));
    }

    @Override
    public void processElement(
            PredictionResult result,
            Context ctx,
            Collector<DriftAlert> out) throws Exception {
        long now = currentTimestamp(ctx);

        // Both states are empty for the first element of a key (and accuracy again after a
        // reset); unboxing them unconditionally threw a NullPointerException.
        if (windowStartTime.value() == null) {
            windowStartTime.update(now);
        }
        Double currentAccuracy = accuracyWindow.value();
        double newAccuracy = currentAccuracy == null
                ? result.accuracy // empty window: start from this observation
                : updateAccuracy(currentAccuracy, result);
        accuracyWindow.update(newAccuracy);

        // Check for concept drift
        if (newAccuracy < DRIFT_THRESHOLD) {
            out.collect(new DriftAlert(result.modelId, newAccuracy, now));
            // Reset window
            windowStartTime.update(now);
            accuracyWindow.clear();
        }
    }

    /** Element timestamp if one is assigned, otherwise the current processing time. */
    private long currentTimestamp(Context ctx) {
        Long eventTime = ctx.timestamp();
        return eventTime != null ? eventTime : ctx.timerService().currentProcessingTime();
    }
}

/**
 * Emits the running average accuracy and latency for each prediction result.
 *
 * <p>Uses keyed aggregating state, so it must be applied to a keyed stream.
 */
public static class ModelMetricsFunction
        extends RichMapFunction<PredictionResult, ModelMetrics> {

    private transient AggregatingState<Double, Double> accuracyAggregator;
    private transient AggregatingState<Double, Double> latencyAggregator;

    @Override
    public void open(Configuration parameters) throws Exception {
        accuracyAggregator = getRuntimeContext().getAggregatingState(averageDescriptor("accuracy"));
        latencyAggregator = getRuntimeContext().getAggregatingState(averageDescriptor("latency"));
    }

    /**
     * Builds a running-average state descriptor. The third constructor argument is the
     * accumulator (state) type, not the output type, so it must be {@code RunningAverage.class}.
     */
    private static AggregatingStateDescriptor<Double, RunningAverage, Double> averageDescriptor(
            String name) {
        return new AggregatingStateDescriptor<>(name, new AverageAccumulator(), RunningAverage.class);
    }

    @Override
    public ModelMetrics map(PredictionResult result) throws Exception {
        // Update metrics
        accuracyAggregator.add(result.accuracy);
        latencyAggregator.add(result.latency);
        return new ModelMetrics(
                result.modelId,
                accuracyAggregator.get(),
                latencyAggregator.get(),
                System.currentTimeMillis()
        );
    }
}
/**
 * Training example for incremental learning
 */
public class TrainingExample {
    public String id;
    public double[] features;
    public double label;
    public long timestamp;
    public TrainingExample(String id, double[] features, double label, long timestamp);
}
/**
 * Model parameters for linear models
 */
public class ModelParameters {
    public double[] weights;
    public double bias;
    public long version;
    public int sampleCount;
    public double learningRate;
    public ModelParameters(int featureCount);
}
/**
 * Model update event
 */
public class ModelUpdate {
    public String modelId;
    public ModelParameters parameters;
    public long timestamp;
    public double performance;
    public ModelUpdate(String modelId, ModelParameters parameters, long timestamp);
}
/**
 * Prediction request
 */
public class PredictionRequest {
    public String id;
    public String modelId;
    public double[] features;
    public long timestamp;
}
/**
 * Prediction result.
 */
public class PredictionResult {

    /** Identifier of this result. */
    public String id;

    /** Identifier of the model. */
    public String modelId;

    /** Predicted value. */
    public double prediction;

    /** Confidence of the prediction. */
    public double confidence;

    /** Version of the model. */
    public long modelVersion;

    /** Accuracy, if ground truth is available. */
    public double accuracy;

    /** Processing time. */
    public double latency;
}
Utility class providing sample training data for ML examples.
/**
 * Sample data generator for the incremental learning examples.
 *
 * <p>NOTE(review): {@code SAMPLE_DATA} is a public array, so callers can modify its elements.
 */
public class IncrementalLearningSkeletonData {

    /** Built-in sample training examples. */
    public static final TrainingExample[] SAMPLE_DATA;

    /**
     * Generates synthetic training data for linear regression.
     *
     * @param numSamples number of examples to generate
     * @param numFeatures number of features per example
     * @param noise noise level applied to the generated data (distribution not shown here)
     * @return iterator over the generated examples
     */
    public static Iterator<TrainingExample> createLinearRegressionData(int numSamples, int numFeatures, double noise);

    /**
     * Generates synthetic classification data.
     *
     * @param numSamples number of examples to generate
     * @param numFeatures number of features per example
     * @param numClasses number of distinct classes
     * @return iterator over the generated examples
     */
    public static Iterator<TrainingExample> createClassificationData(int numSamples, int numFeatures, int numClasses);
}
<dependency>
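<!-- NOTE: the snippets above use KeyedProcessFunction, BroadcastStream and
     BroadcastProcessFunction, which were introduced in Flink 1.5 and are not available in 1.3.x.
     Running them requires a newer Flink release. Newer releases no longer publish _2.10
     artifacts, so use the matching Scala suffix (e.g. _2.11/_2.12) or the suffix-free artifact.
     flink-ml is not used by any of the imports listed below. -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ml_2.10</artifactId>
<version>1.3.3</version>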
</dependency>import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10