0
# Session Window Testing Framework
1
2
Specialized testing framework for session window functionality with event generation and validation. This framework provides comprehensive tools for testing session-based windowing operations and event processing patterns.
3
4
## Capabilities
5
6
### Event Generator Framework
7
8
Factory classes and generators for creating session-based test events with configurable patterns and timing.
9
10
```java { .api }
11
/**
12
* Factory for creating session event generators with different configurations
13
*/
14
public class EventGeneratorFactory {
15
16
/**
17
* Create session event generator with specified configuration
18
* @param config SessionConfiguration defining generator behavior
19
* @return SessionEventGenerator instance
20
*/
21
public static SessionEventGenerator create(SessionConfiguration config);
22
23
/**
24
* Create parallel session event generator for multi-session testing
25
* @param config SessionConfiguration for generator setup
26
* @param parallelism number of parallel session generators
27
* @return ParallelSessionsEventGenerator instance
28
*/
29
public static ParallelSessionsEventGenerator createParallel(
30
SessionConfiguration config,
31
int parallelism);
32
33
/**
34
* Create event generator with custom event factory
35
* @param eventFactory GeneratorEventFactory for event creation
36
* @param config SessionConfiguration for timing and behavior
37
* @return SessionEventGenerator with custom event creation
38
*/
39
public static SessionEventGenerator createWithCustomFactory(
40
GeneratorEventFactory eventFactory,
41
SessionConfiguration config);
42
}
43
44
/**
45
* Factory for creating generator events with configurable properties
46
*/
47
public class GeneratorEventFactory {
48
49
/**
50
* Constructor for generator event factory
51
* @param eventTypeConfig configuration for event types
52
*/
53
public GeneratorEventFactory(EventTypeConfiguration eventTypeConfig);
54
55
/**
56
* Create session event with specified properties
57
* @param sessionId identifier for the session
58
* @param timestamp event timestamp in milliseconds
59
* @param payload event payload data
60
* @return SessionEvent instance
61
*/
62
public SessionEvent createEvent(String sessionId, long timestamp, TestEventPayload payload);
63
64
/**
65
* Create batch of session events for testing
66
* @param sessionId session identifier
67
* @param eventCount number of events to create
68
* @param timeRange time range for event distribution
69
* @return List of SessionEvent objects
70
*/
71
public List<SessionEvent> createEventBatch(
72
String sessionId,
73
int eventCount,
74
TimeRange timeRange);
75
}
76
```
77
78
### Event Generator Interface
79
80
Core interface for session event generation with flexible key and event type support.
81
82
```java { .api }
83
/**
84
* Interface for generating session window events with configurable key and event types
85
* @param <K> key type for session events
86
* @param <E> event type for session data
87
*/
88
public interface EventGenerator<K, E> {
89
90
/**
91
* Generate event at specified global watermark
92
* @param globalWatermark current global watermark
93
* @return generated event, or null if no event should be generated
94
*/
95
E generateEvent(long globalWatermark);
96
97
/**
98
* Check if generator can produce an event at the specified watermark
99
* @param globalWatermark watermark to check against
100
* @return boolean indicating if event can be generated
101
*/
102
boolean canGenerateEventAtWatermark(long globalWatermark);
103
104
/**
105
* Check if generator has more events to produce
106
* @return boolean indicating if more events are available
107
*/
108
boolean hasMoreEvents();
109
110
/**
111
* Get local watermark for this generator
112
* @return long representing local watermark
113
*/
114
long getLocalWatermark();
115
116
/**
117
* Get next generator in sequence for chained generation
118
* @param globalWatermark current global watermark
119
* @return EventGenerator instance for next generation phase
120
*/
121
EventGenerator<K, E> getNextGenerator(long globalWatermark);
122
123
/**
124
* Get key associated with this generator's events
125
* @return K key for session grouping
126
*/
127
K getKey();
128
129
/**
130
* Reset generator to initial state
131
*/
132
void reset();
133
134
/**
135
* Get configuration used by this generator
136
* @return SessionConfiguration instance
137
*/
138
SessionConfiguration getConfiguration();
139
}
140
```
141
142
### Session Event Generators
143
144
Implementation classes for generating session events with different patterns and configurations.
145
146
```java { .api }
147
/**
148
* Session event generator implementation with configurable event patterns
149
*/
150
public class SessionEventGeneratorImpl implements SessionEventGenerator {
151
152
/**
153
* Constructor for session event generator
154
* @param config SessionConfiguration for generator behavior
155
* @param eventFactory factory for creating events
156
*/
157
public SessionEventGeneratorImpl(
158
SessionConfiguration config,
159
GeneratorEventFactory eventFactory);
160
161
@Override
162
public Stream<SessionEvent> generateEvents();
163
164
@Override
165
public Stream<SessionEvent> generateEventsForTimeRange(TimeRange range);
166
167
/**
168
* Generate events for specific session with controlled timing
169
* @param sessionId session identifier
170
* @param eventCount number of events to generate
171
* @param sessionDuration total duration of session
172
* @return Stream of SessionEvent objects
173
*/
174
public Stream<SessionEvent> generateSessionEvents(
175
String sessionId,
176
int eventCount,
177
Duration sessionDuration);
178
179
/**
180
* Generate overlapping session events for testing session boundaries
181
* @param sessionIds list of session identifiers
182
* @param overlapDuration duration of session overlap
183
* @return Stream of overlapping SessionEvent objects
184
*/
185
public Stream<SessionEvent> generateOverlappingSessions(
186
List<String> sessionIds,
187
Duration overlapDuration);
188
}
189
190
/**
191
* Parallel sessions event generator for testing concurrent session processing
192
*/
193
public class ParallelSessionsEventGenerator implements SessionEventGenerator {
194
195
/**
196
* Constructor for parallel sessions generator
197
* @param config SessionConfiguration for each parallel session
198
* @param sessionCount number of concurrent sessions
199
*/
200
public ParallelSessionsEventGenerator(SessionConfiguration config, int sessionCount);
201
202
@Override
203
public Stream<SessionEvent> generateEvents();
204
205
/**
206
* Generate events for multiple parallel sessions with different patterns
207
* @param sessionConfigs configurations for each parallel session
208
* @return Stream of SessionEvent objects from all parallel sessions
209
*/
210
public Stream<SessionEvent> generateParallelSessions(
211
List<SessionConfiguration> sessionConfigs);
212
213
/**
214
* Generate sessions with configurable arrival patterns
215
* @param arrivalPattern pattern for session arrival (UNIFORM, POISSON, BURST, or CUSTOM)
216
* @param sessionDuration duration of each session
217
* @return Stream of SessionEvent objects with specified arrival pattern
218
*/
219
public Stream<SessionEvent> generateSessionsWithArrivalPattern(
220
ArrivalPattern arrivalPattern,
221
Duration sessionDuration);
222
}
223
224
/**
225
* Interface for session event generators
226
*/
227
public interface SessionEventGenerator {
228
229
/**
230
* Generate stream of session events
231
* @return Stream of SessionEvent objects
232
*/
233
Stream<SessionEvent> generateEvents();
234
235
/**
236
* Generate events for specific time range
237
* @param range TimeRange for event generation
238
* @return Stream of SessionEvent objects within time range
239
*/
240
Stream<SessionEvent> generateEventsForTimeRange(TimeRange range);
241
}
242
```
243
244
### Session Event Data Structures
245
246
Data structures representing session events and their associated metadata.
247
248
```java { .api }
249
/**
250
* Session event data structure for window testing
251
*/
252
public class SessionEvent {
253
254
/**
255
* Constructor for session event
256
* @param sessionId identifier of the session
257
* @param timestamp event timestamp in milliseconds
258
* @param payload event payload data
259
*/
260
public SessionEvent(String sessionId, long timestamp, TestEventPayload payload);
261
262
/**
263
* Get session identifier
264
* @return String session ID
265
*/
266
public String getSessionId();
267
268
/**
269
* Get event timestamp
270
* @return long timestamp in milliseconds
271
*/
272
public long getTimestamp();
273
274
/**
275
* Get event payload
276
* @return TestEventPayload containing event data
277
*/
278
public TestEventPayload getPayload();
279
280
/**
281
* Check if event belongs to specified session
282
* @param sessionId session identifier to check
283
* @return boolean indicating session membership
284
*/
285
public boolean belongsToSession(String sessionId);
286
287
/**
288
* Calculate time difference from another session event
289
* @param other other SessionEvent to compare with
290
* @return long time difference in milliseconds
291
*/
292
public long getTimeDifferenceFrom(SessionEvent other);
293
}
294
295
/**
296
* Event payload for testing session window functionality
297
*/
298
public class TestEventPayload {
299
300
/**
301
* Constructor for test event payload
302
* @param data payload data as string
303
* @param eventType type of the event
304
* @param sequenceNumber sequence number within session
305
*/
306
public TestEventPayload(String data, String eventType, int sequenceNumber);
307
308
/**
309
* Get payload data
310
* @return String payload data
311
*/
312
public String getData();
313
314
/**
315
* Get event type
316
* @return String event type identifier
317
*/
318
public String getEventType();
319
320
/**
321
* Get sequence number within session
322
* @return int sequence number
323
*/
324
public int getSequenceNumber();
325
326
/**
327
* Get payload size in bytes
328
* @return int size of payload data
329
*/
330
public int getPayloadSize();
331
}
332
```
333
334
### Configuration Classes
335
336
Configuration classes for customizing session event generation and testing behavior.
337
338
```java { .api }
339
/**
340
* Configuration for session event generation and testing
341
*/
342
public class SessionConfiguration {
343
344
/**
345
* Constructor for session configuration
346
* @param sessionTimeout timeout for session inactivity
347
* @param eventRate events per second generation rate
348
* @param sessionDuration maximum duration of sessions
349
*/
350
public SessionConfiguration(
351
Duration sessionTimeout,
352
double eventRate,
353
Duration sessionDuration);
354
355
/**
356
* Get session timeout duration
357
* @return Duration of session timeout
358
*/
359
public Duration getSessionTimeout();
360
361
/**
362
* Get event generation rate
363
* @return double events per second
364
*/
365
public double getEventRate();
366
367
/**
368
* Get maximum session duration
369
* @return Duration of maximum session length
370
*/
371
public Duration getSessionDuration();
372
373
/**
374
* Get session gap threshold for session boundary detection
375
* @return Duration threshold for session gaps
376
*/
377
public Duration getSessionGapThreshold();
378
379
/**
380
* Check if sessions should overlap for testing
381
* @return boolean indicating session overlap configuration
382
*/
383
public boolean isSessionOverlapEnabled();
384
385
/**
386
* Get parallelism level for session processing
387
* @return int parallelism level
388
*/
389
public int getParallelism();
390
}
391
392
/**
393
* Configuration for event generator behavior and patterns
394
*/
395
public class GeneratorConfiguration {
396
397
/**
398
* Constructor for generator configuration
399
* @param eventTypes types of events to generate
400
* @param distributionPattern distribution pattern for event timing
401
* @param seedValue random seed for reproducible generation
402
*/
403
public GeneratorConfiguration(
404
List<String> eventTypes,
405
DistributionPattern distributionPattern,
406
long seedValue);
407
408
/**
409
* Get configured event types
410
* @return List of String event type identifiers
411
*/
412
public List<String> getEventTypes();
413
414
/**
415
* Get event distribution pattern
416
* @return DistributionPattern for event timing
417
*/
418
public DistributionPattern getDistributionPattern();
419
420
/**
421
* Get random seed for reproducible generation
422
* @return long seed value
423
*/
424
public long getSeedValue();
425
426
/**
427
* Get payload size configuration
428
* @return PayloadSizeConfig for event payload sizing
429
*/
430
public PayloadSizeConfig getPayloadSizeConfig();
431
}
432
433
/**
434
* Time range specification for event generation
435
*/
436
public class TimeRange {
437
438
/**
439
* Constructor for time range
440
* @param startTime start timestamp in milliseconds
441
* @param endTime end timestamp in milliseconds
442
*/
443
public TimeRange(long startTime, long endTime);
444
445
/**
446
* Get start timestamp
447
* @return long start time in milliseconds
448
*/
449
public long getStartTime();
450
451
/**
452
* Get end timestamp
453
* @return long end time in milliseconds
454
*/
455
public long getEndTime();
456
457
/**
458
* Get duration of time range
459
* @return Duration of the time range
460
*/
461
public Duration getDuration();
462
463
/**
464
* Check if timestamp falls within range
465
* @param timestamp timestamp to check
466
* @return boolean indicating if timestamp is in range
467
*/
468
public boolean contains(long timestamp);
469
}
470
471
/**
472
* Event arrival patterns for session generation
473
*/
474
public enum ArrivalPattern {
475
/** Uniform arrival rate */
476
UNIFORM,
477
/** Poisson distributed arrivals */
478
POISSON,
479
/** Bursty arrival pattern */
480
BURST,
481
/** Custom configured pattern */
482
CUSTOM
483
}
484
485
/**
486
* Distribution patterns for event timing
487
*/
488
public enum DistributionPattern {
489
/** Regular intervals */
490
REGULAR,
491
/** Random intervals */
492
RANDOM,
493
/** Exponential distribution */
494
EXPONENTIAL,
495
/** Normal distribution */
496
NORMAL
497
}
498
```
499
500
### Session Window Validation
501
502
Utilities for validating session window behavior and results.
503
504
```java { .api }
505
/**
506
* Validator for session window processing results
507
*/
508
public class SessionWindowValidator {
509
510
/**
511
* Constructor for session window validator
512
* @param expectedSessions expected session configurations
513
*/
514
public SessionWindowValidator(List<SessionConfiguration> expectedSessions);
515
516
/**
517
* Validate session window results against expected sessions
518
* @param windowResults results from session window processing
519
* @return boolean indicating validation success
520
*/
521
public boolean validateSessionWindows(List<WindowResult> windowResults);
522
523
/**
524
* Validate session boundaries and timing
525
* @param sessionEvents events grouped by session
526
* @param sessionTimeout configured session timeout
527
* @return boolean indicating correct session boundaries
528
*/
529
public boolean validateSessionBoundaries(
530
Map<String, List<SessionEvent>> sessionEvents,
531
Duration sessionTimeout);
532
533
/**
534
* Validate session completeness and event ordering
535
* @param processedSessions processed session results
536
* @return boolean indicating session completeness
537
*/
538
public boolean validateSessionCompleteness(List<ProcessedSession> processedSessions);
539
}
540
541
/**
542
* Result of session window processing
543
*/
544
public class WindowResult {
545
546
/**
547
* Constructor for window result
548
* @param sessionId session identifier
549
* @param startTime window start time
550
* @param endTime window end time
551
* @param eventCount number of events in window
552
*/
553
public WindowResult(String sessionId, long startTime, long endTime, int eventCount);
554
555
/**
556
* Get session identifier for this window
557
* @return String session ID
558
*/
559
public String getSessionId();
560
561
/**
562
* Get window start time
563
* @return long start timestamp
564
*/
565
public long getStartTime();
566
567
/**
568
* Get window end time
569
* @return long end timestamp
570
*/
571
public long getEndTime();
572
573
/**
574
* Get number of events processed in window
575
* @return int event count
576
*/
577
public int getEventCount();
578
579
/**
580
* Get window duration
581
* @return Duration of the window
582
*/
583
public Duration getWindowDuration();
584
}
585
```
586
587
**Usage Examples:**
588
589
```java
590
import org.apache.flink.test.windowing.sessionwindows.*;
591
592
// Basic session window testing
593
public class SessionWindowTest {
594
595
@Test
596
public void testBasicSessionWindows() throws Exception {
597
// Configure session parameters
598
SessionConfiguration config = new SessionConfiguration(
599
Duration.ofMinutes(5), // 5 minute session timeout
600
10.0, // 10 events per second
601
Duration.ofMinutes(30) // 30 minute max session duration
602
);
603
604
// Create event generator
605
SessionEventGenerator generator = EventGeneratorFactory.create(config);
606
607
// Generate test events
608
Stream<SessionEvent> events = generator.generateEvents();
609
610
// Create Flink job for session window processing
611
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
612
613
DataStream<SessionEvent> eventStream = env.fromCollection(
614
events.limit(1000).collect(Collectors.toList()));
615
616
// Apply session windows
617
DataStream<WindowResult> windowResults = eventStream
618
.keyBy(SessionEvent::getSessionId)
619
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
620
.apply(new SessionWindowFunction());
621
622
// Validate results
623
List<WindowResult> results = DataStreamUtils.collect(windowResults);
624
625
SessionWindowValidator validator = new SessionWindowValidator(
626
Arrays.asList(config));
627
assertTrue(validator.validateSessionWindows(results));
628
}
629
630
@Test
631
public void testOverlappingSessions() throws Exception {
632
SessionConfiguration config = new SessionConfiguration(
633
Duration.ofMinutes(2), // Short timeout for overlap testing
634
5.0, // Moderate event rate
635
Duration.ofMinutes(10) // Session duration
636
);
637
638
SessionEventGeneratorImpl generator = new SessionEventGeneratorImpl(
639
config, new GeneratorEventFactory(createEventTypeConfig()));
640
641
// Generate overlapping sessions
642
List<String> sessionIds = Arrays.asList("session1", "session2", "session3");
643
Stream<SessionEvent> overlappingEvents = generator.generateOverlappingSessions(
644
sessionIds, Duration.ofMinutes(1));
645
646
// Process with session windows
647
List<SessionEvent> eventList = overlappingEvents
648
.limit(500)
649
.collect(Collectors.toList());
650
651
// Validate session boundaries
652
Map<String, List<SessionEvent>> eventsBySession = eventList.stream()
653
.collect(Collectors.groupingBy(SessionEvent::getSessionId));
654
655
SessionWindowValidator validator = new SessionWindowValidator(
656
Arrays.asList(config));
657
assertTrue(validator.validateSessionBoundaries(
658
eventsBySession, config.getSessionTimeout()));
659
}
660
661
@Test
662
public void testParallelSessionGeneration() throws Exception {
663
SessionConfiguration config = new SessionConfiguration(
664
Duration.ofMinutes(3),
665
8.0,
666
Duration.ofMinutes(15)
667
);
668
669
// Create parallel session generator
670
ParallelSessionsEventGenerator parallelGenerator =
671
EventGeneratorFactory.createParallel(config, 5);
672
673
// Generate events with different arrival patterns
674
Stream<SessionEvent> uniformEvents = parallelGenerator
675
.generateSessionsWithArrivalPattern(
676
ArrivalPattern.UNIFORM, Duration.ofMinutes(10));
677
678
Stream<SessionEvent> poissonEvents = parallelGenerator
679
.generateSessionsWithArrivalPattern(
680
ArrivalPattern.POISSON, Duration.ofMinutes(10));
681
682
// Concatenate both streams (all uniform events first, then Poisson) and cap the total
683
List<SessionEvent> allEvents = Stream.concat(uniformEvents, poissonEvents)
684
.limit(2000)
685
.collect(Collectors.toList());
686
687
// Validate event distribution
688
assertFalse(allEvents.isEmpty());
689
assertTrue(allEvents.size() >= 1000); // Sanity-checks the total only; it does not prove both patterns contributed events
690
}
691
}
692
693
// Advanced session window testing
694
public class AdvancedSessionWindowTest {
695
696
@Test
697
public void testCustomEventFactory() throws Exception {
698
// Create custom event type configuration
699
EventTypeConfiguration eventTypeConfig = new EventTypeConfiguration(
700
Arrays.asList("login", "purchase", "logout"),
701
Map.of("login", 0.4, "purchase", 0.4, "logout", 0.2)
702
);
703
704
GeneratorEventFactory customFactory = new GeneratorEventFactory(eventTypeConfig);
705
706
SessionConfiguration config = new SessionConfiguration(
707
Duration.ofMinutes(10),
708
15.0,
709
Duration.ofMinutes(45)
710
);
711
712
// Create generator with custom factory
713
SessionEventGenerator generator = EventGeneratorFactory
714
.createWithCustomFactory(customFactory, config);
715
716
// Generate events for specific time range
717
TimeRange testRange = new TimeRange(
718
System.currentTimeMillis(),
719
System.currentTimeMillis() + Duration.ofHours(1).toMillis()
720
);
721
722
Stream<SessionEvent> timeRangeEvents = generator
723
.generateEventsForTimeRange(testRange);
724
725
List<SessionEvent> events = timeRangeEvents
726
.limit(1000)
727
.collect(Collectors.toList());
728
729
// Validate events are within time range
730
assertTrue(events.stream().allMatch(event ->
731
testRange.contains(event.getTimestamp())));
732
733
// Validate event type distribution
734
Map<String, Long> eventTypeCounts = events.stream()
735
.collect(Collectors.groupingBy(
736
event -> event.getPayload().getEventType(),
737
Collectors.counting()));
738
739
assertTrue(eventTypeCounts.containsKey("login"));
740
assertTrue(eventTypeCounts.containsKey("purchase"));
741
assertTrue(eventTypeCounts.containsKey("logout"));
742
}
743
744
@Test
745
public void testSessionEventSequencing() throws Exception {
746
SessionConfiguration config = new SessionConfiguration(
747
Duration.ofMinutes(5),
748
12.0,
749
Duration.ofMinutes(20)
750
);
751
752
SessionEventGeneratorImpl generator = new SessionEventGeneratorImpl(
753
config, new GeneratorEventFactory(createEventTypeConfig()));
754
755
// Generate events for specific session
756
Stream<SessionEvent> sessionEvents = generator.generateSessionEvents(
757
"test-session-001", 100, Duration.ofMinutes(15));
758
759
List<SessionEvent> eventList = sessionEvents.collect(Collectors.toList());
760
761
// Validate event sequencing
762
for (int i = 0; i < eventList.size() - 1; i++) {
763
SessionEvent current = eventList.get(i);
764
SessionEvent next = eventList.get(i + 1);
765
766
// Events should be ordered by timestamp
767
assertTrue(current.getTimestamp() <= next.getTimestamp());
768
769
// All events should belong to same session
770
assertEquals("test-session-001", current.getSessionId());
771
assertEquals("test-session-001", next.getSessionId());
772
773
// Sequence numbers should be non-decreasing
774
assertTrue(current.getPayload().getSequenceNumber() <=
775
next.getPayload().getSequenceNumber());
776
}
777
}
778
}
779
```