This document covers pattern matching and complex event processing (CEP) on streaming data with Apache Flink's CEP library, as bundled in the Flink Table uber (Blink planner) package.
// Core pattern-definition API. F is the (sub)type of T that the current stage accepts.
class Pattern<T, F extends T> {
    // Start a pattern sequence whose first stage is called `name`.
    static <X> Pattern<X, X> begin(String name);
    // Same, with an explicit strategy for which partial matches to discard after a match.
    static <X> Pattern<X, X> begin(String name, AfterMatchSkipStrategy afterMatchSkipStrategy);

    // Conditions on the current stage. SimpleCondition extends IterativeCondition, so this one
    // overload accepts both. Both are abstract classes, not functional interfaces: pass an
    // (anonymous) subclass, not a lambda.
    Pattern<T, F> where(IterativeCondition<F> condition);
    Pattern<T, F> or(IterativeCondition<F> condition);
    // Stop condition for a looping stage (e.g. oneOrMore()).
    Pattern<T, F> until(IterativeCondition<F> untilCondition);
    // Restrict the current stage to events of a subtype of F.
    <S extends F> Pattern<T, S> subtype(Class<S> subtypeClass);

    // Each of these appends a new stage called `name`; the new stage accepts any T.
    Pattern<T, T> next(String name);          // strict: must be the immediately following event
    Pattern<T, T> followedBy(String name);    // relaxed: other events may occur in between
    Pattern<T, T> followedByAny(String name); // non-deterministic relaxed: one event may join several matches
    Pattern<T, T> notNext(String name);       // the immediately following event must NOT match this stage
    Pattern<T, T> notFollowedBy(String name); // no event matching this stage may occur before the next stage

    // The whole sequence must complete within this interval.
    Pattern<T, F> within(Time within);

    // Quantifiers for the current stage.
    Pattern<T, F> times(int times);
    Pattern<T, F> times(int fromTimes, int toTimes);
    Pattern<T, F> oneOrMore();
    Pattern<T, F> timesOrMore(int times);
    Pattern<T, F> optional();
    Pattern<T, F> greedy();
    // Contiguity between the repeated events of a times()/looping stage (relaxed by default).
    Pattern<T, F> consecutive();       // strict: any non-matching event in between breaks the match
    Pattern<T, F> allowCombinations(); // non-deterministic relaxed
}
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login");
        }
    })
    .next("middle")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("purchase");
        }
    })
    .within(Time.minutes(10));
// Condition that inspects only the current event. It extends IterativeCondition (whose
// two-argument filter delegates to this one), so it can be passed to where()/or().
abstract class SimpleCondition<T> extends IterativeCondition<T> {
    abstract boolean filter(T value) throws Exception;
}
Usage:
// Simple condition for event type
SimpleCondition<Event> loginCondition = new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return "login".equals(event.getType());
    }
};
// SimpleCondition is an abstract class, not a functional interface, so a lambda cannot be
// assigned to it; declare an anonymous subclass instead.
SimpleCondition<Event> highValueCondition = new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getAmount() > 1000;
    }
};
// Condition that can also look at events already matched by earlier stages via Context.
abstract class IterativeCondition<T> extends AbstractRichFunction implements Function {
    abstract boolean filter(T value, Context<T> ctx) throws Exception;
    interface Context<T> {
        // Events accepted so far by the stage called `name` in the current partial match.
        Iterable<T> getEventsForPattern(String name);
        <X> Iterable<X> getEventsForPattern(String name, Class<X> clazz);
        long timestamp();
    }
}
Usage:
// Iterative condition accessing previous events
IterativeCondition<Event> increasingAmountCondition = new IterativeCondition<Event>() {
    // Accepts a "purchase" whose amount is strictly greater than the amount of every event
    // already matched by the stage named "previous".
    @Override
    public boolean filter(Event current, Context<Event> ctx) throws Exception {
        boolean purchase = current.getType().equals("purchase");
        if (purchase) {
            for (Event earlier : ctx.getEventsForPattern("previous")) {
                if (current.getAmount() <= earlier.getAmount()) {
                    return false; // not larger than an earlier amount
                }
            }
        }
        return purchase;
    }
};
// Events must occur immediately one after another
Pattern<Event, ?> strictPattern = Pattern.<Event>begin("first")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("A");
        }
    })
    .next("second") // strict contiguity: the "B" must be the very next event
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("B");
        }
    });
// Events can have other events in between
Pattern<Event, ?> relaxedPattern = Pattern.<Event>begin("first")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("A");
        }
    })
    .followedBy("second") // relaxed contiguity: non-matching events between "A" and "B" are skipped
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("B");
        }
    });
// Multiple matches possible for the same event
Pattern<Event, ?> nonDetPattern = Pattern.<Event>begin("first")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("A");
        }
    })
    .followedByAny("second") // non-deterministic relaxed contiguity
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("B");
        }
    });
// Exactly 3 times
Pattern<Event, ?> exactPattern = Pattern.<Event>begin("events")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("click");
        }
    })
    .times(3); // the "events" stage must match exactly three "click" events
// Between 2 and 4 times
Pattern<Event, ?> rangePattern = Pattern.<Event>begin("events")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("click");
        }
    })
    .times(2, 4); // inclusive range: 2, 3 or 4 "click" events
// One or more occurrences
Pattern<Event, ?> oneOrMorePattern = Pattern.<Event>begin("events")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("click");
        }
    })
    .oneOrMore(); // looping stage: at least one "click" event
// At least 2 occurrences
Pattern<Event, ?> timesOrMorePattern = Pattern.<Event>begin("events")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("click");
        }
    })
    .timesOrMore(2);
// Optional event
Pattern<Event, ?> optionalPattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login");
        }
    })
    .followedBy("optional")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("view");
        }
    })
    .optional() // a match may skip the "view" stage entirely
    .followedBy("end")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("logout");
        }
    });
// Not followed by
Pattern<Event, ?> notPattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login");
        }
    })
    .notFollowedBy("fraud") // no "suspicious" event may occur between the login and the purchase
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("suspicious");
        }
    })
    .followedBy("end")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("purchase");
        }
    });
// Not next: a "start" event whose immediately following event is NOT an "error"
Pattern<Event, ?> notNextPattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("start");
        }
    })
    .notNext("error") // like every stage, the negated stage needs a name
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("error");
        }
    });
// Pattern must complete within time window
Pattern<Event, ?> timedPattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login");
        }
    })
    .followedBy("purchase")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("purchase");
        }
    })
    .within(Time.minutes(30)); // partial matches older than 30 minutes are dropped (or timed out)
// Entry point that applies a Pattern to a stream.
class CEP {
    // KeyedStream extends DataStream, so keyed input (stream.keyBy(...)) uses this overload too;
    // matching is then done independently per key.
    static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern);
    // Same, with a comparator that orders events carrying the same timestamp.
    static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator);
}
// Result of CEP.pattern(...): a stream of matches, turned into output via select/process.
class PatternStream<T> {
    // One output element per complete match.
    <R> SingleOutputStreamOperator<R> select(PatternSelectFunction<T, R> patternSelectFunction);
    // As above; partial matches that time out (see Pattern.within) go to the given side output.
    <L, R> SingleOutputStreamOperator<R> select(OutputTag<L> timedOutPartialMatchesTag,
            PatternTimeoutFunction<T, L> patternTimeoutFunction,
            PatternSelectFunction<T, R> patternSelectFunction);
    // Zero or more output elements per match, with access to a Context.
    <R> SingleOutputStreamOperator<R> process(PatternProcessFunction<T, R> patternProcessFunction);
    // Choose the notion of time used for matching and within() intervals.
    PatternStream<T> inEventTime();
    PatternStream<T> inProcessingTime();
    // Route events that arrive after the watermark to a side output instead of dropping them.
    PatternStream<T> sideOutputLateData(OutputTag<T> lateDataOutputTag);
}
Usage:
DataStream<Event> eventStream = env.addSource(new EventSource());
Pattern<Event, ?> pattern = Pattern.<Event>begin("login")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login");
        }
    })
    .followedBy("purchase")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("purchase");
        }
    })
    .within(Time.minutes(10));
// Key by user so that a login is only paired with a purchase by the same user; on a
// non-keyed stream the pattern would match events of different users together.
PatternStream<Event> patternStream = CEP.pattern(eventStream.keyBy(Event::getUserId), pattern);
// Turns one complete match (stage name -> matched events) into exactly one output element.
interface PatternSelectFunction<IN, OUT> extends Function {
    OUT select(Map<String, List<IN>> pattern) throws Exception;
}
Usage:
DataStream<Alert> alerts = patternStream.select(
    new PatternSelectFunction<Event, Alert>() {
        // Builds one Alert per match from the first "login" and first "purchase" event.
        @Override
        public Alert select(Map<String, List<Event>> pattern) {
            Event login = pattern.get("login").get(0);
            Event purchase = pattern.get("purchase").get(0);
            String description = "Quick purchase after login";
            return new Alert(login.getUserId(), description, login.getTimestamp(), purchase.getTimestamp());
        }
    });
// Match callback that can emit any number of results through `out` and use `ctx`
// for the match timestamp and for side outputs.
abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction {
    abstract void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
    interface Context {
        long timestamp();
        <X> void output(OutputTag<X> outputTag, X value);
    }
}
Usage:
DataStream<Result> results = patternStream.process(
    new PatternProcessFunction<Event, Result>() {
        // Emits one Result for every (login, purchase) pair contained in the match.
        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Result> out) {
            List<Event> logins = match.get("login");
            List<Event> purchases = match.get("purchase");
            logins.forEach(login ->
                purchases.forEach(purchase ->
                    out.collect(new Result(login, purchase, ctx.timestamp()))));
        }
    });
// Timeout callback: receives the partial match (stage name -> events matched so far)
// together with the timestamp at which it timed out.
interface PatternTimeoutFunction<IN, OUT> extends Function {
    OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
}
Usage:
// The trailing {} creates an anonymous subclass so the element type survives erasure.
OutputTag<TimeoutAlert> timeoutTag = new OutputTag<TimeoutAlert>("timeout"){};
SingleOutputStreamOperator<Alert> result = patternStream.select(
    timeoutTag,
    new PatternTimeoutFunction<Event, TimeoutAlert>() {
        @Override
        public TimeoutAlert timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) {
            Event loginEvent = pattern.get("login").get(0);
            return new TimeoutAlert(loginEvent.getUserId(), "No purchase after login", timeoutTimestamp);
        }
    },
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> pattern) {
            // Regular match processing
            return new Alert(...);
        }
    }
);
// Complete matches are on the main output; timed-out partial matches on the side output.
DataStream<TimeoutAlert> timeouts = result.getSideOutput(timeoutTag);
// Decides which partial matches are discarded once a match has been emitted.
class AfterMatchSkipStrategy {
    static AfterMatchSkipStrategy noSkip();                        // discard nothing: every possible match is emitted
    static AfterMatchSkipStrategy skipToNext();                    // discard partial matches that started with the same event
    static AfterMatchSkipStrategy skipPastLastEvent();             // discard partial matches that started before the match ended
    static AfterMatchSkipStrategy skipToFirst(String patternName); // discard those started before the first event of patternName
    static AfterMatchSkipStrategy skipToLast(String patternName);  // discard those started before the last event of patternName
}
Usage:
// Skip to the first event of "middle" pattern after a match
Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipToFirst("middle"))
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("A");
        }
    })
    .followedBy("middle")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("B");
        }
    })
    .followedBy("end")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("C");
        }
    });
// Detect multiple failed login attempts followed by success
Pattern<LoginEvent, ?> fraudPattern = Pattern.<LoginEvent>begin("failed")
.where(event -> !event.isSuccessful())
.times(3).consecutive()
.followedBy("success")
.where(event -> event.isSuccessful())
.within(Time.minutes(5));
patternStream.select(new PatternSelectFunction<LoginEvent, FraudAlert>() {
@Override
public FraudAlert select(Map<String, List<LoginEvent>> pattern) {
List<LoginEvent> failures = pattern.get("failed");
LoginEvent success = pattern.get("success").get(0);
return new FraudAlert(
success.getUserId(),
failures.size(),
failures.get(0).getTimestamp(),
success.getTimestamp()
);
}
});// Track user journey: view -> add_to_cart -> (optional) remove -> purchase
Pattern<UserEvent, ?> journeyPattern = Pattern.<UserEvent>begin("view")
    .where(new SimpleCondition<UserEvent>() {
        @Override
        public boolean filter(UserEvent event) {
            return event.getAction().equals("view");
        }
    })
    .followedBy("cart")
    .where(new SimpleCondition<UserEvent>() {
        @Override
        public boolean filter(UserEvent event) {
            return event.getAction().equals("add_to_cart");
        }
    })
    .followedBy("remove")
    .where(new SimpleCondition<UserEvent>() {
        @Override
        public boolean filter(UserEvent event) {
            return event.getAction().equals("remove");
        }
    })
    .optional() // a journey may go straight from cart to purchase
    .followedBy("purchase")
    .where(new SimpleCondition<UserEvent>() {
        @Override
        public boolean filter(UserEvent event) {
            return event.getAction().equals("purchase");
        }
    })
    .within(Time.hours(24));
interface PatternSelectFunction<IN, OUT> extends Function, Serializable;
// Type-hierarchy summary for the pattern callbacks.
// PatternTimeoutFunction is a plain Function interface that is also Serializable.
interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable;
// PatternProcessFunction is an abstract class built on AbstractRichFunction rather than a plain interface.
abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction;
// Time spans passed to Pattern.within(Time), e.g. Time.minutes(10).
// One static factory per unit; the argument is the amount expressed in that unit.
class Time {
static Time milliseconds(long milliseconds);
static Time seconds(long seconds);
static Time minutes(long minutes);
static Time hours(long hours);
static Time days(long days);
}
// Internal description of a pattern stage (normally configured through Pattern's
// quantifier and contiguity methods rather than used directly).
class Quantifier {
    // How many times the stage may match and how it behaves while looping.
    enum QuantifierProperty {
        SINGLE,
        LOOPING,
        TIMES,
        OPTIONAL,
        GREEDY
    }

    // How the stage consumes events relative to the previous one (its contiguity).
    enum ConsumingStrategy {
        STRICT,         // next()
        SKIP_TILL_NEXT, // followedBy()
        SKIP_TILL_ANY,  // followedByAny()
        NOT_FOLLOW,     // notFollowedBy()
        NOT_NEXT        // notNext()
    }
}
// Base marker for all user functions; implementations must be serializable.
interface Function extends Serializable;
// RichFunction is the interface; AbstractRichFunction is its abstract base implementation,
// which rich callbacks such as IterativeCondition and PatternProcessFunction extend.
interface RichFunction extends Function;
abstract class AbstractRichFunction implements RichFunction, Serializable;