Comprehensive test library for Apache Flink stream processing framework with integration tests, test utilities, and end-to-end validation tests.
—
Specialized testing framework for session window functionality with event generation and validation. This framework provides comprehensive tools for testing session-based windowing operations and event processing patterns.
Factory classes and generators for creating session-based test events with configurable patterns and timing.
/**
* Factory for creating session event generators with different configurations
*/
public class EventGeneratorFactory {
/**
* Create session event generator with specified configuration
* @param config SessionConfiguration defining generator behavior
* @return SessionEventGenerator instance
*/
public static SessionEventGenerator create(SessionConfiguration config);
/**
* Create parallel session event generator for multi-session testing
* @param config SessionConfiguration for generator setup
* @param parallelism number of parallel session generators
* @return ParallelSessionsEventGenerator instance
*/
public static ParallelSessionsEventGenerator createParallel(
SessionConfiguration config,
int parallelism);
/**
* Create event generator with custom event factory
* @param eventFactory GeneratorEventFactory for event creation
* @param config SessionConfiguration for timing and behavior
* @return SessionEventGenerator with custom event creation
*/
public static SessionEventGenerator createWithCustomFactory(
GeneratorEventFactory eventFactory,
SessionConfiguration config);
}
/**
* Factory for creating generator events with configurable properties
*/
public class GeneratorEventFactory {
/**
* Constructor for generator event factory
* @param eventTypeConfig configuration for event types
*/
public GeneratorEventFactory(EventTypeConfiguration eventTypeConfig);
/**
* Create session event with specified properties
* @param sessionId identifier for the session
* @param timestamp event timestamp
* @param payload event payload data
* @return SessionEvent instance
*/
public SessionEvent createEvent(String sessionId, long timestamp, TestEventPayload payload);
/**
* Create batch of session events for testing
* @param sessionId session identifier
* @param eventCount number of events to create
* @param timeRange time range for event distribution
* @return List of SessionEvent objects
*/
public List<SessionEvent> createEventBatch(
String sessionId,
int eventCount,
TimeRange timeRange);
}Core interface for session event generation with flexible key and event type support.
/**
* Interface for generating session window events with configurable key and event types
* @param <K> key type for session events
* @param <E> event type for session data
*/
public interface EventGenerator<K, E> {
/**
* Generate event at specified global watermark
* @param globalWatermark current global watermark
* @return generated event, or null if no event should be generated
*/
E generateEvent(long globalWatermark);
/**
* Check if generator can produce an event at the specified watermark
* @param globalWatermark watermark to check against
* @return boolean indicating if event can be generated
*/
boolean canGenerateEventAtWatermark(long globalWatermark);
/**
* Check if generator has more events to produce
* @return boolean indicating if more events are available
*/
boolean hasMoreEvents();
/**
* Get local watermark for this generator
* @return long representing local watermark
*/
long getLocalWatermark();
/**
* Get next generator in sequence for chained generation
* @param globalWatermark current global watermark
* @return EventGenerator instance for next generation phase
*/
EventGenerator<K, E> getNextGenerator(long globalWatermark);
/**
* Get key associated with this generator's events
* @return K key for session grouping
*/
K getKey();
/**
* Reset generator to initial state
*/
void reset();
/**
* Get configuration used by this generator
* @return SessionConfiguration instance
*/
SessionConfiguration getConfiguration();
}Implementation classes for generating session events with different patterns and configurations.
/**
* Session event generator implementation with configurable event patterns
*/
public class SessionEventGeneratorImpl implements SessionEventGenerator {
/**
* Constructor for session event generator
* @param config SessionConfiguration for generator behavior
* @param eventFactory factory for creating events
*/
public SessionEventGeneratorImpl(
SessionConfiguration config,
GeneratorEventFactory eventFactory);
@Override
public Stream<SessionEvent> generateEvents();
@Override
public Stream<SessionEvent> generateEventsForTimeRange(TimeRange range);
/**
* Generate events for specific session with controlled timing
* @param sessionId session identifier
* @param eventCount number of events to generate
* @param sessionDuration total duration of session
* @return Stream of SessionEvent objects
*/
public Stream<SessionEvent> generateSessionEvents(
String sessionId,
int eventCount,
Duration sessionDuration);
/**
* Generate overlapping session events for testing session boundaries
* @param sessionIds list of session identifiers
* @param overlapDuration duration of session overlap
* @return Stream of overlapping SessionEvent objects
*/
public Stream<SessionEvent> generateOverlappingSessions(
List<String> sessionIds,
Duration overlapDuration);
}
/**
* Parallel sessions event generator for testing concurrent session processing
*/
public class ParallelSessionsEventGenerator implements SessionEventGenerator {
/**
* Constructor for parallel sessions generator
* @param config SessionConfiguration for each parallel session
* @param sessionCount number of concurrent sessions
*/
public ParallelSessionsEventGenerator(SessionConfiguration config, int sessionCount);
@Override
public Stream<SessionEvent> generateEvents();
/**
* Generate events for multiple parallel sessions with different patterns
* @param sessionConfigs configurations for each parallel session
* @return Stream of SessionEvent objects from all parallel sessions
*/
public Stream<SessionEvent> generateParallelSessions(
List<SessionConfiguration> sessionConfigs);
/**
* Generate sessions with configurable arrival patterns
* @param arrivalPattern pattern for session arrival (UNIFORM, POISSON, BURST)
* @param sessionDuration duration of each session
* @return Stream of SessionEvent objects with specified arrival pattern
*/
public Stream<SessionEvent> generateSessionsWithArrivalPattern(
ArrivalPattern arrivalPattern,
Duration sessionDuration);
}
/**
* Interface for session event generators
*/
public interface SessionEventGenerator {
/**
* Generate stream of session events
* @return Stream of SessionEvent objects
*/
Stream<SessionEvent> generateEvents();
/**
* Generate events for specific time range
* @param range TimeRange for event generation
* @return Stream of SessionEvent objects within time range
*/
Stream<SessionEvent> generateEventsForTimeRange(TimeRange range);
}Data structures representing session events and their associated metadata.
/**
* Session event data structure for window testing
*/
public class SessionEvent {
/**
* Constructor for session event
* @param sessionId identifier of the session
* @param timestamp event timestamp
* @param payload event payload data
*/
public SessionEvent(String sessionId, long timestamp, TestEventPayload payload);
/**
* Get session identifier
* @return String session ID
*/
public String getSessionId();
/**
* Get event timestamp
* @return long timestamp in milliseconds
*/
public long getTimestamp();
/**
* Get event payload
* @return TestEventPayload containing event data
*/
public TestEventPayload getPayload();
/**
* Check if event belongs to specified session
* @param sessionId session identifier to check
* @return boolean indicating session membership
*/
public boolean belongsToSession(String sessionId);
/**
* Calculate time difference from another session event
* @param other other SessionEvent to compare with
* @return long time difference in milliseconds
*/
public long getTimeDifferenceFrom(SessionEvent other);
}
/**
* Event payload for testing session window functionality
*/
public class TestEventPayload {
/**
* Constructor for test event payload
* @param data payload data as string
* @param eventType type of the event
* @param sequenceNumber sequence number within session
*/
public TestEventPayload(String data, String eventType, int sequenceNumber);
/**
* Get payload data
* @return String payload data
*/
public String getData();
/**
* Get event type
* @return String event type identifier
*/
public String getEventType();
/**
* Get sequence number within session
* @return int sequence number
*/
public int getSequenceNumber();
/**
* Get payload size in bytes
* @return int size of payload data
*/
public int getPayloadSize();
}Configuration classes for customizing session event generation and testing behavior.
/**
* Configuration for session event generation and testing
*/
public class SessionConfiguration {
/**
* Constructor for session configuration
* @param sessionTimeout timeout for session inactivity
* @param eventRate events per second generation rate
* @param sessionDuration maximum duration of sessions
*/
public SessionConfiguration(
Duration sessionTimeout,
double eventRate,
Duration sessionDuration);
/**
* Get session timeout duration
* @return Duration of session timeout
*/
public Duration getSessionTimeout();
/**
* Get event generation rate
* @return double events per second
*/
public double getEventRate();
/**
* Get maximum session duration
* @return Duration of maximum session length
*/
public Duration getSessionDuration();
/**
* Get session gap threshold for session boundary detection
* @return Duration threshold for session gaps
*/
public Duration getSessionGapThreshold();
/**
* Check if sessions should overlap for testing
* @return boolean indicating session overlap configuration
*/
public boolean isSessionOverlapEnabled();
/**
* Get parallelism level for session processing
* @return int parallelism level
*/
public int getParallelism();
}
/**
* Configuration for event generator behavior and patterns
*/
public class GeneratorConfiguration {
/**
* Constructor for generator configuration
* @param eventTypes types of events to generate
* @param distributionPattern distribution pattern for event timing
* @param seedValue random seed for reproducible generation
*/
public GeneratorConfiguration(
List<String> eventTypes,
DistributionPattern distributionPattern,
long seedValue);
/**
* Get configured event types
* @return List of String event type identifiers
*/
public List<String> getEventTypes();
/**
* Get event distribution pattern
* @return DistributionPattern for event timing
*/
public DistributionPattern getDistributionPattern();
/**
* Get random seed for reproducible generation
* @return long seed value
*/
public long getSeedValue();
/**
* Get payload size configuration
* @return PayloadSizeConfig for event payload sizing
*/
public PayloadSizeConfig getPayloadSizeConfig();
}
/**
* Time range specification for event generation
*/
public class TimeRange {
/**
* Constructor for time range
* @param startTime start timestamp
* @param endTime end timestamp
*/
public TimeRange(long startTime, long endTime);
/**
* Get start timestamp
* @return long start time in milliseconds
*/
public long getStartTime();
/**
* Get end timestamp
* @return long end time in milliseconds
*/
public long getEndTime();
/**
* Get duration of time range
* @return Duration of the time range
*/
public Duration getDuration();
/**
* Check if timestamp falls within range
* @param timestamp timestamp to check
* @return boolean indicating if timestamp is in range
*/
public boolean contains(long timestamp);
}
/**
* Event arrival patterns for session generation
*/
public enum ArrivalPattern {
/** Uniform arrival rate */
UNIFORM,
/** Poisson distributed arrivals */
POISSON,
/** Bursty arrival pattern */
BURST,
/** Custom configured pattern */
CUSTOM
}
/**
* Distribution patterns for event timing
*/
public enum DistributionPattern {
/** Regular intervals */
REGULAR,
/** Random intervals */
RANDOM,
/** Exponential distribution */
EXPONENTIAL,
/** Normal distribution */
NORMAL
}Utilities for validating session window behavior and results.
/**
* Validator for session window processing results
*/
public class SessionWindowValidator {
/**
* Constructor for session window validator
* @param expectedSessions expected session configurations
*/
public SessionWindowValidator(List<SessionConfiguration> expectedSessions);
/**
* Validate session window results against expected sessions
* @param windowResults results from session window processing
* @return boolean indicating validation success
*/
public boolean validateSessionWindows(List<WindowResult> windowResults);
/**
* Validate session boundaries and timing
* @param sessionEvents events grouped by session
* @param sessionTimeout configured session timeout
* @return boolean indicating correct session boundaries
*/
public boolean validateSessionBoundaries(
Map<String, List<SessionEvent>> sessionEvents,
Duration sessionTimeout);
/**
* Validate session completeness and event ordering
* @param processedSessions processed session results
* @return boolean indicating session completeness
*/
public boolean validateSessionCompleteness(List<ProcessedSession> processedSessions);
}
/**
* Result of session window processing
*/
public class WindowResult {
/**
* Constructor for window result
* @param sessionId session identifier
* @param startTime window start time
* @param endTime window end time
* @param eventCount number of events in window
*/
public WindowResult(String sessionId, long startTime, long endTime, int eventCount);
/**
* Get session identifier for this window
* @return String session ID
*/
public String getSessionId();
/**
* Get window start time
* @return long start timestamp
*/
public long getStartTime();
/**
* Get window end time
* @return long end timestamp
*/
public long getEndTime();
/**
* Get number of events processed in window
* @return int event count
*/
public int getEventCount();
/**
* Get window duration
* @return Duration of the window
*/
public Duration getWindowDuration();
}Usage Examples:
import org.apache.flink.test.windowing.sessionwindows.*;
// Basic session window testing
public class SessionWindowTest {
@Test
public void testBasicSessionWindows() throws Exception {
// Configure session parameters
SessionConfiguration config = new SessionConfiguration(
Duration.ofMinutes(5), // 5 minute session timeout
10.0, // 10 events per second
Duration.ofMinutes(30) // 30 minute max session duration
);
// Create event generator
SessionEventGenerator generator = EventGeneratorFactory.create(config);
// Generate test events
Stream<SessionEvent> events = generator.generateEvents();
// Create Flink job for session window processing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SessionEvent> eventStream = env.fromCollection(
events.limit(1000).collect(Collectors.toList()));
// Apply session windows
DataStream<WindowResult> windowResults = eventStream
.keyBy(SessionEvent::getSessionId)
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.apply(new SessionWindowFunction());
// Validate results
List<WindowResult> results = DataStreamUtils.collect(windowResults);
SessionWindowValidator validator = new SessionWindowValidator(
Arrays.asList(config));
assertTrue(validator.validateSessionWindows(results));
}
@Test
public void testOverlappingSessions() throws Exception {
SessionConfiguration config = new SessionConfiguration(
Duration.ofMinutes(2), // Short timeout for overlap testing
5.0, // Moderate event rate
Duration.ofMinutes(10) // Session duration
);
SessionEventGeneratorImpl generator = new SessionEventGeneratorImpl(
config, new GeneratorEventFactory(createEventTypeConfig()));
// Generate overlapping sessions
List<String> sessionIds = Arrays.asList("session1", "session2", "session3");
Stream<SessionEvent> overlappingEvents = generator.generateOverlappingSessions(
sessionIds, Duration.ofMinutes(1));
// Process with session windows
List<SessionEvent> eventList = overlappingEvents
.limit(500)
.collect(Collectors.toList());
// Validate session boundaries
Map<String, List<SessionEvent>> eventsBySession = eventList.stream()
.collect(Collectors.groupingBy(SessionEvent::getSessionId));
SessionWindowValidator validator = new SessionWindowValidator(
Arrays.asList(config));
assertTrue(validator.validateSessionBoundaries(
eventsBySession, config.getSessionTimeout()));
}
@Test
public void testParallelSessionGeneration() throws Exception {
SessionConfiguration config = new SessionConfiguration(
Duration.ofMinutes(3),
8.0,
Duration.ofMinutes(15)
);
// Create parallel session generator
ParallelSessionsEventGenerator parallelGenerator =
EventGeneratorFactory.createParallel(config, 5);
// Generate events with different arrival patterns
Stream<SessionEvent> uniformEvents = parallelGenerator
.generateSessionsWithArrivalPattern(
ArrivalPattern.UNIFORM, Duration.ofMinutes(10));
Stream<SessionEvent> poissonEvents = parallelGenerator
.generateSessionsWithArrivalPattern(
ArrivalPattern.POISSON, Duration.ofMinutes(10));
// Combine and process both streams
List<SessionEvent> allEvents = Stream.concat(uniformEvents, poissonEvents)
.limit(2000)
.collect(Collectors.toList());
// Validate event distribution
assertFalse(allEvents.isEmpty());
assertTrue(allEvents.size() >= 1000); // Should have events from both patterns
}
}
// Advanced session window testing
public class AdvancedSessionWindowTest {
@Test
public void testCustomEventFactory() throws Exception {
// Create custom event type configuration
EventTypeConfiguration eventTypeConfig = new EventTypeConfiguration(
Arrays.asList("login", "purchase", "logout"),
Map.of("login", 0.4, "purchase", 0.4, "logout", 0.2)
);
GeneratorEventFactory customFactory = new GeneratorEventFactory(eventTypeConfig);
SessionConfiguration config = new SessionConfiguration(
Duration.ofMinutes(10),
15.0,
Duration.ofMinutes(45)
);
// Create generator with custom factory
SessionEventGenerator generator = EventGeneratorFactory
.createWithCustomFactory(customFactory, config);
// Generate events for specific time range
TimeRange testRange = new TimeRange(
System.currentTimeMillis(),
System.currentTimeMillis() + Duration.ofHours(1).toMillis()
);
Stream<SessionEvent> timeRangeEvents = generator
.generateEventsForTimeRange(testRange);
List<SessionEvent> events = timeRangeEvents
.limit(1000)
.collect(Collectors.toList());
// Validate events are within time range
assertTrue(events.stream().allMatch(event ->
testRange.contains(event.getTimestamp())));
// Validate event type distribution
Map<String, Long> eventTypeCounts = events.stream()
.collect(Collectors.groupingBy(
event -> event.getPayload().getEventType(),
Collectors.counting()));
assertTrue(eventTypeCounts.containsKey("login"));
assertTrue(eventTypeCounts.containsKey("purchase"));
assertTrue(eventTypeCounts.containsKey("logout"));
}
@Test
public void testSessionEventSequencing() throws Exception {
SessionConfiguration config = new SessionConfiguration(
Duration.ofMinutes(5),
12.0,
Duration.ofMinutes(20)
);
SessionEventGeneratorImpl generator = new SessionEventGeneratorImpl(
config, new GeneratorEventFactory(createEventTypeConfig()));
// Generate events for specific session
Stream<SessionEvent> sessionEvents = generator.generateSessionEvents(
"test-session-001", 100, Duration.ofMinutes(15));
List<SessionEvent> eventList = sessionEvents.collect(Collectors.toList());
// Validate event sequencing
for (int i = 0; i < eventList.size() - 1; i++) {
SessionEvent current = eventList.get(i);
SessionEvent next = eventList.get(i + 1);
// Events should be ordered by timestamp
assertTrue(current.getTimestamp() <= next.getTimestamp());
// All events should belong to same session
assertEquals("test-session-001", current.getSessionId());
assertEquals("test-session-001", next.getSessionId());
// Sequence numbers should be increasing
assertTrue(current.getPayload().getSequenceNumber() <=
next.getPayload().getSequenceNumber());
}
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tests