0
# Group Pattern Management
1
2
GroupPattern supports advanced pattern composition: it combines multiple patterns into a single composite pattern that can be used in complex pattern sequences.
3
4
## Capabilities
5
6
### GroupPattern Overview
7
8
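GroupPattern extends Pattern but does not support the condition-setting operations `where()`, `or()`, and `subtype()`; calling them throws `UnsupportedOperationException`. Conditions must instead be defined on the individual patterns that make up the group.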
9
10
```scala { .api }
11
/**
12
* GroupPattern represents a composite pattern created by chaining Pattern objects
13
* @param jGroupPattern Underlying Java GroupPattern
14
* @tparam T Base type of events
15
* @tparam F Subtype constraint
16
*/
17
class GroupPattern[T, F <: T] extends Pattern[T, F] {
18
// Inherits all Pattern methods; where(), or(), and subtype() are overridden.
19
// These methods throw UnsupportedOperationException
20
}
21
22
object GroupPattern {
23
/**
24
* Wrap Java GroupPattern for Scala API usage
25
* @param jGroupPattern Java GroupPattern to wrap
26
* @return Scala GroupPattern wrapper
27
*/
28
def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]
29
}
30
```
31
32
### Creating GroupPatterns
33
34
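A GroupPattern is created either by passing an existing Pattern to a chaining method (`followedBy`, `followedByAny`, or `next`) or by starting a new sequence from an existing Pattern with `Pattern.begin`.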
35
36
```scala { .api }
37
class Pattern[T, F <: T] {
38
/**
39
* Chain with another pattern using followedBy semantics
40
* @param pattern The pattern to follow this one
41
* @return GroupPattern for further composition
42
*/
43
def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F]
44
45
/**
46
* Chain with another pattern using followedByAny semantics
47
* @param pattern The pattern to follow this one
48
* @return GroupPattern for further composition
49
*/
50
def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F]
51
52
/**
53
* Chain with another pattern using strict next semantics
54
* @param pattern The pattern to follow this one immediately
55
* @return GroupPattern for further composition
56
*/
57
def next(pattern: Pattern[T, F]): GroupPattern[T, F]
58
}
59
60
object Pattern {
61
/**
62
* Start a new pattern sequence with an existing pattern
63
* @param pattern Initial pattern for the sequence
64
* @return GroupPattern for building complex sequences
65
*/
66
def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]
67
68
/**
69
* Start a new pattern sequence with existing pattern and skip strategy
70
* @param pattern Initial pattern for the sequence
71
* @param afterMatchSkipStrategy Strategy for handling overlapping matches
72
* @return GroupPattern for building complex sequences
73
*/
74
def begin[T, F <: T](
75
pattern: Pattern[T, F],
76
afterMatchSkipStrategy: AfterMatchSkipStrategy
77
): GroupPattern[T, F]
78
}
79
```
80
81
**Usage Examples:**
82
83
```scala
84
case class Event(eventType: String, userId: String, value: Int, timestamp: Long)
85
86
// Create individual patterns
87
val loginPattern = Pattern.begin[Event]("login")
88
.where(_.eventType == "login")
89
90
val activityPattern = Pattern.begin[Event]("activity")
91
.where(_.eventType == "click")
92
.oneOrMore
93
94
val logoutPattern = Pattern.begin[Event]("logout")
95
.where(_.eventType == "logout")
96
97
// Combine patterns into GroupPattern using chaining
98
val sessionPattern = loginPattern
99
.followedBy(activityPattern)
100
.followedBy(logoutPattern)
101
102
// Start with an existing pattern to create GroupPattern
103
val complexSessionPattern = Pattern.begin(loginPattern)
104
.followedBy("purchase")
105
.where(_.eventType == "purchase")
106
.followedBy(logoutPattern)
107
```
108
109
### GroupPattern Restrictions
110
111
GroupPatterns inherit all Pattern methods but restrict condition-setting operations.
112
113
```scala { .api }
114
class GroupPattern[T, F <: T] extends Pattern[T, F] {
115
// These methods throw UnsupportedOperationException:
116
override def where(condition: IterativeCondition[F]): Pattern[T, F] // NOT SUPPORTED
117
override def or(condition: IterativeCondition[F]): Pattern[T, F] // NOT SUPPORTED
118
override def subtype[S <: F](clazz: Class[S]): Pattern[T, S] // NOT SUPPORTED
119
120
// All other Pattern methods are inherited and functional:
121
// - Quantifiers: optional, oneOrMore, times(), etc.
122
// - Time constraint and stop condition: within(), until()
123
// - Chaining: next(), followedBy(), etc.
124
// - Properties: getName, getQuantifier, etc.
125
}
126
```
127
128
**Usage Examples:**
129
130
```scala
131
val individualPattern = Pattern.begin[Event]("first")
132
.where(_.eventType == "start")
133
134
val groupPattern = individualPattern.followedBy(
135
Pattern.begin[Event]("second").where(_.eventType == "end")
136
)
137
138
// Valid GroupPattern operations
139
val timedGroupPattern = groupPattern
140
.within(Duration.ofMinutes(10))
141
.optional
142
143
// Invalid operations - these throw UnsupportedOperationException:
144
// groupPattern.where(_.value > 10) // ERROR!
145
// groupPattern.or(_.eventType == "other") // ERROR!
146
// groupPattern.subtype(classOf[SpecialEvent]) // ERROR!
147
148
// Workaround: Apply conditions to individual patterns before grouping
149
val conditionedPattern = Pattern.begin[Event]("filtered")
150
.where(_.value > 10)
151
.where(_.eventType == "special")
152
153
val validGroupPattern = individualPattern.followedBy(conditionedPattern)
154
```
155
156
### Advanced GroupPattern Composition
157
158
Pattern sequences can themselves be used as building blocks, so complex sequences can be composed across multiple levels.
159
160
**Usage Examples:**
161
162
```scala
163
case class UserEvent(userId: String, action: String, sessionId: String, timestamp: Long)
164
165
// Multi-level pattern composition
166
val userLoginSequence = Pattern.begin[UserEvent]("login")
167
.where(_.action == "login")
168
.followedBy("initial_activity")
169
.where(_.action == "page_view")
170
171
val userEngagementSequence = Pattern.begin[UserEvent]("engagement")
172
.where(_.action == "click")
173
.oneOrMore
174
.followedBy("conversion")
175
.where(_.action == "purchase")
176
177
// Combine sequences into comprehensive user journey pattern
178
val userJourneyPattern = Pattern.begin(userLoginSequence)
179
.followedBy(userEngagementSequence)
180
.within(Duration.ofHours(2))
181
182
// Use with CEP
183
import org.apache.flink.cep.scala.CEP
184
185
val userJourneyStream = CEP.pattern(userEventStream, userJourneyPattern)
186
.select { pattern =>
187
val login = pattern("login").head
188
val engagementEvents = pattern("engagement")
189
val conversion = pattern("conversion").head
190
191
s"User ${login.userId} converted after ${engagementEvents.size} engagement events"
192
}
193
```
194
195
### GroupPattern with Skip Strategies
196
197
Configure how overlapping matches are handled in GroupPattern sequences.
198
199
**Usage Examples:**
200
201
```scala
202
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
203
204
case class MarketEvent(symbol: String, price: Double, volume: Long, timestamp: Long)
205
206
val priceDropPattern = Pattern.begin[MarketEvent]("drop")
207
.where(_.price < 100)
208
209
val recoveryPattern = Pattern.begin[MarketEvent]("recovery")
210
.where(_.price > 100)
211
212
// Different skip strategies for overlapping matches
213
val skipPastLastEvent = AfterMatchSkipStrategy.skipPastLastEvent()
214
val skipToNext = AfterMatchSkipStrategy.skipToNext()
215
216
// GroupPattern with skip past last event strategy
217
val marketCyclePattern = Pattern.begin(priceDropPattern, skipPastLastEvent)
218
.followedBy(recoveryPattern)
219
.within(Duration.ofDays(7))
220
221
// Alternative: skip-to-next strategy, which allows more overlapping matches to be detected
222
val overlappingCyclesPattern = Pattern.begin(priceDropPattern, skipToNext)
223
.followedBy(recoveryPattern)
224
.within(Duration.ofDays(7))
225
```
226
227
## Types
228
229
```scala { .api }
230
// GroupPattern class and companion
231
import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}
232
233
class GroupPattern[T, F <: T] extends Pattern[T, F]
234
235
object GroupPattern {
236
def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]
237
}
238
239
// Skip strategies for pattern matching
240
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
241
242
// Duration for time windows
243
import java.time.Duration
244
```