or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-management.md
complex-event-processing.md
core-table-operations.md
datastream-integration.md
index.md
sql-processing.md
type-system.md
user-defined-functions.md
window-operations.md
tile.json

docs/complex-event-processing.md

Complex Event Processing (CEP)

This document covers pattern matching and complex event detection capabilities on streaming data using Apache Flink's CEP library bundled in the Table Uber Blink package.

Basic Pattern Definition

Pattern Creation

class Pattern<T, F extends T> {
    static <X> Pattern<X, X> begin(String name);
    static <X> Pattern<X, X> begin(String name, AfterMatchSkipStrategy afterMatchSkipStrategy);
    
    Pattern<T, F> where(SimpleCondition<F> condition);
    Pattern<T, F> where(IterativeCondition<F> condition);
    Pattern<T, F> or(SimpleCondition<F> condition);
    Pattern<T, F> or(IterativeCondition<F> condition);
    
    Pattern<T, F> next(String name);
    Pattern<T, F> followedBy(String name);
    Pattern<T, F> followedByAny(String name);
    Pattern<T, F> notNext(String name);
    Pattern<T, F> notFollowedBy(String name);
    
    Pattern<T, F> within(Time within);
    Pattern<T, F> times(int times);
    Pattern<T, F> times(int fromTimes, int toTimes);
    Pattern<T, F> oneOrMore();
    Pattern<T, F> timesOrMore(int times);
    Pattern<T, F> optional();
    Pattern<T, F> greedy();
    Pattern<T, F> consecutive();
}

Simple Pattern Example

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login");
        }
    })
    .next("middle")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("purchase");
        }
    })
    .within(Time.minutes(10));

Pattern Conditions

Simple Conditions

abstract class SimpleCondition<T> extends IterativeCondition<T> implements FilterFunction<T> {
    abstract boolean filter(T value) throws Exception;
}

Usage:

// Simple condition for event type
SimpleCondition<Event> loginCondition = new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return "login".equals(event.getType());
    }
};

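// Using an anonymous subclass (SimpleCondition is an abstract class, not a
// functional interface, so a Java lambda cannot be assigned to it; the
// lambda-style where(...) calls in later examples need the same expansion)
SimpleCondition<Event> highValueCondition = new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getAmount() > 1000;
    }
};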

Iterative Conditions

abstract class IterativeCondition<T> implements Function, Serializable {
    abstract boolean filter(T value, Context<T> ctx) throws Exception;
    
    interface Context<T> {
        Iterable<T> getEventsForPattern(String name);
        <X> Iterable<X> getEventsForPattern(String name, Class<X> clazz);
        long timestamp();
    }
}

Usage:

// Iterative condition accessing previous events
IterativeCondition<Event> increasingAmountCondition = new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event current, Context<Event> ctx) throws Exception {
        if (!current.getType().equals("purchase")) {
            return false;
        }
        
        for (Event prev : ctx.getEventsForPattern("previous")) {
            if (current.getAmount() <= prev.getAmount()) {
                return false;
            }
        }
        return true;
    }
};

Pattern Sequence Types

Strict Contiguity (next)

// Events must occur immediately one after another
Pattern<Event, ?> strictPattern = Pattern.<Event>begin("first")
    .where(event -> event.getType().equals("A"))
    .next("second")
    .where(event -> event.getType().equals("B"));

Relaxed Contiguity (followedBy)

// Events can have other events in between
Pattern<Event, ?> relaxedPattern = Pattern.<Event>begin("first")
    .where(event -> event.getType().equals("A"))
    .followedBy("second")
    .where(event -> event.getType().equals("B"));

Non-Deterministic Relaxed Contiguity (followedByAny)

// Unlike followedBy, matching does not stop at the first "B": a single "A"
// can be paired with every later "B", producing multiple matches
Pattern<Event, ?> nonDetPattern = Pattern.<Event>begin("first")
    .where(event -> event.getType().equals("A"))
    .followedByAny("second")
    .where(event -> event.getType().equals("B"));

Quantifiers

Times

// Exactly 3 times
Pattern<Event, ?> exactPattern = Pattern.<Event>begin("events")
    .where(event -> event.getType().equals("click"))
    .times(3);

// Between 2 and 4 times
Pattern<Event, ?> rangePattern = Pattern.<Event>begin("events")
    .where(event -> event.getType().equals("click"))
    .times(2, 4);

One or More

// One or more occurrences
Pattern<Event, ?> oneOrMorePattern = Pattern.<Event>begin("events")
    .where(event -> event.getType().equals("click"))
    .oneOrMore();

// At least 2 occurrences
Pattern<Event, ?> timesOrMorePattern = Pattern.<Event>begin("events")
    .where(event -> event.getType().equals("click"))
    .timesOrMore(2);

Optional

// Optional event
Pattern<Event, ?> optionalPattern = Pattern.<Event>begin("start")
    .where(event -> event.getType().equals("login"))
    .followedBy("optional")
    .where(event -> event.getType().equals("view"))
    .optional()
    .followedBy("end")
    .where(event -> event.getType().equals("logout"));

Negation Patterns

// Not followed by
Pattern<Event, ?> notPattern = Pattern.<Event>begin("start")
    .where(event -> event.getType().equals("login"))
    .notFollowedBy("fraud")
    .where(event -> event.getType().equals("suspicious"))
    .followedBy("end")
    .where(event -> event.getType().equals("purchase"));

// Not next
Pattern<Event, ?> notNextPattern = Pattern.<Event>begin("start")
    .where(event -> event.getType().equals("start"))
    .notNext("error")
    .where(event -> event.getType().equals("error"));

Time Constraints

// Pattern must complete within time window
Pattern<Event, ?> timedPattern = Pattern.<Event>begin("start")
    .where(event -> event.getType().equals("login"))
    .followedBy("purchase")
    .where(event -> event.getType().equals("purchase"))
    .within(Time.minutes(30));

Pattern Application

CEP Pattern Stream

class CEP {
    static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern);
    static <T> PatternStream<T> pattern(KeyedStream<T, ?> input, Pattern<T, ?> pattern);
}

interface PatternStream<T> {
    <R> SingleOutputStreamOperator<R> select(PatternSelectFunction<T, R> patternSelectFunction);
    <R> SingleOutputStreamOperator<R> process(PatternProcessFunction<T, R> patternProcessFunction);
    <L, R> SingleOutputStreamOperator<R> select(OutputTag<L> timedOutPartialMatchesTag, 
        PatternTimeoutFunction<T, L> patternTimeoutFunction, 
        PatternSelectFunction<T, R> patternSelectFunction);
    DataStream<T> inContext(String contextPattern);
}

Usage:

DataStream<Event> eventStream = env.addSource(new EventSource());

Pattern<Event, ?> pattern = Pattern.<Event>begin("login")
    .where(event -> event.getType().equals("login"))
    .followedBy("purchase")
    .where(event -> event.getType().equals("purchase"))
    .within(Time.minutes(10));

PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);

Pattern Selection

Pattern Select Function

interface PatternSelectFunction<IN, OUT> extends Function {
    OUT select(Map<String, List<IN>> pattern) throws Exception;
}

Usage:

DataStream<Alert> alerts = patternStream.select(
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> pattern) {
            Event loginEvent = pattern.get("login").get(0);
            Event purchaseEvent = pattern.get("purchase").get(0);
            
            return new Alert(
                loginEvent.getUserId(),
                "Quick purchase after login",
                loginEvent.getTimestamp(),
                purchaseEvent.getTimestamp()
            );
        }
    }
);

Pattern Process Function

abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction {
    abstract void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
    
    interface Context {
        long timestamp();
        <X> void output(OutputTag<X> outputTag, X value);
    }
}

Usage:

DataStream<Result> results = patternStream.process(
    new PatternProcessFunction<Event, Result>() {
        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Result> out) {
            List<Event> loginEvents = match.get("login");
            List<Event> purchaseEvents = match.get("purchase");
            
            // Process all combinations
            for (Event login : loginEvents) {
                for (Event purchase : purchaseEvents) {
                    out.collect(new Result(login, purchase, ctx.timestamp()));
                }
            }
        }
    }
);

Timeout Handling

interface PatternTimeoutFunction<IN, OUT> extends Function {
    OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
}

Usage:

OutputTag<TimeoutAlert> timeoutTag = new OutputTag<TimeoutAlert>("timeout"){};

SingleOutputStreamOperator<Alert> result = patternStream.select(
    timeoutTag,
    new PatternTimeoutFunction<Event, TimeoutAlert>() {
        @Override
        public TimeoutAlert timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) {
            Event loginEvent = pattern.get("login").get(0);
            return new TimeoutAlert(loginEvent.getUserId(), "No purchase after login", timeoutTimestamp);
        }
    },
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> pattern) {
            // Regular match processing
            return new Alert(...);
        }
    }
);

DataStream<TimeoutAlert> timeouts = result.getSideOutput(timeoutTag);

After Match Skip Strategies

class AfterMatchSkipStrategy {
    static AfterMatchSkipStrategy noSkip();
    static AfterMatchSkipStrategy skipPastLastEvent();
    static AfterMatchSkipStrategy skipToFirst(String patternName);
    static AfterMatchSkipStrategy skipToLast(String patternName);
}

Usage:

// Skip to the first event of "middle" pattern after a match
Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipToFirst("middle"))
    .where(event -> event.getType().equals("A"))
    .followedBy("middle")
    .where(event -> event.getType().equals("B"))
    .followedBy("end")
    .where(event -> event.getType().equals("C"));

Complex Pattern Examples

Fraud Detection

// Detect multiple failed login attempts followed by success
Pattern<LoginEvent, ?> fraudPattern = Pattern.<LoginEvent>begin("failed")
    .where(event -> !event.isSuccessful())
    .times(3).consecutive()
    .followedBy("success")
    .where(event -> event.isSuccessful())
    .within(Time.minutes(5));

patternStream.select(new PatternSelectFunction<LoginEvent, FraudAlert>() {
    @Override
    public FraudAlert select(Map<String, List<LoginEvent>> pattern) {
        List<LoginEvent> failures = pattern.get("failed");
        LoginEvent success = pattern.get("success").get(0);
        
        return new FraudAlert(
            success.getUserId(),
            failures.size(),
            failures.get(0).getTimestamp(),
            success.getTimestamp()
        );
    }
});

User Journey Analysis

// Track user journey: view -> add_to_cart -> (optional) remove -> purchase
Pattern<UserEvent, ?> journeyPattern = Pattern.<UserEvent>begin("view")
    .where(event -> event.getAction().equals("view"))
    .followedBy("cart")
    .where(event -> event.getAction().equals("add_to_cart"))
    .followedBy("remove")
    .where(event -> event.getAction().equals("remove"))
    .optional()
    .followedBy("purchase")
    .where(event -> event.getAction().equals("purchase"))
    .within(Time.hours(24));

Types

interface PatternSelectFunction<IN, OUT> extends Function, Serializable;
interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable;
abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction;

class Time {
    static Time milliseconds(long milliseconds);
    static Time seconds(long seconds);
    static Time minutes(long minutes);
    static Time hours(long hours);
    static Time days(long days);
}

enum Quantifier {
    ONE,
    ONE_OR_MORE,
    TIMES,
    LOOPING
}

interface Function extends Serializable;
abstract class RichFunction extends AbstractRichFunction implements Function;