CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-state-processor-api-2-12

Apache Flink State Processor API for reading and writing savepoint state data offline

Pending
Overview
Eval results
Files

function-interfaces.mddocs/

Function Interfaces

Function interfaces define the contracts for processing state data in the State Processor API. These interfaces allow you to implement custom logic for reading from and writing to Flink savepoints.

State Bootstrap Functions

StateBootstrapFunction

Base function for bootstrapping non-keyed operator state.

/**
 * Base function for bootstrapping non-keyed operator state.
 *
 * <p>NOTE(review): the class implements {@code CheckpointedFunction} but is abstract, so
 * concrete subclasses presumably have to provide that interface's callbacks themselves
 * in addition to {@link #processElement} - confirm against the Flink Javadoc.
 *
 * @param <IN> type of the input elements written into state
 */
public abstract class StateBootstrapFunction<IN> extends AbstractRichFunction 
        implements CheckpointedFunction {
    /**
     * Processes one input element.
     *
     * @param value the element to process
     * @param ctx   per-call context
     * @throws Exception if processing fails
     */
    public abstract void processElement(IN value, Context ctx) throws Exception;
    
    /** Context handed to {@link #processElement}; it exposes only the processing time. */
    public interface Context {
        /** Returns the current processing time (presumably epoch milliseconds - confirm). */
        long currentProcessingTime();
    }
}

Lifecycle:

  1. open(Configuration) - Initialize state descriptors and resources
  2. processElement(IN, Context) - Process each input element
  3. close() - Cleanup resources

Usage Example:

/**
 * Bootstraps non-keyed operator state holding how many metric events were seen and the
 * arithmetic mean of their values.
 *
 * <p>{@code StateBootstrapFunction} writes <em>operator</em> state, so the state handles
 * are obtained from the operator state store in {@link #initializeState} rather than via
 * {@code getRuntimeContext().getState(...)}, which is only valid on keyed streams.
 *
 * <p>Besides the state classes, this example needs
 * {@code org.apache.flink.runtime.state.FunctionInitializationContext} and
 * {@code org.apache.flink.runtime.state.FunctionSnapshotContext}.
 */
public class MetricsBootstrapFunction extends StateBootstrapFunction<MetricEvent> {
    private transient ListState<Long> totalCountState;
    private transient ListState<Double> averageState;

    // Aggregates are kept in plain fields and flushed to operator state on snapshot.
    private long totalCount;
    private double average;

    /**
     * Registers the operator list states that receive the aggregates.
     *
     * @param context gives access to the operator state store
     * @throws Exception if a state handle cannot be created
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        totalCountState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("totalCount", Long.class)
        );
        averageState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("average", Double.class)
        );
    }

    /**
     * Folds one event into the count and the mean.
     *
     * @param event the metric event to account for
     * @param ctx   context; only the processing time is available on it
     * @throws Exception if processing fails
     */
    @Override
    public void processElement(MetricEvent event, Context ctx) throws Exception {
        totalCount++;
        average = calculateRunningAverage(average, totalCount, event.getValue());

        // The bootstrap context exposes the processing time only; it has no watermark.
        System.out.println("Processing time: " + ctx.currentProcessingTime());
    }

    /**
     * Writes the current aggregates into operator state.
     *
     * <p>Each parallel instance contributes one entry per list, so a job restoring this
     * state has to merge the entries it is handed.
     *
     * @param context snapshot context (unused)
     * @throws Exception if the state cannot be written
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        totalCountState.clear();
        totalCountState.add(totalCount);

        averageState.clear();
        averageState.add(average);
    }

    /**
     * Incremental mean: moves the previous mean towards the new value by 1/count.
     *
     * @param currentAvg mean of the first {@code count - 1} values (ignored when count is 1)
     * @param count      number of values including {@code newValue}; must be at least 1
     * @param newValue   the value being added
     * @return mean of all {@code count} values
     */
    private static double calculateRunningAverage(double currentAvg, long count, double newValue) {
        return currentAvg + (newValue - currentAvg) / count;
    }
}

KeyedStateBootstrapFunction

Function for bootstrapping keyed state with access to key context and timers.

/**
 * Function for bootstrapping keyed state with access to the key context and timers.
 *
 * @param <K>  type of the key
 * @param <IN> type of the input elements written into state
 */
public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {
    /**
     * Processes one input element.
     *
     * @param value the element to process
     * @param ctx   context giving access to the current key and the timer service
     * @throws Exception if processing fails
     */
    public abstract void processElement(IN value, Context ctx) throws Exception;
    
    /** Context handed to {@link #processElement}. */
    public abstract class Context {
        /** Returns the timer service used to register event-time and processing-time timers. */
        public abstract TimerService timerService();
        /** Returns the key of the element currently being processed. */
        public abstract K getCurrentKey();
    }
}

Key Features:

  • Access to current key via ctx.getCurrentKey()
  • Timer service for registering event-time and processing-time timers
  • Keyed state automatically partitioned by key

Usage Example:

/**
 * Keyed bootstrap function that, per user id, maintains a {@code UserSession}, appends
 * an entry to an activity log and registers an event-time timer 30 minutes after each
 * activity.
 */
public class UserSessionBootstrapFunction extends KeyedStateBootstrapFunction<String, UserActivity> {
    private ValueState<UserSession> sessionState;
    private ListState<String> activityLogState;

    /** Creates the two keyed state handles used by {@link #processElement}. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        ValueStateDescriptor<UserSession> sessionDescriptor =
            new ValueStateDescriptor<>("session", UserSession.class);
        sessionState = getRuntimeContext().getState(sessionDescriptor);

        ListStateDescriptor<String> logDescriptor =
            new ListStateDescriptor<>("activityLog", String.class);
        activityLogState = getRuntimeContext().getListState(logDescriptor);
    }

    /**
     * Records one activity for the current key.
     *
     * @param activity the activity to record
     * @param ctx      context providing the current key and the timer service
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(UserActivity activity, Context ctx) throws Exception {
        final String currentUser = ctx.getCurrentKey();

        // Load the stored session, starting a fresh one on first contact with this key.
        UserSession current = sessionState.value();
        if (current == null) {
            current = new UserSession(currentUser, activity.getTimestamp());
        }
        current.addActivity(activity);
        sessionState.update(current);

        // Log entry format: <action>:<timestamp>
        final String logEntry = activity.getAction() + ":" + activity.getTimestamp();
        activityLogState.add(logEntry);

        // Session timeout fires 30 minutes after this activity's timestamp.
        final long timeoutMillis = Duration.ofMinutes(30).toMillis();
        ctx.timerService().registerEventTimeTimer(activity.getTimestamp() + timeoutMillis);

        System.out.println("Updated session for user: " + currentUser);
    }
}

BroadcastStateBootstrapFunction

Function for bootstrapping broadcast state that is replicated across all operator instances.

/**
 * Function for bootstrapping broadcast state that is replicated across all operator
 * instances.
 *
 * @param <IN> type of the input elements written into state
 */
public abstract class BroadcastStateBootstrapFunction<IN> extends AbstractRichFunction {
    /**
     * Processes one input element.
     *
     * @param value the element to process
     * @param ctx   context giving access to the broadcast state
     * @throws Exception if processing fails
     */
    public abstract void processElement(IN value, Context ctx) throws Exception;
    
    /** Context handed to {@link #processElement}. */
    public interface Context {
        /** Returns the current processing time (presumably epoch milliseconds - confirm). */
        long currentProcessingTime();
        /**
         * Returns the broadcast state identified by the given descriptor.
         *
         * @param descriptor descriptor naming the broadcast state and its key/value types
         */
        <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> descriptor);
    }
}

Usage Example:

/**
 * Bootstraps a broadcast state that maps rule id to {@code BusinessRule}.
 */
public class RulesBootstrapFunction extends BroadcastStateBootstrapFunction<BusinessRule> {
    private MapStateDescriptor<String, BusinessRule> rulesDescriptor;

    /** Builds the descriptor identifying the "business-rules" broadcast state. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        rulesDescriptor = new MapStateDescriptor<>(
            "business-rules", String.class, BusinessRule.class
        );
    }

    /**
     * Stores one rule under its id.
     *
     * @param rule the rule to store
     * @param ctx  context through which the broadcast state is obtained
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(BusinessRule rule, Context ctx) throws Exception {
        // Broadcast state is exposed by the bootstrap Context, not by the RuntimeContext.
        BroadcastState<String, BusinessRule> broadcastState =
            ctx.getBroadcastState(rulesDescriptor);

        broadcastState.put(rule.getId(), rule);

        System.out.println("Broadcasted rule: " + rule.getId());
    }
}

State Reader Functions

KeyedStateReaderFunction

Function for reading keyed state from existing savepoints.

/**
 * Function for reading keyed state from existing savepoints.
 *
 * @param <K>   type of the key
 * @param <OUT> type of the emitted records
 */
public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {
    /**
     * Initialization hook; all state descriptors must be registered here.
     *
     * <p>The method is abstract, so implementations cannot delegate to
     * {@code super.open(...)}.
     */
    public abstract void open(Configuration parameters) throws Exception;
    
    /**
     * Called once per key in the savepoint; may emit zero or more results.
     *
     * @param key the key being read
     * @param ctx context exposing the timers registered for the key
     * @param out collector for the results
     * @throws Exception if reading fails
     */
    public abstract void readKey(K key, Context ctx, Collector<OUT> out) throws Exception;
    
    /** Context handed to {@link #readKey}. */
    public interface Context {
        /** Returns the event-time timers registered for the current key. */
        Set<Long> registeredEventTimeTimers() throws Exception;
        /** Returns the processing-time timers registered for the current key. */
        Set<Long> registeredProcessingTimeTimers() throws Exception;
    }
}

Key Features:

  • Must register all state descriptors in open() method
  • Called once per key in the savepoint
  • Access to registered timers for each key
  • Can output zero or more results per key

Usage Example:

/**
 * Reads a user's profile, purchase history, per-category spending and registered timers
 * from keyed state and emits them as one {@code UserInsight} per key.
 */
public class UserAnalyticsReaderFunction extends KeyedStateReaderFunction<String, UserInsight> {
    private ValueState<UserProfile> profileState;
    private ListState<PurchaseEvent> purchaseHistoryState;
    private MapState<String, Double> categorySpendingState;

    /**
     * Registers every state descriptor this reader uses.
     *
     * <p>{@code KeyedStateReaderFunction.open} is abstract, so there is no super
     * implementation to call.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Must register ALL state descriptors here
        profileState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("profile", UserProfile.class)
        );
        purchaseHistoryState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("purchases", PurchaseEvent.class)
        );
        categorySpendingState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("categorySpending", String.class, Double.class)
        );
    }

    /**
     * Emits one insight for the given user, or nothing when no profile is stored.
     *
     * @param userId the key being read
     * @param ctx    context exposing the timers registered for the key
     * @param out    collector receiving the insight
     * @throws Exception if state access fails
     */
    @Override
    public void readKey(String userId, Context ctx, Collector<UserInsight> out) throws Exception {
        UserProfile profile = profileState.value();
        if (profile == null) {
            return; // No data for this user
        }

        // Copy the state contents into plain collections so the insight owns its data.
        List<PurchaseEvent> purchases = new ArrayList<>();
        purchaseHistoryState.get().forEach(purchases::add);

        Map<String, Double> categorySpending = new HashMap<>();
        for (Map.Entry<String, Double> entry : categorySpendingState.entries()) {
            categorySpending.put(entry.getKey(), entry.getValue());
        }

        // Timers registered for this key
        Set<Long> eventTimers = ctx.registeredEventTimeTimers();
        Set<Long> processingTimers = ctx.registeredProcessingTimeTimers();

        UserInsight insight = new UserInsight(
            userId,
            profile,
            purchases,
            categorySpending,
            eventTimers,
            processingTimers
        );

        out.collect(insight);
    }
}

WindowReaderFunction

Function for reading window state with access to window metadata.

/**
 * Function for reading window state with access to window metadata.
 *
 * <p>This is an abstract class, so implementations {@code extend} it.
 *
 * @param <IN>  type of the elements stored in the window
 * @param <OUT> type of the emitted records
 * @param <KEY> type of the key
 * @param <W>   type of the window
 */
public abstract class WindowReaderFunction<IN, OUT, KEY, W extends Window> 
        extends AbstractRichFunction {
    /**
     * Reads the contents of one window.
     *
     * @param key      the key the window belongs to
     * @param context  context exposing window metadata, state stores and timers
     * @param elements the elements stored in the window
     * @param out      collector for the results
     * @throws Exception if reading fails
     */
    public abstract void readWindow(
        KEY key, 
        Context<W> context, 
        Iterable<IN> elements, 
        Collector<OUT> out
    ) throws Exception;
    
    /** Context handed to {@link #readWindow}; parameterized by the window type. */
    public interface Context<W extends Window> extends java.io.Serializable {
        /** Returns the window being read. */
        W window();
        /** Returns the trigger state identified by the given descriptor. */
        <S extends State> S triggerState(StateDescriptor<S, ?> descriptor);
        /** Returns a keyed state store; presumably scoped to the key and window - confirm. */
        KeyedStateStore windowState();
        /** Returns a keyed state store; presumably scoped to the key across windows - confirm. */
        KeyedStateStore globalState();
        /** Returns the registered event-time timers. */
        Set<Long> registeredEventTimeTimers() throws Exception;
        /** Returns the registered processing-time timers. */
        Set<Long> registeredProcessingTimeTimers() throws Exception;
    }
}

Usage Example:

/**
 * Reads one window of {@code UserEvent}s per user and emits a {@code SessionSummary}
 * holding the window bounds, the event count and the events themselves.
 *
 * <p>{@code WindowReaderFunction} is an abstract class, so it is extended rather than
 * implemented.
 */
public class SessionWindowReaderFunction
        extends WindowReaderFunction<UserEvent, SessionSummary, String, TimeWindow> {

    /**
     * Emits a summary for the window, or nothing when the window holds no events.
     *
     * @param userId  the key the window belongs to
     * @param context window context; parameterized so {@code window()} yields a {@code TimeWindow}
     * @param events  the events stored in the window
     * @param out     collector receiving the summary
     * @throws Exception if reading fails
     */
    @Override
    public void readWindow(
        String userId,
        Context<TimeWindow> context,
        Iterable<UserEvent> events,
        Collector<SessionSummary> out
    ) throws Exception {

        TimeWindow window = context.window();
        List<UserEvent> eventList = new ArrayList<>();
        events.forEach(eventList::add);

        if (!eventList.isEmpty()) {
            SessionSummary summary = new SessionSummary(
                userId,
                window.getStart(),
                window.getEnd(),
                eventList.size(),
                eventList
            );

            out.collect(summary);
        }
    }
}

Utility Functions

Timestamper

Interface for assigning timestamps to elements during bootstrap.

/**
 * Interface for assigning timestamps to elements during bootstrap.
 *
 * <p>NOTE(review): the raw {@code Function} supertype is presumably Flink's
 * {@code org.apache.flink.api.common.functions.Function} marker interface, not
 * {@code java.util.function.Function} - confirm.
 *
 * @param <T> type of the elements being timestamped
 */
@FunctionalInterface
public interface Timestamper<T> extends Function {
    /**
     * Returns the timestamp to assign to the given element.
     *
     * @param element the element to timestamp
     */
    long timestamp(T element);
}

Usage Example:

/** Timestamper that uses each {@code Event}'s own event time as its timestamp. */
public class EventTimestamper implements Timestamper<Event> {
    /**
     * @param event the element to timestamp
     * @return the value of {@code event.getEventTime()}
     */
    @Override
    public long timestamp(Event event) {
        final long eventTime = event.getEventTime();
        return eventTime;
    }
}

// Use with bootstrap transformation.
// Timestamps are assigned on the un-keyed transformation, i.e. before keyBy():
// the keyed transformation returned by keyBy() does not offer assignTimestamps().
BootstrapTransformation<Event> transformation = OperatorTransformation
    .bootstrapWith(events)
    .assignTimestamps(new EventTimestamper())  // Custom timestamper
    .keyBy(Event::getUserId)
    .transform(new EventBootstrapFunction());

Advanced Function Patterns

Multi-State Function

/**
 * Keyed bootstrap function demonstrating every keyed state primitive: value, list, map,
 * reducing and aggregating state, plus an optional cleanup timer.
 */
public class ComplexStateBootstrapFunction extends KeyedStateBootstrapFunction<String, ComplexEvent> {
    // Multiple state types
    private ValueState<String> statusState;
    private ListState<String> historyState;
    private MapState<String, Integer> countersState;
    private ReducingState<Double> sumState;
    // AggregatingState takes two type parameters (IN, OUT); the accumulator type lives
    // only on the descriptor / aggregate function.
    private AggregatingState<Double, Double> avgState;

    /** Creates all keyed state handles used by {@link #processElement}. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Value state
        statusState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("status", String.class)
        );

        // List state
        historyState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("history", String.class)
        );

        // Map state
        countersState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("counters", String.class, Integer.class)
        );

        // Reducing state
        sumState = getRuntimeContext().getReducingState(
            new ReducingStateDescriptor<>("sum", Double::sum, Double.class)
        );

        // Aggregating state. The class argument is the ACCUMULATOR type.
        // NOTE(review): assumes AverageAggregator's accumulator is a Double - confirm.
        avgState = getRuntimeContext().getAggregatingState(
            new AggregatingStateDescriptor<>("avg", new AverageAggregator(), Double.class)
        );
    }

    /**
     * Folds one event into every state and optionally registers a cleanup timer.
     *
     * @param event the event to record
     * @param ctx   context providing the timer service
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(ComplexEvent event, Context ctx) throws Exception {
        // Update all state types
        statusState.update(event.getStatus());
        historyState.add(event.getAction());

        // Per-category counter; a missing entry counts as zero.
        String category = event.getCategory();
        Integer count = countersState.get(category);
        countersState.put(category, (count != null ? count : 0) + 1);

        sumState.add(event.getValue());
        avgState.add(event.getValue());

        // Set timer if needed: one hour after the event's timestamp.
        if (event.needsCleanup()) {
            ctx.timerService().registerEventTimeTimer(
                event.getTimestamp() + Duration.ofHours(1).toMillis()
            );
        }
    }
}

Conditional Processing Function

/**
 * Keyed state reader that emits a {@code FilteredResult} only for users accepted by a
 * user predicate and only with the transactions accepted by a transaction predicate.
 *
 * <p>NOTE(review): Flink ships user functions in serialized form and
 * {@code java.util.function.Predicate} is not {@code Serializable}; callers presumably
 * have to pass predicates that are serializable - confirm.
 */
public class ConditionalReaderFunction extends KeyedStateReaderFunction<String, FilteredResult> {
    private ValueState<UserData> userDataState;
    private ListState<Transaction> transactionState;
    private final Predicate<UserData> userFilter;
    private final Predicate<Transaction> transactionFilter;

    /**
     * @param userFilter        decides whether a user is read at all
     * @param transactionFilter decides which of the user's transactions are kept
     */
    public ConditionalReaderFunction(
        Predicate<UserData> userFilter,
        Predicate<Transaction> transactionFilter
    ) {
        this.userFilter = userFilter;
        this.transactionFilter = transactionFilter;
    }

    /**
     * Registers the state descriptors.
     *
     * <p>{@code KeyedStateReaderFunction.open} is abstract, so there is no super
     * implementation to call.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        userDataState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("userData", UserData.class)
        );
        transactionState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("transactions", Transaction.class)
        );
    }

    /**
     * Emits at most one result for the key: nothing when the user is missing or
     * rejected, and nothing when no transaction passes the filter.
     *
     * @param key the key being read
     * @param ctx reader context (unused)
     * @param out collector receiving the result
     * @throws Exception if state access fails
     */
    @Override
    public void readKey(String key, Context ctx, Collector<FilteredResult> out) throws Exception {
        UserData userData = userDataState.value();

        // Apply user filter
        if (userData == null || !userFilter.test(userData)) {
            return;
        }

        // Filter transactions
        List<Transaction> filteredTransactions = new ArrayList<>();
        for (Transaction transaction : transactionState.get()) {
            if (transactionFilter.test(transaction)) {
                filteredTransactions.add(transaction);
            }
        }

        if (!filteredTransactions.isEmpty()) {
            FilteredResult result = new FilteredResult(key, userData, filteredTransactions);
            out.collect(result);
        }
    }
}

Error Handling in Functions

Robust Bootstrap Function

/**
 * Keyed bootstrap function that accumulates each event's data into a comma-separated
 * string per key, logging failures instead of letting them escape
 * {@link #processElement}.
 */
public class RobustBootstrapFunction extends KeyedStateBootstrapFunction<String, DataEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(RobustBootstrapFunction.class);

    private ValueState<String> dataState;

    /**
     * Creates the "data" state handle; a failure is logged and rethrown wrapped in a
     * {@code RuntimeException}.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        try {
            ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("data", String.class);
            dataState = getRuntimeContext().getState(descriptor);
        } catch (Exception e) {
            LOG.error("Failed to initialize state", e);
            throw new RuntimeException("State initialization failed", e);
        }
    }

    /**
     * Appends the event's data to the state of the current key.
     *
     * <p>Null events and events without data are skipped with a warning. Any exception
     * raised while processing is logged and swallowed.
     *
     * @param event the event to record; may be null
     * @param ctx   context providing the current key for log messages
     */
    @Override
    public void processElement(DataEvent event, Context ctx) throws Exception {
        try {
            // Guard: nothing to store without a payload.
            final boolean missingPayload = event == null || event.getData() == null;
            if (missingPayload) {
                LOG.warn("Received null event or data for key: {}", ctx.getCurrentKey());
                return;
            }

            final String previous = dataState.value();
            dataState.update(combineData(previous, event.getData()));

        } catch (Exception e) {
            // Deliberately not rethrown: this example keeps going after a bad element.
            // Rethrow here instead if a failed element should abort the bootstrap.
            LOG.error("Failed to process event for key: {}", ctx.getCurrentKey(), e);
        }
    }

    /** Joins the stored value and the new data with a comma; the first value stands alone. */
    private String combineData(String current, String newData) {
        if (current == null) {
            return newData;
        }
        return current + "," + newData;
    }
}

Safe Reader Function

/**
 * Keyed state reader that always emits exactly one {@code SafeResult} per key: the
 * stored data, a "missing" marker, or an error marker when reading the state fails.
 */
public class SafeReaderFunction extends KeyedStateReaderFunction<String, SafeResult> {
    private ValueState<String> dataState;
    private static final Logger LOG = LoggerFactory.getLogger(SafeReaderFunction.class);

    /**
     * Registers the "data" state descriptor.
     *
     * <p>{@code KeyedStateReaderFunction.open} is abstract, so there is no super
     * implementation to call.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        dataState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("data", String.class)
        );
    }

    /**
     * Emits the result for one key.
     *
     * @param key the key being read
     * @param ctx reader context (unused)
     * @param out collector receiving the result
     * @throws Exception declared by the contract; read failures are reported as an
     *                   error result instead of being thrown
     */
    @Override
    public void readKey(String key, Context ctx, Collector<SafeResult> out) throws Exception {
        try {
            String data = dataState.value();

            if (data != null) {
                SafeResult result = new SafeResult(key, data, true);
                out.collect(result);
            } else {
                LOG.debug("No data found for key: {}", key);
                // Emit a result indicating missing data
                SafeResult result = new SafeResult(key, null, false);
                out.collect(result);
            }

        } catch (Exception e) {
            LOG.error("Failed to read state for key: {}", key, e);
            // Emit error result instead of failing
            SafeResult errorResult = new SafeResult(key, "ERROR: " + e.getMessage(), false);
            out.collect(errorResult);
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-state-processor-api-2-12

docs

function-interfaces.md

index.md

savepoint-management.md

state-reading.md

state-writing.md

window-operations.md

tile.json