or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

class-loading-programs.md, index.md, migration-testing.md, performance-testing.md, state-management-testing.md, streaming-utilities.md, test-base-classes.md, test-data-generation.md
tile.json

docs/state-management-testing.md

State Management Testing

Framework for testing operator state restoration and migration, including utilities for both keyed and non-keyed state scenarios. This framework validates that streaming operators can correctly restore their state after failures, checkpoints, and version migrations.

Capabilities

Execution Mode Control

Enumeration controlling function behavior for different test stages.

/**
 * Execution mode enumeration for controlling test behavior.
 *
 * <p>Each constant names one stage of a state restoration/migration test.
 * The value is passed to the topology-creation and verification hooks of the
 * restore test base classes so that they can adapt to the current stage.
 */
public enum ExecutionMode {
    /** Generate initial state and data. */
    GENERATE,
    
    /** Migrate state to new format. */  
    MIGRATE,
    
    /** Restore from migrated state. */
    RESTORE
}

Usage Example:

// Pick the job to launch from the execution mode named by the first argument
ExecutionMode mode = ExecutionMode.valueOf(args[0]);

if (mode == ExecutionMode.GENERATE) {
    // Build the initial state and write a savepoint
    runStateGenerationJob();
} else if (mode == ExecutionMode.MIGRATE) {
    // Convert the state format where required
    runStateMigrationJob();
} else if (mode == ExecutionMode.RESTORE) {
    // Start from the savepoint and check the outcome
    runStateRestorationJob();
}

Operator Restore Test Base Classes

Abstract base class for testing operator state restoration scenarios.

/**
 * Abstract base class for testing operator state restoration.
 *
 * <p>Subclasses supply two hooks: one that builds the streaming topology for
 * a given {@link ExecutionMode}, and one that checks the outcome once that
 * mode has been executed.
 */
public abstract class AbstractOperatorRestoreTestBase {
    
    /**
     * Create the streaming topology for state testing.
     *
     * @param env StreamExecutionEnvironment to configure
     * @param mode ExecutionMode determining test behavior
     */
    protected abstract void createRestorationTopology(
        StreamExecutionEnvironment env, 
        ExecutionMode mode);
    
    /**
     * Verify state restoration was successful.
     *
     * @param mode ExecutionMode that was executed
     * @throws Exception if verification fails
     */
    protected abstract void verifyRestorationResult(ExecutionMode mode) throws Exception;
}

Keyed Operator Restore Testing

Framework for testing keyed operator state restoration.

/**
 * Abstract base class for testing keyed operator state restoration.
 *
 * <p>Extends {@link AbstractOperatorRestoreTestBase} with a keyed-state
 * topology hook and a helper for checking restored keyed state.
 */
public abstract class AbstractKeyedOperatorRestoreTestBase 
        extends AbstractOperatorRestoreTestBase {
    
    /**
     * Create stateful keyed operators for testing state restoration.
     *
     * @param env StreamExecutionEnvironment to configure
     * @param mode ExecutionMode determining behavior
     */
    protected abstract void createKeyedStateTopology(
        StreamExecutionEnvironment env,
        ExecutionMode mode);
    
    /**
     * Verify keyed state was correctly restored.
     *
     * <p>NOTE(review): this listing shows only the signature; the method is
     * neither abstract nor given a body here, so its implementation is not
     * visible.
     *
     * @param expectedStateValues Expected state values after restoration;
     *        presumably keyed by the state key — TODO confirm
     * @throws Exception if verification fails
     */
    protected void verifyKeyedState(Map<String, Object> expectedStateValues) throws Exception;
}

/**
 * Standalone job for testing keyed state migration.
 *
 * <p>Signature-only listing: the job is driven entirely through
 * {@link #main(String[])}.
 */
public class KeyedJob {
    
    /**
     * Main entry point for keyed state migration testing.
     *
     * @param args Command line arguments, in order: [mode] [savepointPath] [checkpointDir].
     *        The standalone-job example passes an {@link ExecutionMode} name as the mode
     *        and the literal string "null" when there is no input savepoint.
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception;
}

Non-Keyed Operator Restore Testing

Framework for testing non-keyed operator state restoration.

/**
 * Abstract base class for testing non-keyed operator state restoration.
 *
 * <p>Extends {@link AbstractOperatorRestoreTestBase} with a non-keyed
 * topology hook and a helper for checking restored non-keyed state.
 */
public abstract class AbstractNonKeyedOperatorRestoreTestBase
        extends AbstractOperatorRestoreTestBase {
    
    /**
     * Create stateful non-keyed operators for testing state restoration.
     *
     * @param env StreamExecutionEnvironment to configure
     * @param mode ExecutionMode determining behavior
     */
    protected abstract void createNonKeyedStateTopology(
        StreamExecutionEnvironment env,
        ExecutionMode mode);
    
    /**
     * Verify non-keyed state was correctly restored.
     *
     * <p>NOTE(review): this listing shows only the signature; the method is
     * neither abstract nor given a body here, so its implementation is not
     * visible.
     *
     * @param expectedGlobalState Expected global state after restoration
     * @throws Exception if verification fails
     */
    protected void verifyNonKeyedState(Object expectedGlobalState) throws Exception;
}

/**
 * Standalone job for testing non-keyed state migration.
 *
 * <p>Signature-only listing: the job is driven entirely through
 * {@link #main(String[])}.
 */
public class NonKeyedJob {
    
    /**
     * Main entry point for non-keyed state migration testing.
     *
     * @param args Command line arguments, in order: [mode] [savepointPath] [checkpointDir]
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception;
}

State Management Test Patterns

Common patterns for implementing state management tests:

Basic Keyed State Test:

/**
 * Example keyed-state restoration test.
 *
 * <p>The GENERATE phase feeds "key1" three times and "key2" twice and
 * produces a savepoint; the RESTORE phase resumes from that savepoint, feeds
 * each key once more, and checks the per-key values that were emitted.
 */
public class KeyedStateRestorationTest extends AbstractKeyedOperatorRestoreTestBase {
    
    /** Sink collecting the (key, count) pairs emitted by the topology under test. */
    private TestListResultSink<Tuple2<String, Integer>> resultSink;
    
    /**
     * Builds a keyed topology whose input depends on the phase being run.
     *
     * @param env StreamExecutionEnvironment to configure
     * @param mode phase being executed; GENERATE gets the full data set, every other mode gets
     *        one element per key
     */
    @Override
    protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
        env.setParallelism(2);
        env.enableCheckpointing(100);
        
        resultSink = new TestListResultSink<>();
        
        DataStream<String> input;
        
        if (mode == ExecutionMode.GENERATE) {
            // Generate test data and state: "key1" x3, "key2" x2
            input = env.fromElements("key1", "key1", "key2", "key2", "key1");
        } else {
            // Use minimal input for restoration testing: one more element per key
            input = env.fromElements("key1", "key2");
        }
        
        input.keyBy(value -> value)
             .process(new StatefulKeyedProcessFunction(mode))
             .addSink(resultSink);
    }
    
    /**
     * Checks the values emitted during the RESTORE phase; other phases are not verified.
     *
     * @param mode phase that was executed
     * @throws Exception if verification fails
     */
    @Override
    protected void verifyRestorationResult(ExecutionMode mode) throws Exception {
        List<Tuple2<String, Integer>> results = resultSink.getResult();
        
        if (mode == ExecutionMode.RESTORE) {
            // Verify state was correctly restored
            Map<String, Integer> stateMap = results.stream()
                .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
            
            // Expected value = count accumulated during GENERATE + the one RESTORE element.
            // NOTE(review): assumes StatefulKeyedProcessFunction emits the count after
            // incrementing it — confirm against its implementation.
            assertEquals(4, stateMap.get("key1").intValue()); // 3 (previous) + 1
            assertEquals(3, stateMap.get("key2").intValue()); // 2 (previous) + 1
        }
    }
    
    /** Runs the GENERATE phase, then the RESTORE phase from the savepoint it produced. */
    @Test
    public void testKeyedStateRestoration() throws Exception {
        // Phase 1: Generate state and create savepoint
        String savepointPath = runTestPhase(ExecutionMode.GENERATE);
        
        // Phase 2: Restore from savepoint and verify
        runTestPhase(ExecutionMode.RESTORE, savepointPath);
    }
}

Non-Keyed State Test:

/**
 * Example non-keyed restoration test: a running sum is built over 1..5 in the
 * GENERATE phase and is expected to continue over 6 and 7 after restoring.
 */
public class NonKeyedStateRestorationTest extends AbstractNonKeyedOperatorRestoreTestBase {

    /** Sink collecting the values emitted by the topology under test. */
    private TestListResultSink<Long> resultSink;

    /**
     * Builds a single-parallelism topology whose input depends on the phase.
     *
     * @param env StreamExecutionEnvironment to configure
     * @param mode phase being executed
     */
    @Override
    protected void createNonKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
        env.setParallelism(1); // Non-keyed operators typically use parallelism 1
        env.enableCheckpointing(50);

        resultSink = new TestListResultSink<>();

        // GENERATE feeds the initial elements; any other mode feeds the follow-up elements
        final DataStream<Integer> numbers = (mode == ExecutionMode.GENERATE)
            ? env.fromElements(1, 2, 3, 4, 5)
            : env.fromElements(6, 7);

        numbers
            .process(new StatefulNonKeyedProcessFunction(mode))
            .addSink(resultSink);
    }

    /**
     * Checks the last emitted value after the RESTORE phase; other phases are not verified.
     *
     * @param mode phase that was executed
     * @throws Exception if verification fails
     */
    @Override
    protected void verifyRestorationResult(ExecutionMode mode) throws Exception {
        final List<Long> emitted = resultSink.getResult();

        if (mode != ExecutionMode.RESTORE) {
            return;
        }

        // The running sum should pick up where GENERATE stopped: 15 (previous) + 6 + 7 = 28
        final int lastIndex = emitted.size() - 1;
        final long finalSum = emitted.get(lastIndex);
        assertEquals(28, finalSum);
    }
}

State Migration Test:

/**
 * Example schema-migration test: state is generated with the original schema,
 * migrated with the evolved schema, and finally restored with the evolved
 * schema.
 */
public class StateMigrationTest extends AbstractKeyedOperatorRestoreTestBase {
    
    /**
     * Builds the keyed topology with the process function matching the phase.
     *
     * @param env StreamExecutionEnvironment to configure
     * @param mode phase being executed; only GENERATE runs the original-schema function
     */
    @Override
    protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
        env.setParallelism(4);
        env.enableCheckpointing(100);
        
        DataStream<String> source = env.addSource(new TestDataSource(mode));
        
        if (mode == ExecutionMode.GENERATE) {
            // Use original state schema: only the generating run writes the old format
            source.keyBy(value -> value)
                  .process(new OriginalStateProcessFunction())
                  .addSink(new DiscardingSink<>());
        } else {
            // Use evolved state schema: MIGRATE converts to it, and RESTORE must read the
            // migrated savepoint with that same schema
            source.keyBy(value -> value)
                  .process(new EvolvedStateProcessFunction())
                  .addSink(new DiscardingSink<>());
        }
    }
    
    /** Chains the three phases, handing each phase the savepoint of the previous one. */
    @Test 
    public void testStateSchemaMigration() throws Exception {
        // Generate with original schema
        String originalSavepoint = runTestPhase(ExecutionMode.GENERATE);
        
        // Migrate to new schema
        String migratedSavepoint = runTestPhase(ExecutionMode.MIGRATE, originalSavepoint);
        
        // Restore with new schema
        runTestPhase(ExecutionMode.RESTORE, migratedSavepoint);
    }
}

Standalone Job Pattern:

// Drives the standalone KeyedJob through a generate-then-restore cycle
public class KeyedStateMigrationITCase {

    /**
     * Runs KeyedJob in GENERATE mode, locates the savepoint it left in the
     * checkpoint directory, then runs KeyedJob again in RESTORE mode from it.
     */
    @Test
    public void testKeyedStateMigration() throws Exception {
        final String checkpointDir = tempFolder.newFolder("checkpoints").getAbsolutePath();

        // Generate phase: the literal "null" stands in for the absent input savepoint
        KeyedJob.main(new String[] {
            ExecutionMode.GENERATE.toString(),
            "null",
            checkpointDir
        });

        // Locate the savepoint written by the generate phase
        final String generatedSavepoint = findLatestSavepoint(checkpointDir);
        assertNotNull("Savepoint should be generated", generatedSavepoint);

        // Restore phase: finishing without an exception is the pass criterion
        KeyedJob.main(new String[] {
            ExecutionMode.RESTORE.toString(),
            generatedSavepoint,
            checkpointDir
        });
    }
}

Complex State Evolution Test:

/**
 * Example test exercising a process function that keeps value, list and map
 * state side by side, for state evolution testing.
 */
public class ComplexStateEvolutionTest extends AbstractKeyedOperatorRestoreTestBase {
    
    /**
     * Builds a keyed topology around {@link MultiStateProcessFunction}.
     *
     * @param env StreamExecutionEnvironment to configure
     * @param mode phase being executed; used to select the test source
     */
    @Override
    protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
        env.setParallelism(2);
        env.enableCheckpointing(100);
        
        DataStream<Tuple2<String, Integer>> source = createTestSource(mode);
        
        // MultiStateProcessFunction declares no mode-taking constructor and does not vary by
        // mode, so it is created with its default constructor.
        source.keyBy(value -> value.f0)
              .process(new MultiStateProcessFunction())
              .addSink(new TestResultSink());
    }
    
    /**
     * Process function with multiple state types for evolution testing
     */
    private static class MultiStateProcessFunction 
            extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
        
        /** Number of elements seen for the current key. */
        private ValueState<Integer> countState;
        /** String form of every element seen for the current key. */
        private ListState<String> historyState;
        /** Timestamp reported by the context for the latest element, stored under the element's key. */
        private MapState<String, Long> timestampState;
        
        /** Registers the three state handles under the names "count", "history" and "timestamps". */
        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialize state descriptors
            countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
            historyState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("history", String.class));
            timestampState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("timestamps", String.class, Long.class));
        }
        
        /**
         * Updates all three states for the element and emits the key with its new count.
         *
         * @param value element being processed; f0 is the key
         * @param ctx context supplying the element timestamp
         * @param out collector receiving the formatted result
         * @throws Exception if a state access fails
         */
        @Override
        public void processElement(
                Tuple2<String, Integer> value,
                Context ctx,
                Collector<String> out) throws Exception {
            
            // Update multiple state types; a missing count means this is the key's first element
            Integer currentCount = countState.value();
            int updatedCount = currentCount == null ? 1 : currentCount + 1;
            countState.update(updatedCount);
            
            historyState.add(value.toString());
            // NOTE(review): ctx.timestamp() may be null when elements carry no timestamp — confirm
            // the test source assigns timestamps.
            timestampState.put(value.f0, ctx.timestamp());
            
            // Emit result using the count just written, avoiding a second state read
            out.collect(String.format("Key: %s, Count: %d", 
                       value.f0, updatedCount));
        }
    }
}

This state management testing framework ensures that Flink operators correctly maintain and restore their state across various failure and migration scenarios, providing confidence in the reliability of stateful streaming applications.