CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-tests

Comprehensive test library for Apache Flink stream processing framework with integration tests, test utilities, and end-to-end validation tests.

Pending
Overview
Eval results
Files

session-window-testing.mddocs/

Session Window Testing Framework

Specialized testing framework for session window functionality with event generation and validation. This framework provides comprehensive tools for testing session-based windowing operations and event processing patterns.

Capabilities

Event Generator Framework

Factory classes and generators for creating session-based test events with configurable patterns and timing.

/**
 * Factory for creating session event generators with different configurations
 */
public class EventGeneratorFactory {
    
    /**
     * Create session event generator with specified configuration
     * @param config SessionConfiguration defining generator behavior
     * @return SessionEventGenerator instance
     */
    public static SessionEventGenerator create(SessionConfiguration config);
    
    /**
     * Create parallel session event generator for multi-session testing
     * @param config SessionConfiguration for generator setup
     * @param parallelism number of parallel session generators
     * @return ParallelSessionsEventGenerator instance
     */
    public static ParallelSessionsEventGenerator createParallel(
        SessionConfiguration config, 
        int parallelism);
    
    /**
     * Create event generator with custom event factory
     * @param eventFactory GeneratorEventFactory for event creation
     * @param config SessionConfiguration for timing and behavior
     * @return SessionEventGenerator with custom event creation
     */
    public static SessionEventGenerator createWithCustomFactory(
        GeneratorEventFactory eventFactory,
        SessionConfiguration config);
}

/**
 * Factory for creating generator events with configurable properties
 */
public class GeneratorEventFactory {
    
    /**
     * Constructor for generator event factory
     * @param eventTypeConfig configuration for event types
     */
    public GeneratorEventFactory(EventTypeConfiguration eventTypeConfig);
    
    /**
     * Create session event with specified properties
     * @param sessionId identifier for the session
     * @param timestamp event timestamp
     * @param payload event payload data
     * @return SessionEvent instance
     */
    public SessionEvent createEvent(String sessionId, long timestamp, TestEventPayload payload);
    
    /**
     * Create batch of session events for testing
     * @param sessionId session identifier
     * @param eventCount number of events to create
     * @param timeRange time range for event distribution
     * @return List of SessionEvent objects
     */
    public List<SessionEvent> createEventBatch(
        String sessionId, 
        int eventCount, 
        TimeRange timeRange);
}

Event Generator Interface

Core interface for session event generation with flexible key and event type support.

/**
 * Interface for generating session window events with configurable key and event types
 * @param <K> key type for session events
 * @param <E> event type for session data
 */
public interface EventGenerator<K, E> {
    
    /**
     * Generate event at specified global watermark
     * @param globalWatermark current global watermark
     * @return generated event, or null if no event should be generated
     */
    E generateEvent(long globalWatermark);
    
    /**
     * Check if generator can produce an event at the specified watermark
     * @param globalWatermark watermark to check against
     * @return boolean indicating if event can be generated
     */
    boolean canGenerateEventAtWatermark(long globalWatermark);
    
    /**
     * Check if generator has more events to produce
     * @return boolean indicating if more events are available
     */
    boolean hasMoreEvents();
    
    /**
     * Get local watermark for this generator
     * @return long representing local watermark
     */
    long getLocalWatermark();
    
    /**
     * Get next generator in sequence for chained generation
     * @param globalWatermark current global watermark
     * @return EventGenerator instance for next generation phase
     */
    EventGenerator<K, E> getNextGenerator(long globalWatermark);
    
    /**
     * Get key associated with this generator's events
     * @return K key for session grouping
     */
    K getKey();
    
    /**
     * Reset generator to initial state
     */
    void reset();
    
    /**
     * Get configuration used by this generator
     * @return SessionConfiguration instance
     */
    SessionConfiguration getConfiguration();
}

Session Event Generators

Implementation classes for generating session events with different patterns and configurations.

/**
 * Session event generator implementation with configurable event patterns
 */
public class SessionEventGeneratorImpl implements SessionEventGenerator {
    
    /**
     * Constructor for session event generator
     * @param config SessionConfiguration for generator behavior
     * @param eventFactory factory for creating events
     */
    public SessionEventGeneratorImpl(
        SessionConfiguration config, 
        GeneratorEventFactory eventFactory);
    
    @Override
    public Stream<SessionEvent> generateEvents();
    
    @Override
    public Stream<SessionEvent> generateEventsForTimeRange(TimeRange range);
    
    /**
     * Generate events for specific session with controlled timing
     * @param sessionId session identifier
     * @param eventCount number of events to generate
     * @param sessionDuration total duration of session
     * @return Stream of SessionEvent objects
     */
    public Stream<SessionEvent> generateSessionEvents(
        String sessionId, 
        int eventCount, 
        Duration sessionDuration);
    
    /**
     * Generate overlapping session events for testing session boundaries
     * @param sessionIds list of session identifiers
     * @param overlapDuration duration of session overlap
     * @return Stream of overlapping SessionEvent objects
     */
    public Stream<SessionEvent> generateOverlappingSessions(
        List<String> sessionIds, 
        Duration overlapDuration);
}

/**
 * Parallel sessions event generator for testing concurrent session processing
 */
public class ParallelSessionsEventGenerator implements SessionEventGenerator {
    
    /**
     * Constructor for parallel sessions generator
     * @param config SessionConfiguration for each parallel session
     * @param sessionCount number of concurrent sessions
     */
    public ParallelSessionsEventGenerator(SessionConfiguration config, int sessionCount);
    
    @Override
    public Stream<SessionEvent> generateEvents();
    
    /**
     * Generate events for multiple parallel sessions with different patterns
     * @param sessionConfigs configurations for each parallel session
     * @return Stream of SessionEvent objects from all parallel sessions
     */
    public Stream<SessionEvent> generateParallelSessions(
        List<SessionConfiguration> sessionConfigs);
    
    /**
     * Generate sessions with configurable arrival patterns
     * @param arrivalPattern pattern for session arrival (UNIFORM, POISSON, BURST)
     * @param sessionDuration duration of each session
     * @return Stream of SessionEvent objects with specified arrival pattern
     */
    public Stream<SessionEvent> generateSessionsWithArrivalPattern(
        ArrivalPattern arrivalPattern, 
        Duration sessionDuration);
}

/**
 * Interface for session event generators
 */
public interface SessionEventGenerator {
    
    /**
     * Generate stream of session events
     * @return Stream of SessionEvent objects
     */
    Stream<SessionEvent> generateEvents();
    
    /**
     * Generate events for specific time range
     * @param range TimeRange for event generation
     * @return Stream of SessionEvent objects within time range
     */
    Stream<SessionEvent> generateEventsForTimeRange(TimeRange range);
}

Session Event Data Structures

Data structures representing session events and their associated metadata.

/**
 * Session event data structure for window testing
 */
public class SessionEvent {
    
    /**
     * Constructor for session event
     * @param sessionId identifier of the session
     * @param timestamp event timestamp
     * @param payload event payload data
     */
    public SessionEvent(String sessionId, long timestamp, TestEventPayload payload);
    
    /**
     * Get session identifier
     * @return String session ID
     */
    public String getSessionId();
    
    /**
     * Get event timestamp
     * @return long timestamp in milliseconds
     */
    public long getTimestamp();
    
    /**
     * Get event payload
     * @return TestEventPayload containing event data
     */
    public TestEventPayload getPayload();
    
    /**
     * Check if event belongs to specified session
     * @param sessionId session identifier to check
     * @return boolean indicating session membership
     */
    public boolean belongsToSession(String sessionId);
    
    /**
     * Calculate time difference from another session event
     * @param other other SessionEvent to compare with
     * @return long time difference in milliseconds
     */
    public long getTimeDifferenceFrom(SessionEvent other);
}

/**
 * Event payload for testing session window functionality
 */
public class TestEventPayload {
    
    /**
     * Constructor for test event payload
     * @param data payload data as string
     * @param eventType type of the event
     * @param sequenceNumber sequence number within session
     */
    public TestEventPayload(String data, String eventType, int sequenceNumber);
    
    /**
     * Get payload data
     * @return String payload data
     */
    public String getData();
    
    /**
     * Get event type
     * @return String event type identifier
     */
    public String getEventType();
    
    /**
     * Get sequence number within session
     * @return int sequence number
     */
    public int getSequenceNumber();
    
    /**
     * Get payload size in bytes
     * @return int size of payload data
     */
    public int getPayloadSize();
}

Configuration Classes

Configuration classes for customizing session event generation and testing behavior.

/**
 * Configuration for session event generation and testing
 */
public class SessionConfiguration {
    
    /**
     * Constructor for session configuration
     * @param sessionTimeout timeout for session inactivity
     * @param eventRate events per second generation rate
     * @param sessionDuration maximum duration of sessions
     */
    public SessionConfiguration(
        Duration sessionTimeout, 
        double eventRate, 
        Duration sessionDuration);
    
    /**
     * Get session timeout duration
     * @return Duration of session timeout
     */
    public Duration getSessionTimeout();
    
    /**
     * Get event generation rate
     * @return double events per second
     */
    public double getEventRate();
    
    /**
     * Get maximum session duration
     * @return Duration of maximum session length
     */
    public Duration getSessionDuration();
    
    /**
     * Get session gap threshold for session boundary detection
     * @return Duration threshold for session gaps
     */
    public Duration getSessionGapThreshold();
    
    /**
     * Check if sessions should overlap for testing
     * @return boolean indicating session overlap configuration
     */
    public boolean isSessionOverlapEnabled();
    
    /**
     * Get parallelism level for session processing
     * @return int parallelism level
     */
    public int getParallelism();
}

/**
 * Configuration for event generator behavior and patterns
 */
public class GeneratorConfiguration {
    
    /**
     * Constructor for generator configuration
     * @param eventTypes types of events to generate
     * @param distributionPattern distribution pattern for event timing
     * @param seedValue random seed for reproducible generation
     */
    public GeneratorConfiguration(
        List<String> eventTypes, 
        DistributionPattern distributionPattern, 
        long seedValue);
    
    /**
     * Get configured event types
     * @return List of String event type identifiers
     */
    public List<String> getEventTypes();
    
    /**
     * Get event distribution pattern
     * @return DistributionPattern for event timing
     */
    public DistributionPattern getDistributionPattern();
    
    /**
     * Get random seed for reproducible generation
     * @return long seed value
     */
    public long getSeedValue();
    
    /**
     * Get payload size configuration
     * @return PayloadSizeConfig for event payload sizing
     */
    public PayloadSizeConfig getPayloadSizeConfig();
}

/**
 * Time range specification for event generation
 */
public class TimeRange {
    
    /**
     * Constructor for time range
     * @param startTime start timestamp
     * @param endTime end timestamp
     */
    public TimeRange(long startTime, long endTime);
    
    /**
     * Get start timestamp
     * @return long start time in milliseconds
     */
    public long getStartTime();
    
    /**
     * Get end timestamp
     * @return long end time in milliseconds
     */
    public long getEndTime();
    
    /**
     * Get duration of time range
     * @return Duration of the time range
     */
    public Duration getDuration();
    
    /**
     * Check if timestamp falls within range
     * @param timestamp timestamp to check
     * @return boolean indicating if timestamp is in range
     */
    public boolean contains(long timestamp);
}

/**
 * Event arrival patterns for session generation
 */
public enum ArrivalPattern {
    /** Uniform arrival rate */
    UNIFORM,
    /** Poisson distributed arrivals */
    POISSON,
    /** Bursty arrival pattern */
    BURST,
    /** Custom configured pattern */
    CUSTOM
}

/**
 * Distribution patterns for event timing
 */
public enum DistributionPattern {
    /** Regular intervals */
    REGULAR,
    /** Random intervals */
    RANDOM,
    /** Exponential distribution */
    EXPONENTIAL,
    /** Normal distribution */
    NORMAL
}

Session Window Validation

Utilities for validating session window behavior and results.

/**
 * Validator for session window processing results
 */
public class SessionWindowValidator {
    
    /**
     * Constructor for session window validator
     * @param expectedSessions expected session configurations
     */
    public SessionWindowValidator(List<SessionConfiguration> expectedSessions);
    
    /**
     * Validate session window results against expected sessions
     * @param windowResults results from session window processing
     * @return boolean indicating validation success
     */
    public boolean validateSessionWindows(List<WindowResult> windowResults);
    
    /**
     * Validate session boundaries and timing
     * @param sessionEvents events grouped by session
     * @param sessionTimeout configured session timeout
     * @return boolean indicating correct session boundaries
     */
    public boolean validateSessionBoundaries(
        Map<String, List<SessionEvent>> sessionEvents, 
        Duration sessionTimeout);
    
    /**
     * Validate session completeness and event ordering
     * @param processedSessions processed session results
     * @return boolean indicating session completeness
     */
    public boolean validateSessionCompleteness(List<ProcessedSession> processedSessions);
}

/**
 * Result of session window processing
 */
public class WindowResult {
    
    /**
     * Constructor for window result
     * @param sessionId session identifier
     * @param startTime window start time
     * @param endTime window end time
     * @param eventCount number of events in window
     */
    public WindowResult(String sessionId, long startTime, long endTime, int eventCount);
    
    /**
     * Get session identifier for this window
     * @return String session ID
     */
    public String getSessionId();
    
    /**
     * Get window start time
     * @return long start timestamp
     */
    public long getStartTime();
    
    /**
     * Get window end time
     * @return long end timestamp
     */
    public long getEndTime();
    
    /**
     * Get number of events processed in window
     * @return int event count
     */
    public int getEventCount();
    
    /**
     * Get window duration
     * @return Duration of the window
     */
    public Duration getWindowDuration();
}

Usage Examples:

import org.apache.flink.test.windowing.sessionwindows.*;

// Basic session window testing
public class SessionWindowTest {
    
    @Test
    public void testBasicSessionWindows() throws Exception {
        // Configure session parameters
        SessionConfiguration config = new SessionConfiguration(
            Duration.ofMinutes(5),  // 5 minute session timeout
            10.0,                   // 10 events per second
            Duration.ofMinutes(30)  // 30 minute max session duration
        );
        
        // Create event generator
        SessionEventGenerator generator = EventGeneratorFactory.create(config);
        
        // Generate test events
        Stream<SessionEvent> events = generator.generateEvents();
        
        // Create Flink job for session window processing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        DataStream<SessionEvent> eventStream = env.fromCollection(
            events.limit(1000).collect(Collectors.toList()));
        
        // Apply session windows
        DataStream<WindowResult> windowResults = eventStream
            .keyBy(SessionEvent::getSessionId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
            .apply(new SessionWindowFunction());
        
        // Validate results
        List<WindowResult> results = DataStreamUtils.collect(windowResults);
        
        SessionWindowValidator validator = new SessionWindowValidator(
            Arrays.asList(config));
        assertTrue(validator.validateSessionWindows(results));
    }
    
    @Test
    public void testOverlappingSessions() throws Exception {
        SessionConfiguration config = new SessionConfiguration(
            Duration.ofMinutes(2),  // Short timeout for overlap testing
            5.0,                    // Moderate event rate
            Duration.ofMinutes(10)  // Session duration
        );
        
        SessionEventGeneratorImpl generator = new SessionEventGeneratorImpl(
            config, new GeneratorEventFactory(createEventTypeConfig()));
        
        // Generate overlapping sessions
        List<String> sessionIds = Arrays.asList("session1", "session2", "session3");
        Stream<SessionEvent> overlappingEvents = generator.generateOverlappingSessions(
            sessionIds, Duration.ofMinutes(1));
        
        // Process with session windows
        List<SessionEvent> eventList = overlappingEvents
            .limit(500)
            .collect(Collectors.toList());
        
        // Validate session boundaries
        Map<String, List<SessionEvent>> eventsBySession = eventList.stream()
            .collect(Collectors.groupingBy(SessionEvent::getSessionId));
        
        SessionWindowValidator validator = new SessionWindowValidator(
            Arrays.asList(config));
        assertTrue(validator.validateSessionBoundaries(
            eventsBySession, config.getSessionTimeout()));
    }
    
    @Test
    public void testParallelSessionGeneration() throws Exception {
        SessionConfiguration config = new SessionConfiguration(
            Duration.ofMinutes(3),
            8.0, 
            Duration.ofMinutes(15)
        );
        
        // Create parallel session generator
        ParallelSessionsEventGenerator parallelGenerator = 
            EventGeneratorFactory.createParallel(config, 5);
        
        // Generate events with different arrival patterns
        Stream<SessionEvent> uniformEvents = parallelGenerator
            .generateSessionsWithArrivalPattern(
                ArrivalPattern.UNIFORM, Duration.ofMinutes(10));
        
        Stream<SessionEvent> poissonEvents = parallelGenerator
            .generateSessionsWithArrivalPattern(
                ArrivalPattern.POISSON, Duration.ofMinutes(10));
        
        // Combine and process both streams
        List<SessionEvent> allEvents = Stream.concat(uniformEvents, poissonEvents)
            .limit(2000)
            .collect(Collectors.toList());
        
        // Validate event distribution
        assertFalse(allEvents.isEmpty());
        assertTrue(allEvents.size() >= 1000); // Should have events from both patterns
    }
}

// Advanced session window testing
public class AdvancedSessionWindowTest {
    
    @Test
    public void testCustomEventFactory() throws Exception {
        // Create custom event type configuration
        EventTypeConfiguration eventTypeConfig = new EventTypeConfiguration(
            Arrays.asList("login", "purchase", "logout"),
            Map.of("login", 0.4, "purchase", 0.4, "logout", 0.2)
        );
        
        GeneratorEventFactory customFactory = new GeneratorEventFactory(eventTypeConfig);
        
        SessionConfiguration config = new SessionConfiguration(
            Duration.ofMinutes(10),
            15.0,
            Duration.ofMinutes(45)
        );
        
        // Create generator with custom factory
        SessionEventGenerator generator = EventGeneratorFactory
            .createWithCustomFactory(customFactory, config);
        
        // Generate events for specific time range
        TimeRange testRange = new TimeRange(
            System.currentTimeMillis(),
            System.currentTimeMillis() + Duration.ofHours(1).toMillis()
        );
        
        Stream<SessionEvent> timeRangeEvents = generator
            .generateEventsForTimeRange(testRange);
        
        List<SessionEvent> events = timeRangeEvents
            .limit(1000)
            .collect(Collectors.toList());
        
        // Validate events are within time range
        assertTrue(events.stream().allMatch(event -> 
            testRange.contains(event.getTimestamp())));
        
        // Validate event type distribution
        Map<String, Long> eventTypeCounts = events.stream()
            .collect(Collectors.groupingBy(
                event -> event.getPayload().getEventType(),
                Collectors.counting()));
        
        assertTrue(eventTypeCounts.containsKey("login"));
        assertTrue(eventTypeCounts.containsKey("purchase"));
        assertTrue(eventTypeCounts.containsKey("logout"));
    }
    
    @Test
    public void testSessionEventSequencing() throws Exception {
        SessionConfiguration config = new SessionConfiguration(
            Duration.ofMinutes(5),
            12.0,
            Duration.ofMinutes(20)
        );
        
        SessionEventGeneratorImpl generator = new SessionEventGeneratorImpl(
            config, new GeneratorEventFactory(createEventTypeConfig()));
        
        // Generate events for specific session
        Stream<SessionEvent> sessionEvents = generator.generateSessionEvents(
            "test-session-001", 100, Duration.ofMinutes(15));
        
        List<SessionEvent> eventList = sessionEvents.collect(Collectors.toList());
        
        // Validate event sequencing
        for (int i = 0; i < eventList.size() - 1; i++) {
            SessionEvent current = eventList.get(i);
            SessionEvent next = eventList.get(i + 1);
            
            // Events should be ordered by timestamp
            assertTrue(current.getTimestamp() <= next.getTimestamp());
            
            // All events should belong to same session
            assertEquals("test-session-001", current.getSessionId());
            assertEquals("test-session-001", next.getSessionId());
            
            // Sequence numbers should be increasing
            assertTrue(current.getPayload().getSequenceNumber() <= 
                      next.getPayload().getSequenceNumber());
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-tests

docs

cancellation-testing.md

checkpointing-migration.md

fault-tolerance-recovery.md

index.md

operator-lifecycle.md

plugin-testing.md

runtime-utilities.md

session-window-testing.md

state-backend-restore.md

test-data-utilities.md

tile.json