Comprehensive Table/SQL distribution of Apache Flink that bundles the Blink planner, providing optimized table processing in both batch and streaming modes.
This document covers pattern matching and complex event processing (CEP) on streaming data with Apache Flink's CEP library, which is bundled in the Table Uber Blink package.
// T: type of the events in the input stream.
// F: type accepted by the conditions of the current pattern (narrowed via subtype()).
class Pattern<T, F extends T> {
    // Every pattern sequence starts with a named initial pattern.
    static <X> Pattern<X, X> begin(String name);
    static <X> Pattern<X, X> begin(String name, AfterMatchSkipStrategy afterMatchSkipStrategy);

    // Conditions. SimpleCondition extends IterativeCondition, so both kinds are accepted.
    // IterativeCondition is an abstract class, not a functional interface: pass a
    // condition instance (e.g. an anonymous SimpleCondition), not a bare lambda.
    Pattern<T, F> where(IterativeCondition<F> condition);
    Pattern<T, F> or(IterativeCondition<F> condition);
    <S extends F> Pattern<T, S> subtype(Class<S> subtypeClass);

    // Contiguity to the next pattern: strict, relaxed, and non-deterministic relaxed.
    Pattern<T, T> next(String name);
    Pattern<T, T> followedBy(String name);
    Pattern<T, T> followedByAny(String name);

    // Negative patterns are named just like positive ones.
    Pattern<T, T> notNext(String name);
    Pattern<T, T> notFollowedBy(String name);

    // Time window within which the whole pattern sequence must complete.
    Pattern<T, F> within(Time within);

    // Quantifiers.
    Pattern<T, F> times(int times);
    Pattern<T, F> times(int fromTimes, int toTimes);
    Pattern<T, F> oneOrMore();
    Pattern<T, F> timesOrMore(int times);
    Pattern<T, F> optional();
    Pattern<T, F> greedy();

    // Options for looping patterns: stop condition and contiguity between repetitions.
    Pattern<T, F> until(IterativeCondition<F> untilCondition);
    Pattern<T, F> consecutive();
    Pattern<T, F> allowCombinations();
}

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login");
        }
    })
    .next("middle")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("purchase");
        }
    })
    .within(Time.minutes(10));

// Condition evaluated on the current event only; subclass it and implement filter().
abstract class SimpleCondition<T> extends IterativeCondition<T> implements FilterFunction<T> {
    abstract boolean filter(T value) throws Exception;
}
Usage:
// Simple condition for event type
SimpleCondition<Event> loginCondition = new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return "login".equals(event.getType());
    }
};

// SimpleCondition is an abstract class, not a functional interface, so it cannot be
// created from a lambda; subclass it (anonymously or as a named class) instead.
SimpleCondition<Event> highValueCondition = new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getAmount() > 1000;
    }
};

// Condition with access to the events already accepted into the current partial match.
// It is an abstract class implementing Function (which is Serializable); RichFunction is an
// interface and cannot be extended by a class.
abstract class IterativeCondition<T> implements Function {
    abstract boolean filter(T value, Context<T> ctx) throws Exception;

    interface Context<T> {
        // Events accepted so far by the pattern with the given name.
        Iterable<T> getEventsForPattern(String name) throws Exception;
        long timestamp();
    }
}
Usage:
// Iterative condition accessing previous events
IterativeCondition<Event> increasingAmountCondition = new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event current, Context<Event> ctx) throws Exception {
        if (!current.getType().equals("purchase")) {
            return false;
        }
        // Accept only if the amount exceeds every event already matched by "previous".
        for (Event prev : ctx.getEventsForPattern("previous")) {
            if (current.getAmount() <= prev.getAmount()) {
                return false;
            }
        }
        return true;
    }
};

// where(...) takes an IterativeCondition, which is an abstract class rather than a
// functional interface, so it cannot be given a bare lambda. The examples below wrap
// lambdas with this adapter; FilterFunction is Flink's serializable single-method
// interface, so the wrapped lambda can be shipped with the job.
static <T> SimpleCondition<T> cond(FilterFunction<T> predicate) {
    return new SimpleCondition<T>() {
        @Override
        public boolean filter(T value) throws Exception {
            return predicate.filter(value);
        }
    };
}

// Strict contiguity: events must occur immediately one after another
Pattern<Event, ?> strictPattern = Pattern.<Event>begin("first")
    .where(cond(event -> event.getType().equals("A")))
    .next("second")
    .where(cond(event -> event.getType().equals("B")));

// Relaxed contiguity: events can have other events in between
Pattern<Event, ?> relaxedPattern = Pattern.<Event>begin("first")
    .where(cond(event -> event.getType().equals("A")))
    .followedBy("second")
    .where(cond(event -> event.getType().equals("B")));

// Non-deterministic relaxed contiguity: multiple matches possible for the same event
Pattern<Event, ?> nonDetPattern = Pattern.<Event>begin("first")
    .where(cond(event -> event.getType().equals("A")))
    .followedByAny("second")
    .where(cond(event -> event.getType().equals("B")));

// Exactly 3 times
Pattern<Event, ?> exactPattern = Pattern.<Event>begin("events")
    .where(cond(event -> event.getType().equals("click")))
    .times(3);

// Between 2 and 4 times
Pattern<Event, ?> rangePattern = Pattern.<Event>begin("events")
    .where(cond(event -> event.getType().equals("click")))
    .times(2, 4);

// One or more occurrences
Pattern<Event, ?> oneOrMorePattern = Pattern.<Event>begin("events")
    .where(cond(event -> event.getType().equals("click")))
    .oneOrMore();

// At least 2 occurrences
Pattern<Event, ?> timesOrMorePattern = Pattern.<Event>begin("events")
    .where(cond(event -> event.getType().equals("click")))
    .timesOrMore(2);

// Optional event
Pattern<Event, ?> optionalPattern = Pattern.<Event>begin("start")
    .where(cond(event -> event.getType().equals("login")))
    .followedBy("optional")
    .where(cond(event -> event.getType().equals("view")))
    .optional()
    .followedBy("end")
    .where(cond(event -> event.getType().equals("logout")));

// Not followed by
Pattern<Event, ?> notPattern = Pattern.<Event>begin("start")
    .where(cond(event -> event.getType().equals("login")))
    .notFollowedBy("fraud")
    .where(cond(event -> event.getType().equals("suspicious")))
    .followedBy("end")
    .where(cond(event -> event.getType().equals("purchase")));

// Not next: the negative pattern needs a name, like every other pattern
Pattern<Event, ?> notNextPattern = Pattern.<Event>begin("start")
    .where(cond(event -> event.getType().equals("start")))
    .notNext("error")
    .where(cond(event -> event.getType().equals("error")));

// Pattern must complete within time window
Pattern<Event, ?> timedPattern = Pattern.<Event>begin("start")
    .where(cond(event -> event.getType().equals("login")))
    .followedBy("purchase")
    .where(cond(event -> event.getType().equals("purchase")))
    .within(Time.minutes(30));

class CEP {
    // A KeyedStream is also a DataStream; for keyed input, matching runs independently per key.
    static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern);
    // Same, with a comparator that orders events sharing the same timestamp.
    static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator);
}

class PatternStream<T> {
    <R> SingleOutputStreamOperator<R> select(PatternSelectFunction<T, R> patternSelectFunction);
    <R> SingleOutputStreamOperator<R> flatSelect(PatternFlatSelectFunction<T, R> patternFlatSelectFunction);
    <R> SingleOutputStreamOperator<R> process(PatternProcessFunction<T, R> patternProcessFunction);
    // Timed-out partial matches are emitted to the side output identified by the tag.
    <L, R> SingleOutputStreamOperator<R> select(OutputTag<L> timedOutPartialMatchesTag,
                                                PatternTimeoutFunction<T, L> patternTimeoutFunction,
                                                PatternSelectFunction<T, R> patternSelectFunction);
    // Late events (event time) are sent to this side output instead of being dropped.
    PatternStream<T> sideOutputLateData(OutputTag<T> lateDataOutputTag);
}
Usage:
DataStream<Event> eventStream = env.addSource(new EventSource());
// Conditions are anonymous SimpleConditions: where(...) takes an abstract class, not a lambda.
Pattern<Event, ?> pattern = Pattern.<Event>begin("login")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login");
        }
    })
    .followedBy("purchase")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("purchase");
        }
    })
    .within(Time.minutes(10));
// Key by user so a login is only paired with a purchase by the same user;
// matching on the un-keyed stream would pair events from different users.
PatternStream<Event> patternStream = CEP.pattern(eventStream.keyBy(Event::getUserId), pattern);

interface PatternSelectFunction<IN, OUT> extends Function {
    // Called once per complete match; keys are pattern names, values the events each accepted.
    OUT select(Map<String, List<IN>> pattern) throws Exception;
}
Usage:
DataStream<Alert> alerts = patternStream.select(
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> pattern) {
            // "login" and "purchase" carry no quantifier, so each list holds one event.
            Event loginEvent = pattern.get("login").get(0);
            Event purchaseEvent = pattern.get("purchase").get(0);
            return new Alert(
                loginEvent.getUserId(),
                "Quick purchase after login",
                loginEvent.getTimestamp(),
                purchaseEvent.getTimestamp()
            );
        }
    }
);

abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction {
    // Called once per complete match; results are emitted through out.
    abstract void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;

    interface Context {
        long timestamp();
        long currentProcessingTime();
        // Emits a value to the side output identified by outputTag.
        <X> void output(OutputTag<X> outputTag, X value);
    }
}
Usage:
DataStream<Result> results = patternStream.process(
    new PatternProcessFunction<Event, Result>() {
        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Result> out) {
            // Emit one Result for every (login, purchase) pair contained in the match.
            for (Event loginEvent : match.get("login")) {
                for (Event purchaseEvent : match.get("purchase")) {
                    out.collect(new Result(loginEvent, purchaseEvent, ctx.timestamp()));
                }
            }
        }
    }
);

interface PatternTimeoutFunction<IN, OUT> extends Function {
    // Receives the events of a partial match together with its timeout timestamp.
    OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
}
Usage:
// The trailing {} creates an anonymous subclass so the tag's generic type is retained.
OutputTag<TimeoutAlert> timeoutTag = new OutputTag<TimeoutAlert>("timeout"){};
SingleOutputStreamOperator<Alert> result = patternStream.select(
    timeoutTag,
    new PatternTimeoutFunction<Event, TimeoutAlert>() {
        @Override
        public TimeoutAlert timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) {
            // The timed-out partial match has a "login" event but no "purchase".
            Event loginEvent = pattern.get("login").get(0);
            return new TimeoutAlert(loginEvent.getUserId(), "No purchase after login", timeoutTimestamp);
        }
    },
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> pattern) {
            // Regular match processing
            Event loginEvent = pattern.get("login").get(0);
            Event purchaseEvent = pattern.get("purchase").get(0);
            return new Alert(
                loginEvent.getUserId(),
                "Quick purchase after login",
                loginEvent.getTimestamp(),
                purchaseEvent.getTimestamp()
            );
        }
    }
);
DataStream<TimeoutAlert> timeouts = result.getSideOutput(timeoutTag);

// Decides which partial matches are discarded after a match is emitted;
// passed to Pattern.begin(name, strategy).
abstract class AfterMatchSkipStrategy {
    static AfterMatchSkipStrategy noSkip();
    static AfterMatchSkipStrategy skipToNext();
    static AfterMatchSkipStrategy skipPastLastEvent();
    static AfterMatchSkipStrategy skipToFirst(String patternName);
    static AfterMatchSkipStrategy skipToLast(String patternName);
}
Usage:
// where(...) takes an IterativeCondition, an abstract class rather than a functional
// interface, so the lambdas below are wrapped with this adapter. FilterFunction is
// Flink's serializable single-method interface, so the lambda can be shipped with the job.
static <T> SimpleCondition<T> cond(FilterFunction<T> predicate) {
    return new SimpleCondition<T>() {
        @Override
        public boolean filter(T value) throws Exception {
            return predicate.filter(value);
        }
    };
}

// Skip to the first event of "middle" pattern after a match
Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipToFirst("middle"))
    .where(cond(event -> event.getType().equals("A")))
    .followedBy("middle")
    .where(cond(event -> event.getType().equals("B")))
    .followedBy("end")
    .where(cond(event -> event.getType().equals("C")));

// Detect multiple failed login attempts followed by success
Pattern<LoginEvent, ?> fraudPattern = Pattern.<LoginEvent>begin("failed")
    .where(cond(event -> !event.isSuccessful()))
    .times(3).consecutive()
    .followedBy("success")
    .where(cond(event -> event.isSuccessful()))
    .within(Time.minutes(5));
// Match per user. NOTE: loginEvents is assumed to be a DataStream<LoginEvent>.
PatternStream<LoginEvent> loginPatternStream =
    CEP.pattern(loginEvents.keyBy(LoginEvent::getUserId), fraudPattern);
DataStream<FraudAlert> fraudAlerts = loginPatternStream.select(new PatternSelectFunction<LoginEvent, FraudAlert>() {
    @Override
    public FraudAlert select(Map<String, List<LoginEvent>> pattern) {
        // "failed" is quantified with times(3), so this list holds every failed attempt.
        List<LoginEvent> failures = pattern.get("failed");
        LoginEvent success = pattern.get("success").get(0);
        return new FraudAlert(
            success.getUserId(),
            failures.size(),
            failures.get(0).getTimestamp(),
            success.getTimestamp()
        );
    }
});

// Track user journey: view -> add_to_cart -> (optional) remove -> purchase
Pattern<UserEvent, ?> journeyPattern = Pattern.<UserEvent>begin("view")
    .where(cond(event -> event.getAction().equals("view")))
    .followedBy("cart")
    .where(cond(event -> event.getAction().equals("add_to_cart")))
    .followedBy("remove")
    .where(cond(event -> event.getAction().equals("remove")))
    .optional()
    .followedBy("purchase")
    .where(cond(event -> event.getAction().equals("purchase")))
    .within(Time.hours(24));

interface PatternSelectFunction<IN, OUT> extends Function, Serializable;
interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable;
abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction;
class Time {
    static Time milliseconds(long milliseconds);
    static Time seconds(long seconds);
    static Time minutes(long minutes);
    static Time hours(long hours);
    static Time days(long days);
}
// Quantifier is a class describing an individual pattern's repetition and contiguity;
// both are expressed through enums nested inside it.
class Quantifier {
    enum QuantifierProperty { SINGLE, LOOPING, TIMES, OPTIONAL, GREEDY }
    enum ConsumingStrategy { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, NOT_NEXT }
}
interface Function extends Serializable;
// RichFunction is an interface; AbstractRichFunction is its abstract base implementation.
interface RichFunction extends Function;
abstract class AbstractRichFunction implements RichFunction, Serializable;
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-uber-blink-2-12