or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

group-pattern-management.md, index.md, pattern-definition.md, pattern-processing.md, pattern-stream-creation.md, timeout-handling.md

group-pattern-management.mddocs/

0

# Group Pattern Management

1

2

Advanced pattern composition using GroupPattern for complex pattern sequences that combine multiple patterns into composite structures.

3

4

## Capabilities

5

6

### GroupPattern Overview

7

8

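GroupPattern extends Pattern but disables the condition-setting operations `where()`, `or()`, and `subtype()`: calling any of them on a GroupPattern throws `UnsupportedOperationException`. Conditions must instead be defined on the individual patterns that make up the group.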

9

10

```scala { .api }

11

/**

12

* GroupPattern represents a composite pattern created by chaining Pattern objects

13

* @param jGroupPattern Underlying Java GroupPattern

14

* @tparam T Base type of events

15

* @tparam F Subtype constraint

16

*/

17

class GroupPattern[T, F <: T] extends Pattern[T, F] {

18

// Inherits all Pattern methods, but where(), or(), and subtype() are not supported on a group

19

// These methods throw UnsupportedOperationException

20

}

21

22

object GroupPattern {

23

/**

24

* Wrap Java GroupPattern for Scala API usage

25

* @param jGroupPattern Java GroupPattern to wrap

26

* @return Scala GroupPattern wrapper

27

*/

28

def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]

29

}

30

```

31

32

### Creating GroupPatterns

33

34

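GroupPatterns are created in one of two ways: by chaining an existing Pattern onto another with `next()`, `followedBy()`, or `followedByAny()`, or by passing an existing Pattern to `Pattern.begin()` to start a new sequence.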

35

36

```scala { .api }

37

class Pattern[T, F <: T] {

38

/**

39

* Chain with another pattern using followedBy semantics

40

* @param pattern The pattern to follow this one

41

* @return GroupPattern for further composition

42

*/

43

def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F]

44

45

/**

46

* Chain with another pattern using followedByAny semantics

47

* @param pattern The pattern to follow this one

48

* @return GroupPattern for further composition

49

*/

50

def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F]

51

52

/**

53

* Chain with another pattern using strict next semantics

54

* @param pattern The pattern to follow this one immediately

55

* @return GroupPattern for further composition

56

*/

57

def next(pattern: Pattern[T, F]): GroupPattern[T, F]

58

}

59

60

object Pattern {

61

/**

62

* Start a new pattern sequence with an existing pattern

63

* @param pattern Initial pattern for the sequence

64

* @return GroupPattern for building complex sequences

65

*/

66

def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]

67

68

/**

69

* Start a new pattern sequence with existing pattern and skip strategy

70

* @param pattern Initial pattern for the sequence

71

* @param afterMatchSkipStrategy Strategy for handling overlapping matches

72

* @return GroupPattern for building complex sequences

73

*/

74

def begin[T, F <: T](

75

pattern: Pattern[T, F],

76

afterMatchSkipStrategy: AfterMatchSkipStrategy

77

): GroupPattern[T, F]

78

}

79

```

80

81

**Usage Examples:**

82

83

```scala

84

case class Event(eventType: String, userId: String, value: Int, timestamp: Long)

85

86

// Create individual patterns

87

val loginPattern = Pattern.begin[Event]("login")

88

.where(_.eventType == "login")

89

90

val activityPattern = Pattern.begin[Event]("activity")

91

.where(_.eventType == "click")

92

.oneOrMore

93

94

val logoutPattern = Pattern.begin[Event]("logout")

95

.where(_.eventType == "logout")

96

97

// Combine patterns into GroupPattern using chaining

98

val sessionPattern = loginPattern

99

.followedBy(activityPattern)

100

.followedBy(logoutPattern)

101

102

// Start with an existing pattern to create GroupPattern

103

val complexSessionPattern = Pattern.begin(loginPattern)

104

.followedBy("purchase")

105

.where(_.eventType == "purchase")

106

.followedBy(logoutPattern)

107

```

108

109

### GroupPattern Restrictions

110

111

GroupPatterns inherit all Pattern methods but restrict condition-setting operations.

112

113

```scala { .api }

114

class GroupPattern[T, F <: T] extends Pattern[T, F] {

115

// These methods throw UnsupportedOperationException:

116

override def where(condition: IterativeCondition[F]): Pattern[T, F] // NOT SUPPORTED

117

override def or(condition: IterativeCondition[F]): Pattern[T, F] // NOT SUPPORTED

118

override def subtype[S <: F](clazz: Class[S]): Pattern[T, S] // NOT SUPPORTED

119

120

// All other Pattern methods are inherited and functional:

121

// - Quantifiers: optional, oneOrMore, times(), etc.

122

// - Time constraint: within(); stop condition for looping patterns: until()

123

// - Chaining: next(), followedBy(), etc.

124

// - Properties: getName, getQuantifier, etc.

125

}

126

```

127

128

**Usage Examples:**

129

130

```scala

131

val individualPattern = Pattern.begin[Event]("first")

132

.where(_.eventType == "start")

133

134

val groupPattern = individualPattern.followedBy(

135

Pattern.begin[Event]("second").where(_.eventType == "end")

136

)

137

138

// Valid GroupPattern operations

139

val timedGroupPattern = groupPattern

140

.within(Duration.ofMinutes(10))

141

.optional

142

143

// Invalid operations - these throw UnsupportedOperationException:

144

// groupPattern.where(_.value > 10) // ERROR!

145

// groupPattern.or(_.eventType == "other") // ERROR!

146

// groupPattern.subtype(classOf[SpecialEvent]) // ERROR!

147

148

// Workaround: Apply conditions to individual patterns before grouping

149

val conditionedPattern = Pattern.begin[Event]("filtered")

150

.where(_.value > 10)

151

.where(_.eventType == "special")

152

153

val validGroupPattern = individualPattern.followedBy(conditionedPattern)

154

```

155

156

### Advanced GroupPattern Composition

157

158

Complex pattern sequences with multiple levels of composition.

159

160

**Usage Examples:**

161

162

```scala

163

case class UserEvent(userId: String, action: String, sessionId: String, timestamp: Long)

164

165

// Multi-level pattern composition

166

val userLoginSequence = Pattern.begin[UserEvent]("login")

167

.where(_.action == "login")

168

.followedBy("initial_activity")

169

.where(_.action == "page_view")

170

171

val userEngagementSequence = Pattern.begin[UserEvent]("engagement")

172

.where(_.action == "click")

173

.oneOrMore

174

.followedBy("conversion")

175

.where(_.action == "purchase")

176

177

// Combine sequences into comprehensive user journey pattern

178

val userJourneyPattern = Pattern.begin(userLoginSequence)

179

.followedBy(userEngagementSequence)

180

.within(Duration.ofHours(2))

181

182

// Use with CEP

183

import org.apache.flink.cep.scala.CEP

184

185

val userJourneyStream = CEP.pattern(userEventStream, userJourneyPattern)

186

.select { pattern =>

187

val login = pattern("login").head

188

val engagementEvents = pattern("engagement")

189

val conversion = pattern("conversion").head

190

191

s"User ${login.userId} converted after ${engagementEvents.size} engagement events"

192

}

193

```

194

195

### GroupPattern with Skip Strategies

196

197

Configure how overlapping matches are handled in GroupPattern sequences by passing an `AfterMatchSkipStrategy` to `Pattern.begin()`.

198

199

**Usage Examples:**

200

201

```scala

202

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy

203

204

case class MarketEvent(symbol: String, price: Double, volume: Long, timestamp: Long)

205

206

val priceDropPattern = Pattern.begin[MarketEvent]("drop")

207

.where(_.price < 100)

208

209

val recoveryPattern = Pattern.begin[MarketEvent]("recovery")

210

.where(_.price > 100)

211

212

// Different skip strategies for overlapping matches

213

val skipPastLastEvent = AfterMatchSkipStrategy.skipPastLastEvent()

214

val skipToNext = AfterMatchSkipStrategy.skipToNext()

215

216

// GroupPattern with skip past last event strategy

217

val marketCyclePattern = Pattern.begin(priceDropPattern, skipPastLastEvent)

218

.followedBy(recoveryPattern)

219

.within(Duration.ofDays(7))

220

221

// Alternative: skip to next, so that more overlapping matches are detected

222

val overlappingCyclesPattern = Pattern.begin(priceDropPattern, skipToNext)

223

.followedBy(recoveryPattern)

224

.within(Duration.ofDays(7))

225

```

226

227

## Types

228

229

```scala { .api }

230

// GroupPattern class and companion

231

import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}

232

233

class GroupPattern[T, F <: T] extends Pattern[T, F]

234

235

object GroupPattern {

236

def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]

237

}

238

239

// Skip strategies for pattern matching

240

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy

241

242

// Duration for time windows

243

import java.time.Duration

244

```