or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cancellation-testing.md checkpointing-migration.md fault-tolerance-recovery.md index.md operator-lifecycle.md plugin-testing.md runtime-utilities.md session-window-testing.md state-backend-restore.md test-data-utilities.md

docs/session-window-testing.md

0

# Session Window Testing Framework

1

2

A specialized testing framework for session window functionality, covering test event generation and result validation. It provides tools for testing session-based windowing operations and event-processing patterns.

3

4

## Capabilities

5

6

### Event Generator Framework

7

8

Factory classes and generators for creating session-based test events with configurable patterns and timing.

9

10

```java { .api }

11

/**

12

* Factory for creating session event generators with different configurations

13

*/

14

public class EventGeneratorFactory {

15

16

/**

17

* Create session event generator with specified configuration

18

* @param config SessionConfiguration defining generator behavior

19

* @return SessionEventGenerator instance

20

*/

21

public static SessionEventGenerator create(SessionConfiguration config);

22

23

/**

24

* Create parallel session event generator for multi-session testing

25

* @param config SessionConfiguration for generator setup

26

* @param parallelism number of parallel session generators

27

* @return ParallelSessionsEventGenerator instance

28

*/

29

public static ParallelSessionsEventGenerator createParallel(

30

SessionConfiguration config,

31

int parallelism);

32

33

/**

34

* Create event generator with custom event factory

35

* @param eventFactory GeneratorEventFactory for event creation

36

* @param config SessionConfiguration for timing and behavior

37

* @return SessionEventGenerator with custom event creation

38

*/

39

public static SessionEventGenerator createWithCustomFactory(

40

GeneratorEventFactory eventFactory,

41

SessionConfiguration config);

42

}

43

44

/**

45

* Factory for creating generator events with configurable properties

46

*/

47

public class GeneratorEventFactory {

48

49

/**

50

* Constructor for generator event factory

51

* @param eventTypeConfig configuration for event types

52

*/

53

public GeneratorEventFactory(EventTypeConfiguration eventTypeConfig);

54

55

/**

56

* Create session event with specified properties

57

* @param sessionId identifier for the session

58

* @param timestamp event timestamp

59

* @param payload event payload data

60

* @return SessionEvent instance

61

*/

62

public SessionEvent createEvent(String sessionId, long timestamp, TestEventPayload payload);

63

64

/**

65

* Create batch of session events for testing

66

* @param sessionId session identifier

67

* @param eventCount number of events to create

68

* @param timeRange time range for event distribution

69

* @return List of SessionEvent objects

70

*/

71

public List<SessionEvent> createEventBatch(

72

String sessionId,

73

int eventCount,

74

TimeRange timeRange);

75

}

76

```

77

78

### Event Generator Interface

79

80

Core interface for session event generation with flexible key and event type support.

81

82

```java { .api }

83

/**

84

* Interface for generating session window events with configurable key and event types

85

* @param <K> key type for session events

86

* @param <E> event type for session data

87

*/

88

public interface EventGenerator<K, E> {

89

90

/**

91

* Generate event at specified global watermark

92

* @param globalWatermark current global watermark

93

* @return generated event, or null if no event should be generated

94

*/

95

E generateEvent(long globalWatermark);

96

97

/**

98

* Check if generator can produce an event at the specified watermark

99

* @param globalWatermark watermark to check against

100

* @return boolean indicating if event can be generated

101

*/

102

boolean canGenerateEventAtWatermark(long globalWatermark);

103

104

/**

105

* Check if generator has more events to produce

106

* @return boolean indicating if more events are available

107

*/

108

boolean hasMoreEvents();

109

110

/**

111

* Get local watermark for this generator

112

* @return long representing local watermark

113

*/

114

long getLocalWatermark();

115

116

/**

117

* Get next generator in sequence for chained generation

118

* @param globalWatermark current global watermark

119

* @return EventGenerator instance for next generation phase

120

*/

121

EventGenerator<K, E> getNextGenerator(long globalWatermark);

122

123

/**

124

* Get key associated with this generator's events

125

* @return K key for session grouping

126

*/

127

K getKey();

128

129

/**

130

* Reset generator to initial state

131

*/

132

void reset();

133

134

/**

135

* Get configuration used by this generator

136

* @return SessionConfiguration instance

137

*/

138

SessionConfiguration getConfiguration();

139

}

140

```

141

142

### Session Event Generators

143

144

Implementation classes for generating session events with different patterns and configurations.

145

146

```java { .api }

147

/**

148

* Session event generator implementation with configurable event patterns

149

*/

150

public class SessionEventGeneratorImpl implements SessionEventGenerator {

151

152

/**

153

* Constructor for session event generator

154

* @param config SessionConfiguration for generator behavior

155

* @param eventFactory factory for creating events

156

*/

157

public SessionEventGeneratorImpl(

158

SessionConfiguration config,

159

GeneratorEventFactory eventFactory);

160

161

@Override

162

public Stream<SessionEvent> generateEvents();

163

164

@Override

165

public Stream<SessionEvent> generateEventsForTimeRange(TimeRange range);

166

167

/**

168

* Generate events for specific session with controlled timing

169

* @param sessionId session identifier

170

* @param eventCount number of events to generate

171

* @param sessionDuration total duration of session

172

* @return Stream of SessionEvent objects

173

*/

174

public Stream<SessionEvent> generateSessionEvents(

175

String sessionId,

176

int eventCount,

177

Duration sessionDuration);

178

179

/**

180

* Generate overlapping session events for testing session boundaries

181

* @param sessionIds list of session identifiers

182

* @param overlapDuration duration of session overlap

183

* @return Stream of overlapping SessionEvent objects

184

*/

185

public Stream<SessionEvent> generateOverlappingSessions(

186

List<String> sessionIds,

187

Duration overlapDuration);

188

}

189

190

/**

191

* Parallel sessions event generator for testing concurrent session processing

192

*/

193

public class ParallelSessionsEventGenerator implements SessionEventGenerator {

194

195

/**

196

* Constructor for parallel sessions generator

197

* @param config SessionConfiguration for each parallel session

198

* @param sessionCount number of concurrent sessions

199

*/

200

public ParallelSessionsEventGenerator(SessionConfiguration config, int sessionCount);

201

202

@Override

203

public Stream<SessionEvent> generateEvents();

204

205

/**

206

* Generate events for multiple parallel sessions with different patterns

207

* @param sessionConfigs configurations for each parallel session

208

* @return Stream of SessionEvent objects from all parallel sessions

209

*/

210

public Stream<SessionEvent> generateParallelSessions(

211

List<SessionConfiguration> sessionConfigs);

212

213

/**

214

* Generate sessions with configurable arrival patterns

215

* @param arrivalPattern pattern for session arrival (UNIFORM, POISSON, BURST, or CUSTOM)

216

* @param sessionDuration duration of each session

217

* @return Stream of SessionEvent objects with specified arrival pattern

218

*/

219

public Stream<SessionEvent> generateSessionsWithArrivalPattern(

220

ArrivalPattern arrivalPattern,

221

Duration sessionDuration);

222

}

223

224

/**

225

* Interface for session event generators

226

*/

227

public interface SessionEventGenerator {

228

229

/**

230

* Generate stream of session events

231

* @return Stream of SessionEvent objects

232

*/

233

Stream<SessionEvent> generateEvents();

234

235

/**

236

* Generate events for specific time range

237

* @param range TimeRange for event generation

238

* @return Stream of SessionEvent objects within time range

239

*/

240

Stream<SessionEvent> generateEventsForTimeRange(TimeRange range);

241

}

242

```

243

244

### Session Event Data Structures

245

246

Data structures representing session events and their associated metadata.

247

248

```java { .api }

249

/**

250

* Session event data structure for window testing

251

*/

252

public class SessionEvent {

253

254

/**

255

* Constructor for session event

256

* @param sessionId identifier of the session

257

* @param timestamp event timestamp

258

* @param payload event payload data

259

*/

260

public SessionEvent(String sessionId, long timestamp, TestEventPayload payload);

261

262

/**

263

* Get session identifier

264

* @return String session ID

265

*/

266

public String getSessionId();

267

268

/**

269

* Get event timestamp

270

* @return long timestamp in milliseconds

271

*/

272

public long getTimestamp();

273

274

/**

275

* Get event payload

276

* @return TestEventPayload containing event data

277

*/

278

public TestEventPayload getPayload();

279

280

/**

281

* Check if event belongs to specified session

282

* @param sessionId session identifier to check

283

* @return boolean indicating session membership

284

*/

285

public boolean belongsToSession(String sessionId);

286

287

/**

288

* Calculate time difference from another session event

289

* @param other other SessionEvent to compare with

290

* @return long time difference in milliseconds

291

*/

292

public long getTimeDifferenceFrom(SessionEvent other);

293

}

294

295

/**

296

* Event payload for testing session window functionality

297

*/

298

public class TestEventPayload {

299

300

/**

301

* Constructor for test event payload

302

* @param data payload data as string

303

* @param eventType type of the event

304

* @param sequenceNumber sequence number within session

305

*/

306

public TestEventPayload(String data, String eventType, int sequenceNumber);

307

308

/**

309

* Get payload data

310

* @return String payload data

311

*/

312

public String getData();

313

314

/**

315

* Get event type

316

* @return String event type identifier

317

*/

318

public String getEventType();

319

320

/**

321

* Get sequence number within session

322

* @return int sequence number

323

*/

324

public int getSequenceNumber();

325

326

/**

327

* Get payload size in bytes

328

* @return int size of payload data

329

*/

330

public int getPayloadSize();

331

}

332

```

333

334

### Configuration Classes

335

336

Configuration classes for customizing session event generation and testing behavior.

337

338

```java { .api }

339

/**

340

* Configuration for session event generation and testing

341

*/

342

public class SessionConfiguration {

343

344

/**

345

* Constructor for session configuration

346

* @param sessionTimeout timeout for session inactivity

347

* @param eventRate event generation rate, in events per second

348

* @param sessionDuration maximum duration of sessions

349

*/

350

public SessionConfiguration(

351

Duration sessionTimeout,

352

double eventRate,

353

Duration sessionDuration);

354

355

/**

356

* Get session timeout duration

357

* @return Duration of session timeout

358

*/

359

public Duration getSessionTimeout();

360

361

/**

362

* Get event generation rate

363

* @return double events per second

364

*/

365

public double getEventRate();

366

367

/**

368

* Get maximum session duration

369

* @return Duration of maximum session length

370

*/

371

public Duration getSessionDuration();

372

373

/**

374

* Get session gap threshold for session boundary detection

375

* @return Duration threshold for session gaps

376

*/

377

public Duration getSessionGapThreshold();

378

379

/**

380

* Check if sessions should overlap for testing

381

* @return boolean indicating session overlap configuration

382

*/

383

public boolean isSessionOverlapEnabled();

384

385

/**

386

* Get parallelism level for session processing

387

* @return int parallelism level

388

*/

389

public int getParallelism();

390

}

391

392

/**

393

* Configuration for event generator behavior and patterns

394

*/

395

public class GeneratorConfiguration {

396

397

/**

398

* Constructor for generator configuration

399

* @param eventTypes types of events to generate

400

* @param distributionPattern distribution pattern for event timing

401

* @param seedValue random seed for reproducible generation

402

*/

403

public GeneratorConfiguration(

404

List<String> eventTypes,

405

DistributionPattern distributionPattern,

406

long seedValue);

407

408

/**

409

* Get configured event types

410

* @return List of String event type identifiers

411

*/

412

public List<String> getEventTypes();

413

414

/**

415

* Get event distribution pattern

416

* @return DistributionPattern for event timing

417

*/

418

public DistributionPattern getDistributionPattern();

419

420

/**

421

* Get random seed for reproducible generation

422

* @return long seed value

423

*/

424

public long getSeedValue();

425

426

/**

427

* Get payload size configuration

428

* @return PayloadSizeConfig for event payload sizing

429

*/

430

public PayloadSizeConfig getPayloadSizeConfig();

431

}

432

433

/**

434

* Time range specification for event generation

435

*/

436

public class TimeRange {

437

438

/**

439

* Constructor for time range

440

* @param startTime start timestamp

441

* @param endTime end timestamp

442

*/

443

public TimeRange(long startTime, long endTime);

444

445

/**

446

* Get start timestamp

447

* @return long start time in milliseconds

448

*/

449

public long getStartTime();

450

451

/**

452

* Get end timestamp

453

* @return long end time in milliseconds

454

*/

455

public long getEndTime();

456

457

/**

458

* Get duration of time range

459

* @return Duration of the time range

460

*/

461

public Duration getDuration();

462

463

/**

464

* Check if timestamp falls within range

465

* @param timestamp timestamp to check

466

* @return boolean indicating if timestamp is in range

467

*/

468

public boolean contains(long timestamp);

469

}

470

471

/**

472

* Event arrival patterns for session generation

473

*/

474

public enum ArrivalPattern {

475

/** Uniform arrival rate */

476

UNIFORM,

477

/** Poisson distributed arrivals */

478

POISSON,

479

/** Bursty arrival pattern */

480

BURST,

481

/** Custom configured pattern */

482

CUSTOM

483

}

484

485

/**

486

* Distribution patterns for event timing

487

*/

488

public enum DistributionPattern {

489

/** Regular intervals */

490

REGULAR,

491

/** Random intervals */

492

RANDOM,

493

/** Exponential distribution */

494

EXPONENTIAL,

495

/** Normal distribution */

496

NORMAL

497

}

498

```

499

500

### Session Window Validation

501

502

Utilities for validating session window behavior and results.

503

504

```java { .api }

505

/**

506

* Validator for session window processing results

507

*/

508

public class SessionWindowValidator {

509

510

/**

511

* Constructor for session window validator

512

* @param expectedSessions expected session configurations

513

*/

514

public SessionWindowValidator(List<SessionConfiguration> expectedSessions);

515

516

/**

517

* Validate session window results against expected sessions

518

* @param windowResults results from session window processing

519

* @return boolean indicating validation success

520

*/

521

public boolean validateSessionWindows(List<WindowResult> windowResults);

522

523

/**

524

* Validate session boundaries and timing

525

* @param sessionEvents events grouped by session

526

* @param sessionTimeout configured session timeout

527

* @return boolean indicating correct session boundaries

528

*/

529

public boolean validateSessionBoundaries(

530

Map<String, List<SessionEvent>> sessionEvents,

531

Duration sessionTimeout);

532

533

/**

534

* Validate session completeness and event ordering

535

* @param processedSessions processed session results

536

* @return boolean indicating session completeness

537

*/

538

public boolean validateSessionCompleteness(List<ProcessedSession> processedSessions);

539

}

540

541

/**

542

* Result of session window processing

543

*/

544

public class WindowResult {

545

546

/**

547

* Constructor for window result

548

* @param sessionId session identifier

549

* @param startTime window start time

550

* @param endTime window end time

551

* @param eventCount number of events in window

552

*/

553

public WindowResult(String sessionId, long startTime, long endTime, int eventCount);

554

555

/**

556

* Get session identifier for this window

557

* @return String session ID

558

*/

559

public String getSessionId();

560

561

/**

562

* Get window start time

563

* @return long start timestamp

564

*/

565

public long getStartTime();

566

567

/**

568

* Get window end time

569

* @return long end timestamp

570

*/

571

public long getEndTime();

572

573

/**

574

* Get number of events processed in window

575

* @return int event count

576

*/

577

public int getEventCount();

578

579

/**

580

* Get window duration

581

* @return Duration of the window

582

*/

583

public Duration getWindowDuration();

584

}

585

```

586

587

**Usage Examples:**

588

589

```java

590

import org.apache.flink.test.windowing.sessionwindows.*;

591

592

// Basic session window testing

593

public class SessionWindowTest {

594

595

@Test

596

public void testBasicSessionWindows() throws Exception {

597

// Configure session parameters

598

SessionConfiguration config = new SessionConfiguration(

599

Duration.ofMinutes(5), // 5 minute session timeout

600

10.0, // 10 events per second

601

Duration.ofMinutes(30) // 30 minute max session duration

602

);

603

604

// Create event generator

605

SessionEventGenerator generator = EventGeneratorFactory.create(config);

606

607

// Generate test events

608

Stream<SessionEvent> events = generator.generateEvents();

609

610

// Create Flink job for session window processing

611

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

612

613

DataStream<SessionEvent> eventStream = env.fromCollection(

614

events.limit(1000).collect(Collectors.toList()));

615

616

// Apply session windows

617

DataStream<WindowResult> windowResults = eventStream

618

.keyBy(SessionEvent::getSessionId)

619

.window(EventTimeSessionWindows.withGap(Time.minutes(5)))

620

.apply(new SessionWindowFunction());

621

622

// Validate results

623

List<WindowResult> results = DataStreamUtils.collect(windowResults);

624

625

SessionWindowValidator validator = new SessionWindowValidator(

626

Arrays.asList(config));

627

assertTrue(validator.validateSessionWindows(results));

628

}

629

630

@Test

631

public void testOverlappingSessions() throws Exception {

632

SessionConfiguration config = new SessionConfiguration(

633

Duration.ofMinutes(2), // Short timeout for overlap testing

634

5.0, // Moderate event rate

635

Duration.ofMinutes(10) // Session duration

636

);

637

638

SessionEventGeneratorImpl generator = new SessionEventGeneratorImpl(

639

config, new GeneratorEventFactory(createEventTypeConfig()));

640

641

// Generate overlapping sessions

642

List<String> sessionIds = Arrays.asList("session1", "session2", "session3");

643

Stream<SessionEvent> overlappingEvents = generator.generateOverlappingSessions(

644

sessionIds, Duration.ofMinutes(1));

645

646

// Collect a bounded sample of the generated events

647

List<SessionEvent> eventList = overlappingEvents

648

.limit(500)

649

.collect(Collectors.toList());

650

651

// Validate session boundaries

652

Map<String, List<SessionEvent>> eventsBySession = eventList.stream()

653

.collect(Collectors.groupingBy(SessionEvent::getSessionId));

654

655

SessionWindowValidator validator = new SessionWindowValidator(

656

Arrays.asList(config));

657

assertTrue(validator.validateSessionBoundaries(

658

eventsBySession, config.getSessionTimeout()));

659

}

660

661

@Test

662

public void testParallelSessionGeneration() throws Exception {

663

SessionConfiguration config = new SessionConfiguration(

664

Duration.ofMinutes(3),

665

8.0,

666

Duration.ofMinutes(15)

667

);

668

669

// Create parallel session generator

670

ParallelSessionsEventGenerator parallelGenerator =

671

EventGeneratorFactory.createParallel(config, 5);

672

673

// Generate events with different arrival patterns

674

Stream<SessionEvent> uniformEvents = parallelGenerator

675

.generateSessionsWithArrivalPattern(

676

ArrivalPattern.UNIFORM, Duration.ofMinutes(10));

677

678

Stream<SessionEvent> poissonEvents = parallelGenerator

679

.generateSessionsWithArrivalPattern(

680

ArrivalPattern.POISSON, Duration.ofMinutes(10));

681

682

// Combine both streams and collect a bounded sample

683

List<SessionEvent> allEvents = Stream.concat(uniformEvents, poissonEvents)

684

.limit(2000)

685

.collect(Collectors.toList());

686

687

// Validate event distribution

688

assertFalse(allEvents.isEmpty());

689

assertTrue(allEvents.size() >= 1000); // Should have events from both patterns

690

}

691

}

692

693

// Advanced session window testing

694

public class AdvancedSessionWindowTest {

695

696

@Test

697

public void testCustomEventFactory() throws Exception {

698

// Create custom event type configuration

699

EventTypeConfiguration eventTypeConfig = new EventTypeConfiguration(

700

Arrays.asList("login", "purchase", "logout"),

701

Map.of("login", 0.4, "purchase", 0.4, "logout", 0.2)

702

);

703

704

GeneratorEventFactory customFactory = new GeneratorEventFactory(eventTypeConfig);

705

706

SessionConfiguration config = new SessionConfiguration(

707

Duration.ofMinutes(10),

708

15.0,

709

Duration.ofMinutes(45)

710

);

711

712

// Create generator with custom factory

713

SessionEventGenerator generator = EventGeneratorFactory

714

.createWithCustomFactory(customFactory, config);

715

716

// Generate events for specific time range

717

TimeRange testRange = new TimeRange(

718

System.currentTimeMillis(),

719

System.currentTimeMillis() + Duration.ofHours(1).toMillis()

720

);

721

722

Stream<SessionEvent> timeRangeEvents = generator

723

.generateEventsForTimeRange(testRange);

724

725

List<SessionEvent> events = timeRangeEvents

726

.limit(1000)

727

.collect(Collectors.toList());

728

729

// Validate events are within time range

730

assertTrue(events.stream().allMatch(event ->

731

testRange.contains(event.getTimestamp())));

732

733

// Validate event type distribution

734

Map<String, Long> eventTypeCounts = events.stream()

735

.collect(Collectors.groupingBy(

736

event -> event.getPayload().getEventType(),

737

Collectors.counting()));

738

739

assertTrue(eventTypeCounts.containsKey("login"));

740

assertTrue(eventTypeCounts.containsKey("purchase"));

741

assertTrue(eventTypeCounts.containsKey("logout"));

742

}

743

744

@Test

745

public void testSessionEventSequencing() throws Exception {

746

SessionConfiguration config = new SessionConfiguration(

747

Duration.ofMinutes(5),

748

12.0,

749

Duration.ofMinutes(20)

750

);

751

752

SessionEventGeneratorImpl generator = new SessionEventGeneratorImpl(

753

config, new GeneratorEventFactory(createEventTypeConfig()));

754

755

// Generate events for specific session

756

Stream<SessionEvent> sessionEvents = generator.generateSessionEvents(

757

"test-session-001", 100, Duration.ofMinutes(15));

758

759

List<SessionEvent> eventList = sessionEvents.collect(Collectors.toList());

760

761

// Validate event sequencing

762

for (int i = 0; i < eventList.size() - 1; i++) {

763

SessionEvent current = eventList.get(i);

764

SessionEvent next = eventList.get(i + 1);

765

766

// Events should be ordered by timestamp

767

assertTrue(current.getTimestamp() <= next.getTimestamp());

768

769

// All events should belong to same session

770

assertEquals("test-session-001", current.getSessionId());

771

assertEquals("test-session-001", next.getSessionId());

772

773

// Sequence numbers should be non-decreasing

774

assertTrue(current.getPayload().getSequenceNumber() <=

775

next.getPayload().getSequenceNumber());

776

}

777

}

778

}

779

```