or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

group-pattern-management.md, index.md, pattern-definition.md, pattern-processing.md, pattern-stream-creation.md, timeout-handling.md

docs/pattern-processing.md

0

# Pattern Processing

1

2

Process detected pattern sequences and turn the matched patterns into result streams using the `select`, `flatSelect`, and `process` operations.

3

4

## Capabilities

5

6

### Select Operations

7

8

Transform matched patterns into single output elements using select functions.

9

10

```scala { .api }

11

class PatternStream[T] {

  /**
   * Maps every detected match to exactly one output element using a plain Scala function.
   *
   * @param patternSelectFun receives the match as a map from pattern name to the events bound to it
   *                         and returns the single result for that match
   * @tparam R element type of the resulting stream
   * @return stream holding one result per match
   */
  def select[R: TypeInformation](patternSelectFun: Map[String, Iterable[T]] => R): DataStream[R]

  /**
   * Maps every detected match to exactly one output element using a Java-style
   * [[PatternSelectFunction]].
   *
   * @param patternSelectFunction Java interface implementation that selects the result for a match
   * @tparam R element type of the resulting stream
   * @return stream holding one result per match
   */
  def select[R: TypeInformation](patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R]
}

28

```

29

30

**Usage Examples:**

31

32

```scala

33

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
// Provides the implicit TypeInformation derivation that select[R: TypeInformation] needs.
import org.apache.flink.streaming.api.scala._
// The Scala select function is declared over scala.collection.Map, not Predef's immutable Map.
import scala.collection.Map
import java.util.{List => JList, Map => JMap}

case class Event(userId: String, action: String, timestamp: Long)
case class UserSession(userId: String, startAction: String, endAction: String, duration: Long)

// A "login" event followed by a "logout" event.
val sessionPattern = Pattern.begin[Event]("start")
  .where(_.action == "login")
  .followedBy("end")
  .where(_.action == "logout")

// Scala function approach (assumes `dataStream: DataStream[Event]` is defined elsewhere).
val sessions = CEP.pattern(dataStream, sessionPattern)
  .select { pattern: Map[String, Iterable[Event]] =>
    // "start" and "end" are not quantified, so each is bound to a single event.
    val startEvent = pattern("start").head
    val endEvent = pattern("end").head
    UserSession(
      startEvent.userId,
      startEvent.action,
      endEvent.action,
      endEvent.timestamp - startEvent.timestamp
    )
  }

// Java interface approach: the match arrives as java.util collections.
val javaSelectFunction = new PatternSelectFunction[Event, String] {
  override def select(pattern: JMap[String, JList[Event]]): String = {
    val start = pattern.get("start").get(0)
    val end = pattern.get("end").get(0)
    s"Session: ${start.userId} from ${start.timestamp} to ${end.timestamp}"
  }
}

val sessionsJava = CEP.pattern(dataStream, sessionPattern)
  .select(javaSelectFunction)

69

```

70

71

### Flat Select Operations

72

73

Transform matched patterns into zero or more output elements using flatSelect functions.

74

75

```scala { .api }

76

class PatternStream[T] {

  /**
   * Turns each match into zero or more results by emitting them through a collector,
   * using a plain Scala function.
   *
   * @param patternFlatSelectFun receives the match (pattern name to bound events) together with
   *                             the collector that results are emitted to
   * @tparam R element type of the resulting stream
   * @return stream of everything emitted to the collector
   */
  def flatSelect[R: TypeInformation](
      patternFlatSelectFun: (Map[String, Iterable[T]], Collector[R]) => Unit
  ): DataStream[R]

  /**
   * Turns each match into zero or more results using a Java-style [[PatternFlatSelectFunction]].
   *
   * @param patternFlatSelectFunction Java interface implementation that emits results for a match
   * @tparam R element type of the resulting stream
   * @return stream of everything emitted to the collector
   */
  def flatSelect[R: TypeInformation](
      patternFlatSelectFunction: PatternFlatSelectFunction[T, R]
  ): DataStream[R]
}

97

```

98

99

**Usage Examples:**

100

101

```scala

102

import org.apache.flink.cep.PatternFlatSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
// Provides the implicit TypeInformation derivation that flatSelect[R: TypeInformation] needs.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import java.time.Duration
import java.util.{List => JList, Map => JMap}
// The Scala flatSelect function is declared over scala.collection.Map.
import scala.collection.Map

case class Purchase(userId: String, item: String, amount: Double, timestamp: Long)
case class Alert(userId: String, message: String, severity: String)

// One or more purchases within an hour. A looping (oneOrMore) pattern can report
// overlapping matches, so the same purchase may appear in more than one match.
val purchasePattern = Pattern.begin[Purchase]("purchases")
  .where(_.amount > 0)
  .oneOrMore
  .within(Duration.ofHours(1))

// Scala flat select: a single match may yield zero, one, or several alerts.
// (Assumes `purchaseStream: DataStream[Purchase]` is defined elsewhere.)
val alerts = CEP.pattern(purchaseStream, purchasePattern)
  .flatSelect { (pattern: Map[String, Iterable[Purchase]], out: Collector[Alert]) =>
    // oneOrMore binds at least one event, so `head` is safe.
    val purchases = pattern("purchases")
    val totalAmount = purchases.map(_.amount).sum
    val userId = purchases.head.userId

    // Emit volume alert ("$$" renders a literal dollar sign)
    if (totalAmount > 1000) {
      out.collect(Alert(userId, s"High volume: $$${totalAmount}", "HIGH"))
    }

    // Emit frequency alert
    if (purchases.size > 10) {
      out.collect(Alert(userId, s"High frequency: ${purchases.size} purchases", "MEDIUM"))
    }

    // Emit item-specific alerts
    purchases.groupBy(_.item).foreach { case (item, itemPurchases) =>
      if (itemPurchases.size > 3) {
        out.collect(Alert(userId, s"Repeated item: $item (${itemPurchases.size}x)", "LOW"))
      }
    }
  }

// Java interface approach: one message per purchase in the match.
val javaFlatSelectFunction = new PatternFlatSelectFunction[Purchase, String] {
  override def flatSelect(pattern: JMap[String, JList[Purchase]], out: Collector[String]): Unit = {
    val purchases = pattern.get("purchases")
    purchases.forEach { purchase =>
      out.collect(s"Purchase: ${purchase.userId} bought ${purchase.item}")
    }
  }
}

val purchaseMessages = CEP.pattern(purchaseStream, purchasePattern)
  .flatSelect(javaFlatSelectFunction)

148

```

149

150

### Process Function Operations

151

152

Use PatternProcessFunction for advanced pattern processing with rich context access.

153

154

```scala { .api }

155

class PatternStream[T] {

  /**
   * Hands every match to a [[PatternProcessFunction]], which additionally receives a
   * context object and a collector for emitting results.
   *
   * @param patternProcessFunction process function invoked for the matches
   * @tparam R element type of the resulting stream
   * @return stream of the elements the function emits
   */
  def process[R: TypeInformation](patternProcessFunction: PatternProcessFunction[T, R]): DataStream[R]
}

166

```

167

168

**Usage Examples:**

169

170

```scala

171

import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.configuration.Configuration
// Provides the implicit TypeInformation derivation that process[R: TypeInformation] needs.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import java.time.Duration
import java.util.{List => JList, Map => JMap}

case class Transaction(accountId: String, amount: Double, location: String, timestamp: Long)
case class FraudAlert(accountId: String, reason: String, transactions: List[Transaction], riskScore: Double)

/**
 * Scores every match of the "suspicious" pattern against three rules and emits a
 * [[FraudAlert]] when the summed risk score exceeds 0.5.
 *
 * Rules (weight):
 *  - high-volume (0.4): total amount above 10000
 *  - multiple-locations (0.3): more than two distinct locations
 *  - rapid-succession (0.3): at least two transactions spanning less than 5 minutes
 */
class FraudDetectionProcessFunction extends PatternProcessFunction[Transaction, FraudAlert] {

  override def processMatch(
      `match`: JMap[String, JList[Transaction]],
      ctx: PatternProcessFunction.Context,
      out: Collector[FraudAlert]
  ): Unit = {
    // scala.jdk.CollectionConverters only exists from Scala 2.13; Flink's Scala API is built
    // for Scala 2.12, where JavaConverters is the available converter.
    import scala.collection.JavaConverters._

    // The pattern is "suspicious".oneOrMore, so the list holds at least one transaction.
    val transactions = `match`.get("suspicious").asScala.toList

    val accountId = transactions.head.accountId
    val totalAmount = transactions.map(_.amount).sum
    val locations = transactions.map(_.location).toSet
    val timestamps = transactions.map(_.timestamp)
    val timeSpan = timestamps.max - timestamps.min

    // (triggered?, reason, weight) for each rule; keep only the ones that fired.
    val triggered: List[(String, Double)] = List(
      (totalAmount > 10000, "high-volume", 0.4),
      (locations.size > 2, "multiple-locations", 0.3),
      // A single transaction always has a zero time span, so require at least two.
      (transactions.size > 1 && timeSpan < 300000, "rapid-succession", 0.3) // 5 minutes
    ).collect { case (true, reason, weight) => (reason, weight) }

    val riskScore = triggered.map(_._2).sum

    if (riskScore > 0.5) {
      out.collect(FraudAlert(
        accountId,
        triggered.map(_._1).mkString(", "),
        transactions,
        riskScore
      ))
    }
  }
}

// Usage with process function (assumes `transactionStream: DataStream[Transaction]` is defined elsewhere).
// The pattern name "suspicious" must match the key read in processMatch.
val fraudPattern = Pattern.begin[Transaction]("suspicious")
  .where(_.amount > 500)
  .oneOrMore
  .within(Duration.ofMinutes(10))

val fraudAlerts = CEP.pattern(transactionStream, fraudPattern)
  .process(new FraudDetectionProcessFunction())

236

```

237

238

### Pattern Map Access

239

240

How to access the matched events from the pattern map that select and flatSelect functions receive.

241

242

**Usage Examples:**

243

244

```scala

245

// Three named, non-quantified elements: each one binds exactly one event.
val complexPattern = Pattern.begin[Event]("first")
  .where(_.action == "start")
  .next("second")        // must directly follow "first"
  .where(_.action == "process")
  .followedBy("third")   // other events may occur in between
  .where(_.action == "end")

val results = CEP.pattern(dataStream, complexPattern)
  .select { pattern: Map[String, Iterable[Event]] =>
    // The map is keyed by pattern name; each value is an Iterable[Event].
    // For a non-quantified element it holds a single event, hence `head`.
    def single(name: String): Event = pattern(name).head

    val firstEvent = single("first")
    val secondEvent = single("second")
    val thirdEvent = single("third")

    s"Sequence: ${firstEvent.action} -> ${secondEvent.action} -> ${thirdEvent.action}"
  }

// "repeated" is quantified with oneOrMore, so it may bind several events.
val quantifiedPattern = Pattern.begin[Event]("start")
  .where(_.action == "begin")
  .followedBy("repeated")
  .where(_.action == "process")
  .oneOrMore
  .followedBy("end")
  .where(_.action == "finish")

val quantifiedResults = CEP.pattern(dataStream, quantifiedPattern)
  .select { pattern: Map[String, Iterable[Event]] =>
    val beginning = pattern("start").head
    val repetitions = pattern("repeated").size // number of events bound to "repeated"
    val finish = pattern("end").head

    s"Start: ${beginning.action}, Repeated: ${repetitions} times, End: ${finish.action}"
  }

285

```

286

287

## Types

288

289

```scala { .api }

290

// Core collector interface

291

import org.apache.flink.util.Collector

292

293

// Pattern processing interfaces from Java CEP

294

import org.apache.flink.cep.PatternSelectFunction

295

import org.apache.flink.cep.PatternFlatSelectFunction

296

import org.apache.flink.cep.functions.PatternProcessFunction

297

298

// Type information for Scala

299

import org.apache.flink.api.common.typeinfo.TypeInformation

300

301

// Scala collection Map used in the Scala select/flatSelect function signatures

302

import scala.collection.Map

303

import java.util.{List => JList, Map => JMap}

304

305

// Simplified shapes of the CEP function types (the real ones are Java types)

306

/**
 * Shape of the Java interface `org.apache.flink.cep.PatternSelectFunction`: produces exactly
 * one result per match. Because the Java type is an interface (not a class), it is shown
 * as a trait here, so implementations can mix it in alongside other types.
 *
 * @tparam T type of the matched events
 * @tparam R type of the produced result
 */
trait PatternSelectFunction[T, R] extends java.io.Serializable {

  /**
   * Builds the result for one match.
   *
   * @param pattern matched events keyed by pattern name
   * @return the single result for this match
   */
  def select(pattern: JMap[String, JList[T]]): R
}

309

310

/**
 * Shape of the Java interface `org.apache.flink.cep.PatternFlatSelectFunction`: emits zero or
 * more results per match through a collector. The Java type is an interface, so it is shown
 * as a trait.
 *
 * @tparam T type of the matched events
 * @tparam R type of the emitted results
 */
trait PatternFlatSelectFunction[T, R] extends java.io.Serializable {

  /**
   * Emits the results for one match.
   *
   * @param pattern matched events keyed by pattern name
   * @param out     collector that receives the results
   */
  def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit
}

313

314

/**
 * Shape of `org.apache.flink.cep.functions.PatternProcessFunction`: a rich function whose
 * `processMatch` receives a match together with a context and a collector.
 *
 * @tparam T type of the matched events
 * @tparam R type of the emitted results
 */
abstract class PatternProcessFunction[T, R] extends AbstractRichFunction {

  /**
   * Handles one match.
   *
   * @param match matched events keyed by pattern name
   * @param ctx   context passed alongside the match
   * @param out   collector that receives the results
   */
  def processMatch(`match`: JMap[String, JList[T]], ctx: PatternProcessFunction.Context, out: Collector[R]): Unit
}

321

```