or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

group-pattern-management.mdindex.mdpattern-definition.mdpattern-processing.mdpattern-stream-creation.mdtimeout-handling.md

pattern-definition.mddocs/

0

# Pattern Definition

1

2

Complex event pattern definition using the fluent Scala DSL with temporal constraints, conditions, and quantifiers.

3

4

## Capabilities

5

6

### Pattern Creation

7

8

Start a new pattern sequence with a named initial pattern.

9

10

```scala { .api }

11

/**

12

* Start a new pattern sequence with the given name

13

* @param name The name of starting pattern

14

* @tparam X Base type of events in the pattern

15

* @return The first pattern of a pattern sequence

16

*/

17

object Pattern {

18

def begin[X](name: String): Pattern[X, X]

19

def begin[X](name: String, afterMatchSkipStrategy: AfterMatchSkipStrategy): Pattern[X, X]

20

}

21

```

22

23

**Usage Examples:**

24

25

```scala

26

import org.apache.flink.cep.scala.pattern.Pattern

27

28

case class LoginEvent(userId: String, timestamp: Long)

29

30

// Simple pattern start

31

val pattern = Pattern.begin[LoginEvent]("login")

32

.where(_.userId.nonEmpty)

33

34

// Pattern with skip strategy

35

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy

36

val skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent()

37

val pattern = Pattern.begin[LoginEvent]("login", skipStrategy)

38

```

39

40

### Condition Definition

41

42

Add conditions that events must satisfy to be considered matches.

43

44

```scala { .api }

45

class Pattern[T, F <: T] {

46

/**

47

* Add AND condition using simple predicate function

48

* @param condition Predicate function for the event

49

* @return Pattern with the new condition

50

*/

51

def where(condition: F => Boolean): Pattern[T, F]

52

53

/**

54

* Add AND condition with access to context

55

* @param condition Function taking event and context

56

* @return Pattern with the new condition

57

*/

58

def where(condition: (F, Context[F]) => Boolean): Pattern[T, F]

59

60

/**

61

* Add AND condition using IterativeCondition

62

* @param condition The IterativeCondition to apply

63

* @return Pattern with the new condition

64

*/

65

def where(condition: IterativeCondition[F]): Pattern[T, F]

66

67

/**

68

* Add OR condition using simple predicate function

69

* @param condition Predicate function for the event

70

* @return Pattern with the new condition

71

*/

72

def or(condition: F => Boolean): Pattern[T, F]

73

74

/**

75

* Add OR condition with access to context

76

* @param condition Function taking event and context

77

* @return Pattern with the new condition

78

*/

79

def or(condition: (F, Context[F]) => Boolean): Pattern[T, F]

80

81

/**

82

* Add OR condition using IterativeCondition

83

* @param condition The IterativeCondition to apply

84

* @return Pattern with the new condition

85

*/

86

def or(condition: IterativeCondition[F]): Pattern[T, F]

87

}

88

```

89

90

**Usage Examples:**

91

92

```scala

93

case class Event(eventType: String, value: Int, userId: String)

94

95

// Simple condition

96

val pattern = Pattern.begin[Event]("start")

97

.where(_.eventType == "login")

98

.or(_.eventType == "signup")

99

100

// Context-aware condition

101

val pattern = Pattern.begin[Event]("start")

102

.where(_.eventType == "purchase")

103

.next("confirmation")

104

.where((event, ctx) => {

105

val previousEvents = ctx.getEventsForPattern("start")

106

event.userId == previousEvents.head.userId

107

})

108

```

109

110

### Subtype Constraints

111

112

Apply subtype constraints to patterns for type-safe event matching.

113

114

```scala { .api }

115

class Pattern[T, F <: T] {

116

/**

117

* Apply subtype constraint requiring events to be of specific subtype

118

* @param clazz Class of the required subtype

119

* @tparam S The subtype

120

* @return Pattern constrained to the subtype

121

*/

122

def subtype[S <: F](clazz: Class[S]): Pattern[T, S]

123

}

124

```

125

126

**Usage Examples:**

127

128

```scala

129

abstract class Event(eventType: String)

130

case class LoginEvent(userId: String) extends Event("login")

131

case class PurchaseEvent(userId: String, amount: Double) extends Event("purchase")

132

133

val pattern = Pattern.begin[Event]("start")

134

.subtype(classOf[LoginEvent])

135

.next("purchase")

136

.subtype(classOf[PurchaseEvent])

137

```

138

139

### Temporal Pattern Chaining

140

141

Chain patterns with different temporal contiguity requirements.

142

143

```scala { .api }

144

class Pattern[T, F <: T] {

145

/**

146

* Strict temporal contiguity - no events between matches

147

* @param name Name of the next pattern

148

* @return New pattern enforcing strict contiguity

149

*/

150

def next(name: String): Pattern[T, T]

151

152

/**

153

* Non-strict temporal contiguity - events may be interleaved

154

* @param name Name of the following pattern

155

* @return New pattern allowing interleaved events

156

*/

157

def followedBy(name: String): Pattern[T, T]

158

159

/**

160

* Non-deterministic following - matches any occurrence

161

* @param name Name of the following pattern

162

* @return New pattern with non-deterministic following

163

*/

164

def followedByAny(name: String): Pattern[T, T]

165

166

/**

167

* Negative pattern - no matching event should follow

168

* @param name Name of the pattern that should not occur

169

* @return New pattern with negative constraint

170

*/

171

def notNext(name: String): Pattern[T, T]

172

173

/**

174

* Negative following pattern - no matching event should occur between

175

* @param name Name of the pattern that should not occur

176

* @return New pattern with negative constraint

177

*/

178

def notFollowedBy(name: String): Pattern[T, T]

179

}

180

```

181

182

### Group Pattern Chaining

183

184

Chain patterns with other Pattern objects to create GroupPatterns.

185

186

```scala { .api }

187

class Pattern[T, F <: T] {

188

/**

189

* Chain with another pattern using followedBy semantics

190

* @param pattern The pattern to follow this one

191

* @return GroupPattern for further chaining

192

*/

193

def followedBy(pattern: Pattern[T, F]): GroupPattern[T, F]

194

195

/**

196

* Chain with another pattern using followedByAny semantics

197

* @param pattern The pattern to follow this one

198

* @return GroupPattern for further chaining

199

*/

200

def followedByAny(pattern: Pattern[T, F]): GroupPattern[T, F]

201

202

/**

203

* Chain with another pattern using next semantics

204

* @param pattern The pattern to follow this one strictly

205

* @return GroupPattern for further chaining

206

*/

207

def next(pattern: Pattern[T, F]): GroupPattern[T, F]

208

}

209

210

object Pattern {

211

/**

212

* Start pattern sequence with existing pattern

213

* @param pattern Initial pattern for the sequence

214

* @return GroupPattern for chaining

215

*/

216

def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F]

217

218

/**

219

* Start pattern sequence with existing pattern and skip strategy

220

* @param pattern Initial pattern for the sequence

221

* @param afterMatchSkipStrategy Skip strategy after matches

222

* @return GroupPattern for chaining

223

*/

224

def begin[T, F <: T](

225

pattern: Pattern[T, F],

226

afterMatchSkipStrategy: AfterMatchSkipStrategy

227

): GroupPattern[T, F]

228

}

229

```

230

231

**Usage Examples:**

232

233

```scala

234

// Strict sequence - login immediately followed by purchase

235

val strictPattern = Pattern.begin[Event]("login")

236

.where(_.eventType == "login")

237

.next("purchase")

238

.where(_.eventType == "purchase")

239

240

// Relaxed sequence - login followed by purchase (other events allowed)

241

val relaxedPattern = Pattern.begin[Event]("login")

242

.where(_.eventType == "login")

243

.followedBy("purchase")

244

.where(_.eventType == "purchase")

245

246

// Negative pattern - login not followed by logout

247

val negativePattern = Pattern.begin[Event]("login")

248

.where(_.eventType == "login")

249

.notFollowedBy("logout")

250

```

251

252

### Time Windows

253

254

Define time constraints for pattern completion.

255

256

```scala { .api }

257

class Pattern[T, F <: T] {

258

/**

259

* Set maximum time for pattern completion

260

* @param windowTime Duration for pattern completion

261

* @return Pattern with time constraint

262

*/

263

def within(windowTime: Duration): Pattern[T, F]

264

265

/**

266

* Set maximum time for pattern completion (deprecated)

267

* @param windowTime Time window for pattern completion

268

* @return Pattern with time constraint

269

*/

270

@deprecated("Use within(Duration)", "1.19.0")

271

def within(windowTime: Time): Pattern[T, F]

272

}

273

```

274

275

**Usage Examples:**

276

277

```scala

278

import java.time.Duration

279

280

// Pattern must complete within 5 minutes

281

val timedPattern = Pattern.begin[Event]("start")

282

.where(_.eventType == "start")

283

.followedBy("end")

284

.where(_.eventType == "end")

285

.within(Duration.ofMinutes(5))

286

```

287

288

### Pattern Quantifiers

289

290

Apply quantifiers to specify repetition patterns.

291

292

```scala { .api }

293

class Pattern[T, F <: T] {

294

/**

295

* Make pattern optional (0 or 1 occurrence)

296

* @return Pattern marked as optional

297

*/

298

def optional: Pattern[T, F]

299

300

/**

301

* Pattern can occur one or more times

302

* @return Pattern with one-or-more quantifier

303

*/

304

def oneOrMore: Pattern[T, F]

305

306

/**

307

* Pattern occurs exactly N times

308

* @param times Exact number of occurrences

309

* @return Pattern with exact count quantifier

310

*/

311

def times(times: Int): Pattern[T, F]

312

313

/**

314

* Pattern occurs between from and to times

315

* @param from Minimum occurrences

316

* @param to Maximum occurrences

317

* @return Pattern with range quantifier

318

*/

319

def times(from: Int, to: Int): Pattern[T, F]

320

321

/**

322

* Pattern occurs at least N times

323

* @param times Minimum number of occurrences

324

* @return Pattern with at-least quantifier

325

*/

326

def timesOrMore(times: Int): Pattern[T, F]

327

328

/**

329

* Use greedy matching (match as many as possible)

330

* @return Pattern with greedy matching

331

*/

332

def greedy: Pattern[T, F]

333

334

/**

335

* Allow combinations in quantified patterns

336

* @return Pattern allowing combinations

337

*/

338

def allowCombinations(): Pattern[T, F]

339

340

/**

341

* Require consecutive matching for quantified patterns

342

* @return Pattern requiring consecutive matches

343

*/

344

def consecutive(): Pattern[T, F]

345

}

346

```

347

348

**Usage Examples:**

349

350

```scala

351

// Optional pattern

352

val optionalPattern = Pattern.begin[Event]("optional")

353

.where(_.eventType == "init")

354

.optional

355

.followedBy("required")

356

.where(_.eventType == "process")

357

358

// Repeated pattern

359

val repeatedPattern = Pattern.begin[Event]("repeated")

360

.where(_.eventType == "click")

361

.oneOrMore

362

.consecutive()

363

.followedBy("submit")

364

.where(_.eventType == "submit")

365

366

// Exact count

367

val exactPattern = Pattern.begin[Event]("exactly")

368

.where(_.eventType == "attempt")

369

.times(3)

370

.followedBy("success")

371

.where(_.eventType == "success")

372

```

373

374

### Until Conditions

375

376

Apply stop conditions for looping patterns.

377

378

```scala { .api }

379

class Pattern[T, F <: T] {

380

/**

381

* Stop condition with simple predicate

382

* @param untilCondition Condition to stop pattern matching

383

* @return Pattern with until condition

384

*/

385

def until(untilCondition: F => Boolean): Pattern[T, F]

386

387

/**

388

* Stop condition with context access

389

* @param untilCondition Condition function with context

390

* @return Pattern with until condition

391

*/

392

def until(untilCondition: (F, Context[F]) => Boolean): Pattern[T, F]

393

394

/**

395

* Stop condition with IterativeCondition

396

* @param untilCondition The IterativeCondition for stopping

397

* @return Pattern with until condition

398

*/

399

def until(untilCondition: IterativeCondition[F]): Pattern[T, F]

400

}

401

```

402

403

**Usage Examples:**

404

405

```scala

406

// Loop until condition is met

407

val loopingPattern = Pattern.begin[Event]("loop")

408

.where(_.eventType == "process")

409

.oneOrMore

410

.until(_.eventType == "complete")

411

.followedBy("final")

412

.where(_.eventType == "finish")

413

```

414

415

### Pattern Properties

416

417

Access pattern properties and configuration.

418

419

```scala { .api }

420

class Pattern[T, F <: T] {

421

/**

422

* Get the previous pattern in the chain

423

* @return Optional previous pattern

424

*/

425

def getPrevious: Option[Pattern[T, _ <: T]]

426

427

/**

428

* Get the name of this pattern

429

* @return Pattern name

430

*/

431

def getName: String

432

433

/**

434

* Get the time window for pattern completion (deprecated)

435

* @return Optional time window

436

*/

437

@deprecated("Use getWindowSize", "1.19.0")

438

def getWindowTime: Option[Time]

439

440

/**

441

* Get the time window duration for pattern completion

442

* @return Optional duration window

443

*/

444

def getWindowSize: Option[Duration]

445

446

/**

447

* Get the quantifier applied to this pattern

448

* @return Pattern quantifier

449

*/

450

def getQuantifier: Quantifier

451

452

/**

453

* Get the condition applied to this pattern

454

* @return Optional iterative condition

455

*/

456

def getCondition: Option[IterativeCondition[F]]

457

458

/**

459

* Get the until condition for looping patterns

460

* @return Optional until condition

461

*/

462

def getUntilCondition: Option[IterativeCondition[F]]

463

464

/**

465

* Get the after-match skip strategy

466

* @return After match skip strategy

467

*/

468

def getAfterMatchSkipStrategy: AfterMatchSkipStrategy

469

}

470

```

471

472

### GroupPattern

473

474

GroupPattern extends Pattern but restricts certain operations.

475

476

```scala { .api }

477

class GroupPattern[T, F <: T] extends Pattern[T, F] {

478

// Inherits all Pattern methods except:

479

// - where() methods throw UnsupportedOperationException

480

// - or() methods throw UnsupportedOperationException

481

// - subtype() throws UnsupportedOperationException

482

}

483

484

object GroupPattern {

485

/**

486

* Wrap Java GroupPattern

487

* @param jGroupPattern Java GroupPattern to wrap

488

* @return Scala GroupPattern wrapper

489

*/

490

def apply[T, F <: T](jGroupPattern: JGroupPattern[T, F]): GroupPattern[T, F]

491

}

492

```

493

494

**Usage Examples:**

495

496

```scala

497

// GroupPattern creation from Pattern chaining

498

val individualPattern = Pattern.begin[Event]("first").where(_.eventType == "start")

499

val anotherPattern = Pattern.begin[Event]("second").where(_.eventType == "end")

500

501

// Chain patterns to create GroupPattern

502

val groupPattern = individualPattern.followedBy(anotherPattern)

503

504

// Start with existing pattern

505

val groupFromPattern = Pattern.begin(individualPattern)

506

.followedBy("additional")

507

.where(_.eventType == "middle")

508

509

// Note: GroupPattern cannot use where(), or(), or subtype()

510

// This would throw UnsupportedOperationException:

511

// groupPattern.where(_.eventType == "invalid") // ERROR!

512

```

513

514

## Types

515

516

```scala { .api }

517

// Context for condition evaluation

518

trait Context[T] {

519

def getEventsForPattern(name: String): Iterable[T]

520

}

521

522

// Java interop types

523

import org.apache.flink.cep.pattern.conditions.IterativeCondition

524

import org.apache.flink.cep.pattern.conditions.SimpleCondition

525

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy

526

import org.apache.flink.cep.pattern.{GroupPattern => JGroupPattern}

527

import org.apache.flink.cep.pattern.Quantifier

528

import org.apache.flink.streaming.api.windowing.time.Time

529

import java.time.Duration

530

```