0
# Complex Event Processing (CEP)
1
2
This document covers pattern matching and complex event detection on streaming data using Apache Flink's CEP library, which is bundled in the Table Uber Blink package.
3
4
## Basic Pattern Definition
5
6
### Pattern Creation
7
8
```java { .api }
9
class Pattern<T, F extends T> {
10
static <X> Pattern<X, X> begin(String name);
11
static <X> Pattern<X, X> begin(String name, AfterMatchSkipStrategy afterMatchSkipStrategy);
12
13
Pattern<T, F> where(SimpleCondition<F> condition);
14
Pattern<T, F> where(IterativeCondition<F> condition);
15
Pattern<T, F> or(SimpleCondition<F> condition);
16
Pattern<T, F> or(IterativeCondition<F> condition);
17
18
Pattern<T, F> next(String name);
19
Pattern<T, F> followedBy(String name);
20
Pattern<T, F> followedByAny(String name);
21
Pattern<T, F> notNext(String name);
22
Pattern<T, F> notFollowedBy(String name);
23
24
Pattern<T, F> within(Time within);
25
Pattern<T, F> times(int times);
26
Pattern<T, F> times(int fromTimes, int toTimes);
27
Pattern<T, F> oneOrMore();
28
Pattern<T, F> timesOrMore(int times);
29
Pattern<T, F> optional();
30
Pattern<T, F> greedy();
31
}
32
```
33
34
### Simple Pattern Example
35
36
```java
37
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
38
.where(new SimpleCondition<Event>() {
39
@Override
40
public boolean filter(Event event) {
41
return event.getType().equals("login");
42
}
43
})
44
.next("middle")
45
.where(new SimpleCondition<Event>() {
46
@Override
47
public boolean filter(Event event) {
48
return event.getType().equals("purchase");
49
}
50
})
51
.within(Time.minutes(10));
52
```
53
54
## Pattern Conditions
55
56
### Simple Conditions
57
58
```java { .api }
59
abstract class SimpleCondition<T> extends IterativeCondition<T> implements FilterFunction<T> {
60
abstract boolean filter(T value) throws Exception;
61
}
62
```
63
64
**Usage:**
65
66
```java
67
// Simple condition for event type
68
SimpleCondition<Event> loginCondition = new SimpleCondition<Event>() {
69
@Override
70
public boolean filter(Event event) {
71
return "login".equals(event.getType());
72
}
73
};
74
75
// Using lambda
76
SimpleCondition<Event> highValueCondition = event -> event.getAmount() > 1000;
77
```
78
79
### Iterative Conditions
80
81
```java { .api }
82
abstract class IterativeCondition<T> implements Function, Serializable {
83
abstract boolean filter(T value, Context<T> ctx) throws Exception;
84
85
interface Context<T> {
86
Iterable<T> getEventsForPattern(String name);
87
<X> Iterable<X> getEventsForPattern(String name, Class<X> clazz);
88
long timestamp();
89
}
90
}
91
```
92
93
**Usage:**
94
95
```java
96
// Iterative condition accessing previous events
97
IterativeCondition<Event> increasingAmountCondition = new IterativeCondition<Event>() {
98
@Override
99
public boolean filter(Event current, Context<Event> ctx) throws Exception {
100
if (!current.getType().equals("purchase")) {
101
return false;
102
}
103
104
for (Event prev : ctx.getEventsForPattern("previous")) {
105
if (current.getAmount() <= prev.getAmount()) {
106
return false;
107
}
108
}
109
return true;
110
}
111
};
112
```
113
114
## Pattern Sequence Types
115
116
### Strict Contiguity (next)
117
118
```java
119
// Events must occur immediately one after another
120
Pattern<Event, ?> strictPattern = Pattern.<Event>begin("first")
121
.where(event -> event.getType().equals("A"))
122
.next("second")
123
.where(event -> event.getType().equals("B"));
124
```
125
126
### Relaxed Contiguity (followedBy)
127
128
```java
129
// Events can have other events in between
130
Pattern<Event, ?> relaxedPattern = Pattern.<Event>begin("first")
131
.where(event -> event.getType().equals("A"))
132
.followedBy("second")
133
.where(event -> event.getType().equals("B"));
134
```
135
136
### Non-Deterministic Relaxed Contiguity (followedByAny)
137
138
```java
139
// Multiple matches possible for the same event
140
Pattern<Event, ?> nonDetPattern = Pattern.<Event>begin("first")
141
.where(event -> event.getType().equals("A"))
142
.followedByAny("second")
143
.where(event -> event.getType().equals("B"));
144
```
145
146
## Quantifiers
147
148
### Times
149
150
```java
151
// Exactly 3 times
152
Pattern<Event, ?> exactPattern = Pattern.<Event>begin("events")
153
.where(event -> event.getType().equals("click"))
154
.times(3);
155
156
// Between 2 and 4 times
157
Pattern<Event, ?> rangePattern = Pattern.<Event>begin("events")
158
.where(event -> event.getType().equals("click"))
159
.times(2, 4);
160
```
161
162
### One or More
163
164
```java
165
// One or more occurrences
166
Pattern<Event, ?> oneOrMorePattern = Pattern.<Event>begin("events")
167
.where(event -> event.getType().equals("click"))
168
.oneOrMore();
169
170
// At least 2 occurrences
171
Pattern<Event, ?> timesOrMorePattern = Pattern.<Event>begin("events")
172
.where(event -> event.getType().equals("click"))
173
.timesOrMore(2);
174
```
175
176
### Optional
177
178
```java
179
// Optional event
180
Pattern<Event, ?> optionalPattern = Pattern.<Event>begin("start")
181
.where(event -> event.getType().equals("login"))
182
.followedBy("optional")
183
.where(event -> event.getType().equals("view"))
184
.optional()
185
.followedBy("end")
186
.where(event -> event.getType().equals("logout"));
187
```
188
189
## Negation Patterns
190
191
```java
192
// Not followed by
193
Pattern<Event, ?> notPattern = Pattern.<Event>begin("start")
194
.where(event -> event.getType().equals("login"))
195
.notFollowedBy("fraud")
196
.where(event -> event.getType().equals("suspicious"))
197
.followedBy("end")
198
.where(event -> event.getType().equals("purchase"));
199
200
// Not next
201
Pattern<Event, ?> notNextPattern = Pattern.<Event>begin("start")
202
.where(event -> event.getType().equals("start"))
203
.notNext("error")
204
.where(event -> event.getType().equals("error"));
205
```
206
207
## Time Constraints
208
209
```java
210
// Pattern must complete within time window
211
Pattern<Event, ?> timedPattern = Pattern.<Event>begin("start")
212
.where(event -> event.getType().equals("login"))
213
.followedBy("purchase")
214
.where(event -> event.getType().equals("purchase"))
215
.within(Time.minutes(30));
216
```
217
218
## Pattern Application
219
220
### CEP Pattern Stream
221
222
```java { .api }
223
class CEP {
224
static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern);
225
static <T> PatternStream<T> pattern(KeyedStream<T, ?> input, Pattern<T, ?> pattern);
226
}
227
228
class PatternStream<T> {
229
<R> SingleOutputStreamOperator<R> select(PatternSelectFunction<T, R> patternSelectFunction);
230
<R> SingleOutputStreamOperator<R> process(PatternProcessFunction<T, R> patternProcessFunction);
231
<L, R> SingleOutputStreamOperator<R> select(OutputTag<L> timedOutPartialMatchesTag,
232
PatternTimeoutFunction<T, L> patternTimeoutFunction,
233
PatternSelectFunction<T, R> patternSelectFunction);
234
DataStream<T> inContext(String contextPattern);
235
}
236
```
237
238
**Usage:**
239
240
```java
241
DataStream<Event> eventStream = env.addSource(new EventSource());
242
243
Pattern<Event, ?> pattern = Pattern.<Event>begin("login")
244
.where(event -> event.getType().equals("login"))
245
.followedBy("purchase")
246
.where(event -> event.getType().equals("purchase"))
247
.within(Time.minutes(10));
248
249
PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);
250
```
251
252
## Pattern Selection
253
254
### Pattern Select Function
255
256
```java { .api }
257
interface PatternSelectFunction<IN, OUT> extends Function {
258
OUT select(Map<String, List<IN>> pattern) throws Exception;
259
}
260
```
261
262
**Usage:**
263
264
```java
265
DataStream<Alert> alerts = patternStream.select(
266
new PatternSelectFunction<Event, Alert>() {
267
@Override
268
public Alert select(Map<String, List<Event>> pattern) {
269
Event loginEvent = pattern.get("login").get(0);
270
Event purchaseEvent = pattern.get("purchase").get(0);
271
272
return new Alert(
273
loginEvent.getUserId(),
274
"Quick purchase after login",
275
loginEvent.getTimestamp(),
276
purchaseEvent.getTimestamp()
277
);
278
}
279
}
280
);
281
```
282
283
### Pattern Process Function
284
285
```java { .api }
286
abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction {
287
abstract void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
288
289
interface Context {
290
long timestamp();
291
<X> void output(OutputTag<X> outputTag, X value);
292
}
293
}
294
```
295
296
**Usage:**
297
298
```java
299
DataStream<Result> results = patternStream.process(
300
new PatternProcessFunction<Event, Result>() {
301
@Override
302
public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Result> out) {
303
List<Event> loginEvents = match.get("login");
304
List<Event> purchaseEvents = match.get("purchase");
305
306
// Process all combinations
307
for (Event login : loginEvents) {
308
for (Event purchase : purchaseEvents) {
309
out.collect(new Result(login, purchase, ctx.timestamp()));
310
}
311
}
312
}
313
}
314
);
315
```
316
317
## Timeout Handling
318
319
```java { .api }
320
interface PatternTimeoutFunction<IN, OUT> extends Function {
321
OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
322
}
323
```
324
325
**Usage:**
326
327
```java
328
OutputTag<TimeoutAlert> timeoutTag = new OutputTag<TimeoutAlert>("timeout"){};
329
330
SingleOutputStreamOperator<Alert> result = patternStream.select(
331
timeoutTag,
332
new PatternTimeoutFunction<Event, TimeoutAlert>() {
333
@Override
334
public TimeoutAlert timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) {
335
Event loginEvent = pattern.get("login").get(0);
336
return new TimeoutAlert(loginEvent.getUserId(), "No purchase after login", timeoutTimestamp);
337
}
338
},
339
new PatternSelectFunction<Event, Alert>() {
340
@Override
341
public Alert select(Map<String, List<Event>> pattern) {
342
// Regular match processing
343
return new Alert(...);
344
}
345
}
346
);
347
348
DataStream<TimeoutAlert> timeouts = result.getSideOutput(timeoutTag);
349
```
350
351
## After Match Skip Strategies
352
353
```java { .api }
354
class AfterMatchSkipStrategy {
355
static AfterMatchSkipStrategy noSkip();
356
static AfterMatchSkipStrategy skipPastLastEvent();
357
static AfterMatchSkipStrategy skipToFirst(String patternName);
358
static AfterMatchSkipStrategy skipToLast(String patternName);
359
}
360
```
361
362
**Usage:**
363
364
```java
365
// Skip to the first event of "middle" pattern after a match
366
Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipToFirst("middle"))
367
.where(event -> event.getType().equals("A"))
368
.followedBy("middle")
369
.where(event -> event.getType().equals("B"))
370
.followedBy("end")
371
.where(event -> event.getType().equals("C"));
372
```
373
374
## Complex Pattern Examples
375
376
### Fraud Detection
377
378
```java
379
// Detect three consecutive failed login attempts followed by a successful login
380
Pattern<LoginEvent, ?> fraudPattern = Pattern.<LoginEvent>begin("failed")
381
.where(event -> !event.isSuccessful())
382
.times(3).consecutive()
383
.followedBy("success")
384
.where(event -> event.isSuccessful())
385
.within(Time.minutes(5));
386
387
patternStream.select(new PatternSelectFunction<LoginEvent, FraudAlert>() {
388
@Override
389
public FraudAlert select(Map<String, List<LoginEvent>> pattern) {
390
List<LoginEvent> failures = pattern.get("failed");
391
LoginEvent success = pattern.get("success").get(0);
392
393
return new FraudAlert(
394
success.getUserId(),
395
failures.size(),
396
failures.get(0).getTimestamp(),
397
success.getTimestamp()
398
);
399
}
400
});
401
```
402
403
### User Journey Analysis
404
405
```java
406
// Track user journey: view -> add_to_cart -> (optional) remove -> purchase
407
Pattern<UserEvent, ?> journeyPattern = Pattern.<UserEvent>begin("view")
408
.where(event -> event.getAction().equals("view"))
409
.followedBy("cart")
410
.where(event -> event.getAction().equals("add_to_cart"))
411
.followedBy("remove")
412
.where(event -> event.getAction().equals("remove"))
413
.optional()
414
.followedBy("purchase")
415
.where(event -> event.getAction().equals("purchase"))
416
.within(Time.hours(24));
417
```
418
419
## Types
420
421
```java { .api }
422
interface PatternSelectFunction<IN, OUT> extends Function, Serializable;
423
interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable;
424
abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction;
425
426
class Time {
427
static Time milliseconds(long milliseconds);
428
static Time seconds(long seconds);
429
static Time minutes(long minutes);
430
static Time hours(long hours);
431
static Time days(long days);
432
}
433
434
enum Quantifier {
435
ONE,
436
ONE_OR_MORE,
437
TIMES,
438
LOOPING
439
}
440
441
interface Function extends Serializable;
442
interface RichFunction extends Function;
abstract class AbstractRichFunction implements RichFunction, Serializable;
443
```