or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-management.mdcomplex-event-processing.mdcore-table-operations.mddatastream-integration.mdindex.mdsql-processing.mdtype-system.mduser-defined-functions.mdwindow-operations.md

complex-event-processing.mddocs/

0

# Complex Event Processing (CEP)

1

2

This document covers pattern matching and complex event detection capabilities on streaming data using Apache Flink's CEP library bundled in the Table Uber Blink package.

3

4

## Basic Pattern Definition

5

6

### Pattern Creation

7

8

```java { .api }

9

class Pattern<T, F extends T> {

10

static <X> Pattern<X, X> begin(String name);

11

static <X> Pattern<X, X> begin(String name, AfterMatchSkipStrategy afterMatchSkipStrategy);

12

13

Pattern<T, F> where(SimpleCondition<F> condition);

14

Pattern<T, F> where(IterativeCondition<F> condition);

15

Pattern<T, F> or(SimpleCondition<F> condition);

16

Pattern<T, F> or(IterativeCondition<F> condition);

17

18

Pattern<T, F> next(String name);

19

Pattern<T, F> followedBy(String name);

20

Pattern<T, F> followedByAny(String name);

21

Pattern<T, F> notNext();

22

Pattern<T, F> notFollowedBy();

23

24

Pattern<T, F> within(Time within);

25

Pattern<T, F> times(int times);

26

Pattern<T, F> times(int fromTimes, int toTimes);

27

Pattern<T, F> oneOrMore();

28

Pattern<T, F> timesOrMore(int times);

29

Pattern<T, F> optional();

30

Pattern<T, F> greedy();

31

}

32

```

33

34

### Simple Pattern Example

35

36

```java

37

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")

38

.where(new SimpleCondition<Event>() {

39

@Override

40

public boolean filter(Event event) {

41

return event.getType().equals("login");

42

}

43

})

44

.next("middle")

45

.where(new SimpleCondition<Event>() {

46

@Override

47

public boolean filter(Event event) {

48

return event.getType().equals("purchase");

49

}

50

})

51

.within(Time.minutes(10));

52

```

53

54

## Pattern Conditions

55

56

### Simple Conditions

57

58

```java { .api }

59

abstract class SimpleCondition<T> implements Function {

60

abstract boolean filter(T value) throws Exception;

61

}

62

```

63

64

**Usage:**

65

66

```java

67

// Simple condition for event type

68

SimpleCondition<Event> loginCondition = new SimpleCondition<Event>() {

69

@Override

70

public boolean filter(Event event) {

71

return "login".equals(event.getType());

72

}

73

};

74

75

// Using lambda

76

SimpleCondition<Event> highValueCondition = event -> event.getAmount() > 1000;

77

```

78

79

### Iterative Conditions

80

81

```java { .api }

82

abstract class IterativeCondition<T> extends RichFunction {

83

abstract boolean filter(T value, Context<T> ctx) throws Exception;

84

85

interface Context<T> {

86

Iterable<T> getEventsForPattern(String name);

87

<X> Iterable<X> getEventsForPattern(String name, Class<X> clazz);

88

long timestamp();

89

}

90

}

91

```

92

93

**Usage:**

94

95

```java

96

// Iterative condition accessing previous events

97

IterativeCondition<Event> increasingAmountCondition = new IterativeCondition<Event>() {

98

@Override

99

public boolean filter(Event current, Context<Event> ctx) throws Exception {

100

if (!current.getType().equals("purchase")) {

101

return false;

102

}

103

104

for (Event prev : ctx.getEventsForPattern("previous")) {

105

if (current.getAmount() <= prev.getAmount()) {

106

return false;

107

}

108

}

109

return true;

110

}

111

};

112

```

113

114

## Pattern Sequence Types

115

116

### Strict Contiguity (next)

117

118

```java

119

// Events must occur immediately one after another

120

Pattern<Event, ?> strictPattern = Pattern.<Event>begin("first")

121

.where(event -> event.getType().equals("A"))

122

.next("second")

123

.where(event -> event.getType().equals("B"));

124

```

125

126

### Relaxed Contiguity (followedBy)

127

128

```java

129

// Events can have other events in between

130

Pattern<Event, ?> relaxedPattern = Pattern.<Event>begin("first")

131

.where(event -> event.getType().equals("A"))

132

.followedBy("second")

133

.where(event -> event.getType().equals("B"));

134

```

135

136

### Non-Deterministic Relaxed Contiguity (followedByAny)

137

138

```java

139

// Multiple matches possible for the same event

140

Pattern<Event, ?> nonDetPattern = Pattern.<Event>begin("first")

141

.where(event -> event.getType().equals("A"))

142

.followedByAny("second")

143

.where(event -> event.getType().equals("B"));

144

```

145

146

## Quantifiers

147

148

### Times

149

150

```java

151

// Exactly 3 times

152

Pattern<Event, ?> exactPattern = Pattern.<Event>begin("events")

153

.where(event -> event.getType().equals("click"))

154

.times(3);

155

156

// Between 2 and 4 times

157

Pattern<Event, ?> rangePattern = Pattern.<Event>begin("events")

158

.where(event -> event.getType().equals("click"))

159

.times(2, 4);

160

```

161

162

### One or More

163

164

```java

165

// One or more occurrences

166

Pattern<Event, ?> oneOrMorePattern = Pattern.<Event>begin("events")

167

.where(event -> event.getType().equals("click"))

168

.oneOrMore();

169

170

// At least 2 occurrences

171

Pattern<Event, ?> timesOrMorePattern = Pattern.<Event>begin("events")

172

.where(event -> event.getType().equals("click"))

173

.timesOrMore(2);

174

```

175

176

### Optional

177

178

```java

179

// Optional event

180

Pattern<Event, ?> optionalPattern = Pattern.<Event>begin("start")

181

.where(event -> event.getType().equals("login"))

182

.followedBy("optional")

183

.where(event -> event.getType().equals("view"))

184

.optional()

185

.followedBy("end")

186

.where(event -> event.getType().equals("logout"));

187

```

188

189

## Negation Patterns

190

191

```java

192

// Not followed by

193

Pattern<Event, ?> notPattern = Pattern.<Event>begin("start")

194

.where(event -> event.getType().equals("login"))

195

.notFollowedBy("fraud")

196

.where(event -> event.getType().equals("suspicious"))

197

.followedBy("end")

198

.where(event -> event.getType().equals("purchase"));

199

200

// Not next

201

Pattern<Event, ?> notNextPattern = Pattern.<Event>begin("start")

202

.where(event -> event.getType().equals("start"))

203

.notNext()

204

.where(event -> event.getType().equals("error"));

205

```

206

207

## Time Constraints

208

209

```java

210

// Pattern must complete within time window

211

Pattern<Event, ?> timedPattern = Pattern.<Event>begin("start")

212

.where(event -> event.getType().equals("login"))

213

.followedBy("purchase")

214

.where(event -> event.getType().equals("purchase"))

215

.within(Time.minutes(30));

216

```

217

218

## Pattern Application

219

220

### CEP Pattern Stream

221

222

```java { .api }

223

class CEP {

224

static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern);

225

static <T> PatternStream<T> pattern(KeyedStream<T, ?> input, Pattern<T, ?> pattern);

226

}

227

228

interface PatternStream<T> {

229

<R> SingleOutputStreamOperator<R> select(PatternSelectFunction<T, R> patternSelectFunction);

230

<R> SingleOutputStreamOperator<R> process(PatternProcessFunction<T, R> patternProcessFunction);

231

<L, R> SingleOutputStreamOperator<R> select(OutputTag<L> timedOutPartialMatchesTag,

232

PatternTimeoutFunction<T, L> patternTimeoutFunction,

233

PatternSelectFunction<T, R> patternSelectFunction);

234

DataStream<T> inContext(String contextPattern);

235

}

236

```

237

238

**Usage:**

239

240

```java

241

DataStream<Event> eventStream = env.addSource(new EventSource());

242

243

Pattern<Event, ?> pattern = Pattern.<Event>begin("login")

244

.where(event -> event.getType().equals("login"))

245

.followedBy("purchase")

246

.where(event -> event.getType().equals("purchase"))

247

.within(Time.minutes(10));

248

249

PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);

250

```

251

252

## Pattern Selection

253

254

### Pattern Select Function

255

256

```java { .api }

257

interface PatternSelectFunction<IN, OUT> extends Function {

258

OUT select(Map<String, List<IN>> pattern) throws Exception;

259

}

260

```

261

262

**Usage:**

263

264

```java

265

DataStream<Alert> alerts = patternStream.select(

266

new PatternSelectFunction<Event, Alert>() {

267

@Override

268

public Alert select(Map<String, List<Event>> pattern) {

269

Event loginEvent = pattern.get("login").get(0);

270

Event purchaseEvent = pattern.get("purchase").get(0);

271

272

return new Alert(

273

loginEvent.getUserId(),

274

"Quick purchase after login",

275

loginEvent.getTimestamp(),

276

purchaseEvent.getTimestamp()

277

);

278

}

279

}

280

);

281

```

282

283

### Pattern Process Function

284

285

```java { .api }

286

abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction {

287

abstract void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;

288

289

interface Context {

290

long timestamp();

291

<X> void output(OutputTag<X> outputTag, X value);

292

}

293

}

294

```

295

296

**Usage:**

297

298

```java

299

DataStream<Result> results = patternStream.process(

300

new PatternProcessFunction<Event, Result>() {

301

@Override

302

public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Result> out) {

303

List<Event> loginEvents = match.get("login");

304

List<Event> purchaseEvents = match.get("purchase");

305

306

// Process all combinations

307

for (Event login : loginEvents) {

308

for (Event purchase : purchaseEvents) {

309

out.collect(new Result(login, purchase, ctx.timestamp()));

310

}

311

}

312

}

313

}

314

);

315

```

316

317

## Timeout Handling

318

319

```java { .api }

320

interface PatternTimeoutFunction<IN, OUT> extends Function {

321

OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;

322

}

323

```

324

325

**Usage:**

326

327

```java

328

OutputTag<TimeoutAlert> timeoutTag = new OutputTag<TimeoutAlert>("timeout"){};

329

330

SingleOutputStreamOperator<Alert> result = patternStream.select(

331

timeoutTag,

332

new PatternTimeoutFunction<Event, TimeoutAlert>() {

333

@Override

334

public TimeoutAlert timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) {

335

Event loginEvent = pattern.get("login").get(0);

336

return new TimeoutAlert(loginEvent.getUserId(), "No purchase after login", timeoutTimestamp);

337

}

338

},

339

new PatternSelectFunction<Event, Alert>() {

340

@Override

341

public Alert select(Map<String, List<Event>> pattern) {

342

// Regular match processing

343

return new Alert(...);

344

}

345

}

346

);

347

348

DataStream<TimeoutAlert> timeouts = result.getSideOutput(timeoutTag);

349

```

350

351

## After Match Skip Strategies

352

353

```java { .api }

354

class AfterMatchSkipStrategy {

355

static AfterMatchSkipStrategy noSkip();

356

static AfterMatchSkipStrategy skipPastLastEvent();

357

static AfterMatchSkipStrategy skipToFirst(String patternName);

358

static AfterMatchSkipStrategy skipToLast(String patternName);

359

}

360

```

361

362

**Usage:**

363

364

```java

365

// Skip to the first event of "middle" pattern after a match

366

Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipToFirst("middle"))

367

.where(event -> event.getType().equals("A"))

368

.followedBy("middle")

369

.where(event -> event.getType().equals("B"))

370

.followedBy("end")

371

.where(event -> event.getType().equals("C"));

372

```

373

374

## Complex Pattern Examples

375

376

### Fraud Detection

377

378

```java

379

// Detect multiple failed login attempts followed by success

380

Pattern<LoginEvent, ?> fraudPattern = Pattern.<LoginEvent>begin("failed")

381

.where(event -> !event.isSuccessful())

382

.times(3).consecutive()

383

.followedBy("success")

384

.where(event -> event.isSuccessful())

385

.within(Time.minutes(5));

386

387

patternStream.select(new PatternSelectFunction<LoginEvent, FraudAlert>() {

388

@Override

389

public FraudAlert select(Map<String, List<LoginEvent>> pattern) {

390

List<LoginEvent> failures = pattern.get("failed");

391

LoginEvent success = pattern.get("success").get(0);

392

393

return new FraudAlert(

394

success.getUserId(),

395

failures.size(),

396

failures.get(0).getTimestamp(),

397

success.getTimestamp()

398

);

399

}

400

});

401

```

402

403

### User Journey Analysis

404

405

```java

406

// Track user journey: view -> add_to_cart -> (optional) remove -> purchase

407

Pattern<UserEvent, ?> journeyPattern = Pattern.<UserEvent>begin("view")

408

.where(event -> event.getAction().equals("view"))

409

.followedBy("cart")

410

.where(event -> event.getAction().equals("add_to_cart"))

411

.followedBy("remove")

412

.where(event -> event.getAction().equals("remove"))

413

.optional()

414

.followedBy("purchase")

415

.where(event -> event.getAction().equals("purchase"))

416

.within(Time.hours(24));

417

```

418

419

## Types

420

421

```java { .api }

422

interface PatternSelectFunction<IN, OUT> extends Function, Serializable;

423

interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable;

424

abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction;

425

426

class Time {

427

static Time milliseconds(long milliseconds);

428

static Time seconds(long seconds);

429

static Time minutes(long minutes);

430

static Time hours(long hours);

431

static Time days(long days);

432

}

433

434

enum Quantifier {

435

ONE,

436

ONE_OR_MORE,

437

TIMES,

438

LOOPING

439

}

440

441

interface Function extends Serializable;

442

abstract class RichFunction extends AbstractRichFunction implements Function;

443

```