# Pattern Definition

Complex event pattern definition using the fluent Scala DSL with temporal constraints, conditions, and quantifiers.

## Capabilities

### Pattern Creation

Start a new pattern sequence with a named initial pattern.

```scala { .api }
/**
 * Start a new pattern sequence with the given name
 * @param name The name of starting pattern
 * @tparam X Base type of events in the pattern
 * @return The first pattern of a pattern sequence
 */
object Pattern {
  def begin[X](name: String): Pattern[X, X]
  def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X]
}
```

**Usage Examples:**

```scala
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.pattern.Pattern

case class LoginEvent(userId: String, timestamp: Long)

// Simple pattern start
val pattern = Pattern.begin[LoginEvent]("login")
  .where(_.userId.nonEmpty)

// Pattern with skip strategy: after a match, skip past its last event
val skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent()
val skippingPattern = Pattern.begin[LoginEvent]("login", skipStrategy)
```
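
The skip strategy decides which overlapping matches are kept once a match has been emitted. As a rough sketch, assuming the `noSkip()` and `skipToNext()` factory methods on Flink's `AfterMatchSkipStrategy` (the `val` names are illustrative only), the alternatives are passed to `begin` in the same way:

```scala
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.pattern.Pattern

case class LoginEvent(userId: String, timestamp: Long)

// Emit every possible match (what begin(name) uses when no strategy is given)
val keepAll = Pattern.begin[LoginEvent]("login", AfterMatchSkipStrategy.noSkip())

// Discard partial matches that started with the same event as the emitted match
val skipToNext = Pattern.begin[LoginEvent]("login", AfterMatchSkipStrategy.skipToNext())
```

Which strategy fits depends on whether downstream logic tolerates overlapping matches; `noSkip()` produces the most output.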

### Condition Definition

Add conditions that events must satisfy to be considered matches.

```scala { .api }
class Pattern[T, F <: T] {
  /**
   * Add AND condition using simple predicate function
   * @param condition Predicate function for the event
   * @return Pattern with the new condition
   */
  def where(condition: F => Boolean): Pattern[T, F]

  /**
   * Add AND condition with access to context
   * @param condition Function taking event and context
   * @return Pattern with the new condition
   */
  def where(condition: (F, Context[F]) => Boolean): Pattern[T, F]

  /**
   * Add AND condition using IterativeCondition
   * @param condition The IterativeCondition to apply
   * @return Pattern with the new condition
   */
  def where(condition: IterativeCondition[F]): Pattern[T, F]

  /**
   * Add OR condition using simple predicate function
   * @param condition Predicate function for the event
   * @return Pattern with the new condition
   */
  def or(condition: F => Boolean): Pattern[T, F]

  /**
   * Add OR condition with access to context
   * @param condition Function taking event and context
   * @return Pattern with the new condition
   */
  def or(condition: (F, Context[F]) => Boolean): Pattern[T, F]

  /**
   * Add OR condition using IterativeCondition
   * @param condition The IterativeCondition to apply
   * @return Pattern with the new condition
   */
  def or(condition: IterativeCondition[F]): Pattern[T, F]
}
```

**Usage Examples:**

```scala
import org.apache.flink.cep.scala.pattern.Pattern

case class Event(eventType: String, value: Int, userId: String)

// Simple condition: matches a "login" OR a "signup" event
val simplePattern = Pattern.begin[Event]("start")
  .where(_.eventType == "login")
  .or(_.eventType == "signup")

// Context-aware condition: the confirmation must belong to the same user
// as the event already matched by the "start" pattern
val contextPattern = Pattern.begin[Event]("start")
  .where(_.eventType == "purchase")
  .next("confirmation")
  .where((event, ctx) => {
    val previousEvents = ctx.getEventsForPattern("start")
    event.userId == previousEvents.head.userId
  })
```

### Subtype Constraints

Apply subtype constraints to patterns for type-safe event matching.

```scala { .api }
class Pattern[T, F <: T] {
  /**
   * Apply subtype constraint requiring events to be of specific subtype
   * @param clazz Class of the required subtype
   * @tparam S The subtype
   * @return Pattern constrained to the subtype
   */
  def subtype[S <: F](clazz: Class[S]): Pattern[T, S]
}
```

**Usage Examples:**

```scala
import org.apache.flink.cep.scala.pattern.Pattern

// `val` makes eventType readable on the base type
abstract class Event(val eventType: String)
case class LoginEvent(userId: String) extends Event("login")
case class PurchaseEvent(userId: String, amount: Double) extends Event("purchase")

// Only LoginEvent instances match "start"; only PurchaseEvent instances match "purchase"
val pattern = Pattern.begin[Event]("start")
  .subtype(classOf[LoginEvent])
  .next("purchase")
  .subtype(classOf[PurchaseEvent])
```

### Temporal Pattern Chaining

Chain patterns with different temporal contiguity requirements.

```scala { .api }
class Pattern[T, F <: T] {
  /**
   * Strict temporal contiguity - no events between matches
   * @param name Name of the next pattern
   * @return New pattern enforcing strict contiguity
   */
  def next(name: String): Pattern[T, T]

  /**
   * Non-strict temporal contiguity - events may be interleaved
   * @param name Name of the following pattern
   * @return New pattern allowing interleaved events
   */
  def followedBy(name: String): Pattern[T, T]

  /**
   * Non-deterministic following - matches any occurrence
   * @param name Name of the following pattern
   * @return New pattern with non-deterministic following
   */
  def followedByAny(name: String): Pattern[T, T]

  /**
   * Negative pattern - no matching event should follow
   * @param name Name of the pattern that should not occur
   * @return New pattern with negative constraint
   */
  def notNext(name: String): Pattern[T, T]

  /**
   * Negative following pattern - no matching event should occur between
   * @param name Name of the pattern that should not occur
   * @return New pattern with negative constraint
   */
  def notFollowedBy(name: String): Pattern[T, T]
}
```

### Group Pattern Chaining

Chain patterns with other Pattern objects to create GroupPatterns.

```scala { .api }
class Pattern[T, F <: T] {
  /**
   * Chain with another pattern using followedBy semantics
   * @param pattern The pattern to follow this one
   * @return GroupPattern for further chaining
   */
  def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F]

  /**
   * Chain with another pattern using followedByAny semantics
   * @param pattern The pattern to follow this one
   * @return GroupPattern for further chaining
   */
  def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F]

  /**
   * Chain with another pattern using next semantics
   * @param pattern The pattern to follow this one strictly
   * @return GroupPattern for further chaining
   */
  def next(pattern: Pattern[T, F]): GroupPattern[T, F]
}

object Pattern {
  /**
   * Start pattern sequence with existing pattern
   * @param pattern Initial pattern for the sequence
   * @return GroupPattern for chaining
   */
  def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]

  /**
   * Start pattern sequence with existing pattern and skip strategy
   * @param pattern Initial pattern for the sequence
   * @param afterMatchSkipStrategy Skip strategy after matches
   * @return GroupPattern for chaining
   */
  def begin[T, F <: T](
    pattern: Pattern[T, F],
    afterMatchSkipStrategy: AfterMatchSkipStrategy
  ): GroupPattern[T, F]
}
```

**Usage Examples:**

```scala
// Strict sequence - login immediately followed by purchase
val strictPattern = Pattern.begin[Event]("login")
  .where(_.eventType == "login")
  .next("purchase")
  .where(_.eventType == "purchase")

// Relaxed sequence - login followed by purchase (other events allowed)
val relaxedPattern = Pattern.begin[Event]("login")
  .where(_.eventType == "login")
  .followedBy("purchase")
  .where(_.eventType == "purchase")

// Negative pattern - login and purchase with no logout in between.
// Flink rejects a sequence that ends with notFollowedBy unless within() is set,
// so a closing pattern follows the negation.
val negativePattern = Pattern.begin[Event]("login")
  .where(_.eventType == "login")
  .notFollowedBy("logout")
  .where(_.eventType == "logout")
  .followedBy("purchase")
  .where(_.eventType == "purchase")
```
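
The examples above do not exercise `followedByAny` or `notNext`. The following is a minimal sketch of both, reusing the three-field `Event` case class from the condition examples; the pattern and `val` names are illustrative.

```scala
import org.apache.flink.cep.scala.pattern.Pattern

case class Event(eventType: String, value: Int, userId: String)

// Non-deterministic relaxed contiguity: for a given "login", every later
// "purchase" may produce its own match, not only the first one
val anyPurchase = Pattern.begin[Event]("login")
  .where(_.eventType == "login")
  .followedByAny("purchase")
  .where(_.eventType == "purchase")

// Strict negation: the event directly after "login" must not be a "logout"
val noImmediateLogout = Pattern.begin[Event]("login")
  .where(_.eventType == "login")
  .notNext("logout")
  .where(_.eventType == "logout")
  .followedBy("purchase")
  .where(_.eventType == "purchase")
```

`followedByAny` can multiply the number of emitted matches, so it is usually paired with a time window or a skip strategy.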

### Time Windows

Define time constraints for pattern completion.

```scala { .api }
class Pattern[T, F <: T] {
  /**
   * Set maximum time for pattern completion
   * @param windowTime Duration for pattern completion
   * @return Pattern with time constraint
   */
  def within(windowTime: Duration): Pattern[T, F]

  /**
   * Set maximum time for pattern completion (deprecated)
   * @param windowTime Time window for pattern completion
   * @return Pattern with time constraint
   */
  @deprecated("Use within(Duration)", "1.19.0")
  def within(windowTime: Time): Pattern[T, F]
}
```

**Usage Examples:**

```scala
import java.time.Duration

// Pattern must complete within 5 minutes
val timedPattern = Pattern.begin[Event]("start")
  .where(_.eventType == "start")
  .followedBy("end")
  .where(_.eventType == "end")
  .within(Duration.ofMinutes(5))
```

### Pattern Quantifiers

Apply quantifiers to specify repetition patterns.

```scala { .api }
class Pattern[T, F <: T] {
  /**
   * Make pattern optional (0 or 1 occurrence)
   * @return Pattern marked as optional
   */
  def optional: Pattern[T, F]

  /**
   * Pattern can occur one or more times
   * @return Pattern with one-or-more quantifier
   */
  def oneOrMore: Pattern[T, F]

  /**
   * Pattern occurs exactly N times
   * @param times Exact number of occurrences
   * @return Pattern with exact count quantifier
   */
  def times(times: Int): Pattern[T, F]

  /**
   * Pattern occurs between from and to times
   * @param from Minimum occurrences
   * @param to Maximum occurrences
   * @return Pattern with range quantifier
   */
  def times(from: Int, to: Int): Pattern[T, F]

  /**
   * Pattern occurs at least N times
   * @param times Minimum number of occurrences
   * @return Pattern with at-least quantifier
   */
  def timesOrMore(times: Int): Pattern[T, F]

  /**
   * Use greedy matching (match as many as possible)
   * @return Pattern with greedy matching
   */
  def greedy: Pattern[T, F]

  /**
   * Allow combinations in quantified patterns
   * @return Pattern allowing combinations
   */
  def allowCombinations(): Pattern[T, F]

  /**
   * Require consecutive matching for quantified patterns
   * @return Pattern requiring consecutive matches
   */
  def consecutive(): Pattern[T, F]
}
```

**Usage Examples:**

```scala
// Optional pattern
val optionalPattern = Pattern.begin[Event]("optional")
  .where(_.eventType == "init")
  .optional
  .followedBy("required")
  .where(_.eventType == "process")

// Repeated pattern
val repeatedPattern = Pattern.begin[Event]("repeated")
  .where(_.eventType == "click")
  .oneOrMore
  .consecutive()
  .followedBy("submit")
  .where(_.eventType == "submit")

// Exact count
val exactPattern = Pattern.begin[Event]("exactly")
  .where(_.eventType == "attempt")
  .times(3)
  .followedBy("success")
  .where(_.eventType == "success")
```
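
The remaining quantifiers from the API listing (`times(from, to)`, `timesOrMore`, `greedy`, `allowCombinations()`) compose in the same way. This is a hedged sketch with made-up event types and pattern names, not a recommended configuration:

```scala
import org.apache.flink.cep.scala.pattern.Pattern

case class Event(eventType: String, value: Int, userId: String)

// Range: between 2 and 4 attempts, then a lockout
val rangePattern = Pattern.begin[Event]("attempts")
  .where(_.eventType == "attempt")
  .times(2, 4)
  .followedBy("lockout")
  .where(_.eventType == "lockout")

// At least 3 clicks, consuming as many clicks as possible before moving on
val greedyPattern = Pattern.begin[Event]("clicks")
  .where(_.eventType == "click")
  .timesOrMore(3)
  .greedy
  .followedBy("submit")
  .where(_.eventType == "submit")

// One or more views, also allowing matches that skip some matching views
val combinationsPattern = Pattern.begin[Event]("views")
  .where(_.eventType == "view")
  .oneOrMore
  .allowCombinations()
  .followedBy("purchase")
  .where(_.eventType == "purchase")
```

`allowCombinations()` and `consecutive()` pull in opposite directions: the former relaxes contiguity inside the loop, the latter tightens it.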

### Until Conditions

Apply stop conditions for looping patterns.

```scala { .api }
class Pattern[T, F <: T] {
  /**
   * Stop condition with simple predicate
   * @param untilCondition Condition to stop pattern matching
   * @return Pattern with until condition
   */
  def until(untilCondition: F => Boolean): Pattern[T, F]

  /**
   * Stop condition with context access
   * @param untilCondition Condition function with context
   * @return Pattern with until condition
   */
  def until(untilCondition: (F, Context[F]) => Boolean): Pattern[T, F]

  /**
   * Stop condition with IterativeCondition
   * @param untilCondition The IterativeCondition for stopping
   * @return Pattern with until condition
   */
  def until(untilCondition: IterativeCondition[F]): Pattern[T, F]
}
```

**Usage Examples:**

```scala
// Loop until condition is met
val loopingPattern = Pattern.begin[Event]("loop")
  .where(_.eventType == "process")
  .oneOrMore
  .until(_.eventType == "complete")
  .followedBy("final")
  .where(_.eventType == "finish")
```

### Pattern Properties

Access pattern properties and configuration.

```scala { .api }
class Pattern[T, F <: T] {
  /**
   * Get the previous pattern in the chain
   * @return Optional previous pattern
   */
  def getPrevious: Option[Pattern[T, _ <: T]]

  /**
   * Get the name of this pattern
   * @return Pattern name
   */
  def getName: String

  /**
   * Get the time window for pattern completion (deprecated)
   * @return Optional time window
   */
  @deprecated("Use getWindowSize", "1.19.0")
  def getWindowTime: Option[Time]

  /**
   * Get the time window duration for pattern completion
   * @return Optional duration window
   */
  def getWindowSize: Option[Duration]

  /**
   * Get the quantifier applied to this pattern
   * @return Pattern quantifier
   */
  def getQuantifier: Quantifier

  /**
   * Get the condition applied to this pattern
   * @return Optional iterative condition
   */
  def getCondition: Option[IterativeCondition[F]]

  /**
   * Get the until condition for looping patterns
   * @return Optional until condition
   */
  def getUntilCondition: Option[IterativeCondition[F]]

  /**
   * Get the after-match skip strategy
   * @return After match skip strategy
   */
  def getAfterMatchSkipStrategy: AfterMatchSkipStrategy
}
```
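
**Usage Examples:**

The accessors above have no example in this document, so the following is a sketch only. It assumes the signatures exactly as listed (in particular `within(Duration)` and the `Option`-returning getters) and relies on each getter describing the last pattern in the chain:

```scala
import java.time.Duration
import org.apache.flink.cep.scala.pattern.Pattern

case class Event(eventType: String, value: Int, userId: String)

val pattern = Pattern.begin[Event]("start")
  .where(_.eventType == "start")
  .followedBy("end")
  .where(_.eventType == "end")
  .within(Duration.ofMinutes(5))

// Name of the pattern the chain currently ends on
val name: String = pattern.getName // "end"

// Walk one step back through the chain
val previousName: Option[String] = pattern.getPrevious.map(_.getName) // Some("start")

// True because a where() condition was attached to "end"
val hasCondition: Boolean = pattern.getCondition.isDefined

// Time window configured via within(), if any
val window = pattern.getWindowSize
```

These getters are mainly useful for debugging or for tooling that inspects a pattern before handing it to the CEP operator.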

### GroupPattern

GroupPattern extends Pattern but does not support per-event conditions: `where()`, `or()`, and `subtype()` throw `UnsupportedOperationException`.

```scala { .api }
class GroupPattern[T, F <: T] extends Pattern[T, F] {
  // Inherits all Pattern methods, but the following are unsupported:
  // - where() methods throw UnsupportedOperationException
  // - or() methods throw UnsupportedOperationException
  // - subtype() throws UnsupportedOperationException
}

object GroupPattern {
  /**
   * Wrap Java GroupPattern
   * @param jGroupPattern Java GroupPattern to wrap
   * @return Scala GroupPattern wrapper
   */
  def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]
}
```

**Usage Examples:**

```scala
// GroupPattern creation from Pattern chaining
val individualPattern = Pattern.begin[Event]("first").where(_.eventType == "start")
val anotherPattern = Pattern.begin[Event]("second").where(_.eventType == "end")

// Chain patterns to create GroupPattern
val groupPattern = individualPattern.followedBy(anotherPattern)

// Start with existing pattern
val groupFromPattern = Pattern.begin(individualPattern)
  .followedBy("additional")
  .where(_.eventType == "middle")

// Note: GroupPattern cannot use where(), or(), or subtype()
// This would throw UnsupportedOperationException:
// groupPattern.where(_.eventType == "invalid") // ERROR!
```

## Types

```scala { .api }
// Context for condition evaluation
trait Context[T] {
  def getEventsForPattern(name: String): Iterable[T]
}

// Java interop types
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}
import org.apache.flink.cep.pattern.Quantifier
import org.apache.flink.streaming.api.windowing.time.Time
import java.time.Duration
```
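
The Java condition types can be passed directly to the `where`, `or`, and `until` overloads that accept an `IterativeCondition`. The sketch below assumes the Java classes' abstract `filter` methods and uses illustrative names (`isPurchase`, `largerThanFirst`); note that the context seen here is the Java `IterativeCondition.Context`, which returns a `java.lang.Iterable`.

```scala
import org.apache.flink.cep.pattern.conditions.{IterativeCondition, SimpleCondition}
import org.apache.flink.cep.scala.pattern.Pattern

case class Event(eventType: String, value: Int, userId: String)

// SimpleCondition inspects only the current event
val isPurchase = new SimpleCondition[Event] {
  override def filter(event: Event): Boolean = event.eventType == "purchase"
}

// IterativeCondition can also read events already matched by earlier patterns
val largerThanFirst = new IterativeCondition[Event] {
  override def filter(event: Event, ctx: IterativeCondition.Context[Event]): Boolean = {
    val first = ctx.getEventsForPattern("first").iterator()
    first.hasNext && event.value > first.next().value
  }
}

val pattern = Pattern.begin[Event]("first")
  .where(isPurchase)
  .followedBy("second")
  .where(largerThanFirst)
```

For new Scala code the function-based overloads are usually shorter; the Java types matter mostly when conditions are shared with Java jobs.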