CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-ray--streaming-state

State management library for Ray Streaming framework that provides transactional state storage with checkpoint and rollback capabilities for streaming data processing applications

Pending
Overview
Eval results
Files

transaction-management.mddocs/

Transaction Management

Comprehensive transaction system implementing a four-phase commit protocol (finish, commit, ackCommit, and rollBack) that keeps data consistent across failures and supports checkpoint-based recovery.

Capabilities

State Store Manager Interface

Core transaction interface defining the four-phase commit protocol for state management with checkpoint-based recovery support.

/**
 * Transaction state interface supporting a four-phase commit protocol.
 *
 * <p>The four operations are {@code finish}, {@code commit}, {@code ackCommit} and
 * {@code rollBack}; each one receives the identifier of the checkpoint it acts on.
 */
public interface StateStoreManager {
    /**
     * Finish phase: completes batch data saving and serialization.
     * This is typically where serialization work is performed.
     *
     * @param checkpointId checkpoint identifier for this transaction
     */
    void finish(long checkpointId);
    
    /**
     * Commit phase: persists data to storage (may be performed asynchronously).
     * This is typically where data persistence is performed.
     *
     * @param checkpointId checkpoint identifier for this transaction
     */
    void commit(long checkpointId);
    
    /**
     * Acknowledge-commit phase: cleans up after a successful commit.
     * Must be called after {@code commit}, and in the same thread as {@code commit}.
     *
     * @param checkpointId checkpoint identifier for this transaction
     * @param timeStamp timestamp of the acknowledgment
     */
    void ackCommit(long checkpointId, long timeStamp);
    
    /**
     * Rollback phase: recovers from a checkpoint after a failure.
     *
     * @param checkpointId identifier of the checkpoint to roll back to
     */
    void rollBack(long checkpointId);
}

Usage Examples:

// Example transaction flow for successful checkpoint
// `keyStateBackend` is assumed to be an already-constructed backend; it is not defined in this snippet.
StateStoreManager stateManager = keyStateBackend; // KeyStateBackend implements StateStoreManager

long checkpointId = 1001L;

try {
    // Phase 1: Finish - serialize and prepare data
    stateManager.finish(checkpointId);
    
    // Phase 2: Commit - persist data (can be in separate thread)
    stateManager.commit(checkpointId);
    
    // Phase 3: Acknowledge - cleanup after successful commit
    // Called on the same thread as commit here, since the whole flow is sequential.
    long timestamp = System.currentTimeMillis();
    stateManager.ackCommit(checkpointId, timestamp);
    
    System.out.println("Checkpoint " + checkpointId + " completed successfully");
    
} catch (Exception e) {
    // Phase 4: Rollback - recover on failure
    // Reached when finish, commit or ackCommit throws; note that `e` itself is not reported here.
    stateManager.rollBack(checkpointId);
    System.err.println("Checkpoint " + checkpointId + " failed, rolled back");
}

Abstract Key State Backend Transaction Support

Base implementation providing transaction operations and context management for key-based state backends.

/**
 * Base class providing transaction support and state management.
 *
 * <p>This listing shows method signatures only; method bodies are omitted.
 * The first four methods are the {@code StateStoreManager} operations; the remaining
 * ones read or set the checkpoint/key context.
 */
public abstract class AbstractKeyStateBackend implements StateStoreManager {
    /**
     * Finishes the checkpoint: completes batch data saving and serialization.
     *
     * @param checkpointId checkpoint identifier
     */
    public void finish(long checkpointId);
    
    /**
     * Commits the checkpoint: persists data (may be performed asynchronously).
     *
     * @param checkpointId checkpoint identifier
     */
    public void commit(long checkpointId);
    
    /**
     * Acknowledges the commit: cleans up after the commit.
     *
     * @param checkpointId checkpoint identifier
     * @param timeStamp timestamp of the acknowledgment
     */
    public void ackCommit(long checkpointId, long timeStamp);
    
    /**
     * Rolls back: recovers from the given checkpoint.
     *
     * @param checkpointId checkpoint identifier
     */
    public void rollBack(long checkpointId);
    
    /**
     * Returns the current checkpoint ID.
     *
     * @return current checkpoint ID
     */
    public long getCheckpointId();
    
    /**
     * Sets the checkpoint ID for the transaction context.
     *
     * @param checkpointId checkpoint ID to set
     */
    public void setCheckpointId(long checkpointId);
    
    /**
     * Sets the complete processing context (checkpoint and key) in one call.
     *
     * @param checkpointId checkpoint identifier
     * @param currentKey current processing key
     */
    public void setContext(long checkpointId, Object currentKey);
}

State Store Manager Proxy

Abstract proxy class supporting transaction state operations with strategy delegation for different storage backends.

/**
 * Proxy supporting transaction state operations with strategy delegation.
 *
 * <p>This listing shows signatures only; bodies are omitted. Besides the four
 * {@code StateStoreManager} operations it exposes key-based {@code get}/{@code put}
 * and a {@code close} method.
 *
 * @param <V> type of the values read and written through the proxy
 */
public abstract class StateStoreManagerProxy<V> implements StateStoreManager {
    /**
     * Creates a state store manager proxy.
     *
     * @param keyStateBackend backend providing transaction support
     * @param stateDescriptor descriptor defining the state
     */
    public StateStoreManagerProxy(AbstractKeyStateBackend keyStateBackend, AbstractStateDescriptor stateDescriptor);
    
    /**
     * Finish checkpoint phase.
     *
     * @param checkpointId checkpoint identifier
     */
    public void finish(long checkpointId);
    
    /**
     * Commit checkpoint phase (may be performed asynchronously).
     *
     * @param checkpointId checkpoint identifier
     */
    public void commit(long checkpointId);
    
    /**
     * Acknowledge-commit phase.
     *
     * @param checkpointId checkpoint identifier
     * @param timeStamp timestamp of the acknowledgment
     */
    public void ackCommit(long checkpointId, long timeStamp);
    
    /**
     * Rollback checkpoint phase.
     *
     * @param checkpointId checkpoint identifier
     */
    public void rollBack(long checkpointId);
    
    /**
     * Closes the proxy and cleans up resources.
     */
    public void close();
    
    /**
     * Gets the value stored under a key.
     *
     * @param key state key
     * @return retrieved value
     */
    public V get(String key);
    
    /**
     * Stores a value under a key.
     *
     * @param key state key
     * @param value value to store
     */
    public void put(String key, V value);
}

Abstract State Store Manager

Abstract base class for state store managers, implementing a three-layer storage architecture (front, middle, remote) with serialization support.

/**
 * Abstract base for state store managers with three-layer storage.
 *
 * <p>This listing shows signatures only; bodies of the non-abstract methods are omitted.
 * Unlike the key-based proxy API, {@code get} and {@code put} here take an explicit
 * checkpoint identifier.
 *
 * @param <V> type of the stored values
 */
public abstract class AbstractStateStoreManager<V> {
    /**
     * Creates a state store manager on top of a backend store.
     *
     * @param backStore backend key-value store for persistence; values are maps from a
     *                  {@code Long} to serialized bytes (presumably keyed by checkpoint ID — TODO confirm)
     */
    public AbstractStateStoreManager(KeyValueStore<String, Map<Long, byte[]>> backStore);
    
    /**
     * Converts a storage record to bytes for serialization.
     * Note: the parameter is declared with the raw {@code StorageRecord} type, whereas
     * {@code toStorageRecord} returns {@code StorageRecord<V>}.
     *
     * @param storageRecord record to serialize
     * @return serialized bytes
     */
    public byte[] toBytes(StorageRecord storageRecord);
    
    /**
     * Converts bytes back to a storage record.
     *
     * @param data serialized bytes
     * @return deserialized storage record
     */
    public StorageRecord<V> toStorageRecord(byte[] data);
    
    /**
     * Gets the value for a checkpoint and key (abstract; supplied by subclasses).
     *
     * @param checkpointId checkpoint identifier
     * @param key state key
     * @return retrieved value
     */
    public abstract V get(long checkpointId, String key);
    
    /**
     * Puts a value for a checkpoint and key.
     *
     * @param checkpointId checkpoint identifier
     * @param k state key
     * @param v value to store
     */
    public void put(long checkpointId, String k, V v);
    
    /**
     * Acknowledges a commit, with a timestamp.
     * This is the two-argument overload; a separate abstract one-argument overload also exists.
     *
     * @param checkpointId checkpoint identifier
     * @param timeStamp acknowledgment timestamp
     */
    public void ackCommit(long checkpointId, long timeStamp);
    
    /**
     * Acknowledges a commit (abstract; supplied by subclasses).
     *
     * @param checkpointId checkpoint identifier
     */
    public abstract void ackCommit(long checkpointId);
    
    /**
     * Sets the key group index used for partitioning.
     *
     * @param keyGroupIndex key group index
     */
    public void setKeyGroupIndex(int keyGroupIndex);
    
    /**
     * Closes the manager and cleans up resources.
     */
    public void close();
}

Storage Strategy Implementations

The library provides two main storage strategies for different consistency and performance requirements.

Dual Version State Store Manager

/**
 * Dual-version state store manager supporting rollback.
 *
 * <p>Members are omitted in this listing; only the class declaration is shown.
 *
 * @param <V> type of the stored values
 */
public class DualStateStoreManager<V> extends AbstractStateStoreManager<V> {
    // Maintains two versions of data for rollback support
    // Implements DUAL_VERSION strategy
}

Multi-Version State Store Manager

/**
 * Multi-version state store manager for MVCC scenarios.
 *
 * <p>Members are omitted in this listing; only the class declaration is shown.
 *
 * @param <V> type of the stored values
 */
public class MVStateStoreManager<V> extends AbstractStateStoreManager<V> {
    // Optimized for MVCC storage systems
    // Implements SINGLE_VERSION strategy
    // NOTE(review): the class is named "multi-version" but is listed under the SINGLE_VERSION
    // strategy — presumably because versioning is delegated to the MVCC store; confirm.
}

Transaction Flow Examples:

// Example 1: Basic transaction with error handling
// numberOfKeyGroups, keyGroup and stateBackend are assumed to be defined by the caller.
KeyStateBackend backend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);

// Set up state
ValueStateDescriptor<String> desc = ValueStateDescriptor.build("test-state", String.class, "");
ValueState<String> state = backend.getValueState(desc);

// The current key is set before the state is updated.
backend.setCurrentKey("key1");
state.update("value1");

// Perform transaction
// The wall-clock time is used as the checkpoint identifier in this example.
long checkpointId = System.currentTimeMillis();
try {
    backend.setCheckpointId(checkpointId);
    backend.finish(checkpointId);
    backend.commit(checkpointId);
    // ackCommit runs on the same thread as commit because the flow is sequential.
    backend.ackCommit(checkpointId, System.currentTimeMillis());
    System.out.println("Transaction completed successfully");
} catch (Exception e) {
    // Any failure in the phases above triggers a rollback to this checkpoint.
    backend.rollBack(checkpointId);
    System.err.println("Transaction failed, rolled back: " + e.getMessage());
}

// Example 2: Async commit pattern
// Reuses `backend` and `checkpointId` from the previous example.
ExecutorService executor = Executors.newSingleThreadExecutor();

backend.finish(checkpointId);

// Commit can be performed asynchronously, but ackCommit must run on the same thread
// as commit. Both calls therefore live in the same task: a non-async whenComplete
// callback gives no such guarantee (it runs on the caller's thread when the future
// has already completed by the time the callback is registered).
CompletableFuture<Void> commitFuture = CompletableFuture.runAsync(() -> {
    try {
        backend.commit(checkpointId);
    } catch (Exception e) {
        throw new RuntimeException("Commit failed", e);
    }
    // Same thread as commit; a failure here also completes the future exceptionally.
    backend.ackCommit(checkpointId, System.currentTimeMillis());
}, executor);

commitFuture.whenComplete((result, throwable) -> {
    if (throwable == null) {
        System.out.println("Async commit completed");
    } else {
        // Reached when either commit or ackCommit failed.
        backend.rollBack(checkpointId);
        System.err.println("Async commit failed: " + throwable.getMessage());
    }
});

// No further tasks are submitted; the already-submitted commit task still runs to
// completion, after which the worker thread is released.
executor.shutdown();

// Example 3: Multiple state operations in single transaction
backend.setCurrentKey("user123");
long txnId = 2001L;

// Modify multiple states
// nameDesc, eventDesc and counterDesc are assumed to be descriptors built elsewhere;
// they are not defined in this snippet.
ValueState<String> nameState = backend.getValueState(nameDesc);
ListState<String> eventState = backend.getListState(eventDesc);
MapState<String, Integer> counterState = backend.getMapState(counterDesc);

nameState.update("Alice");
eventState.add("login");
counterState.put("sessions", 1);

// Commit all changes atomically
// All three state changes above go through the same finish/commit/ackCommit sequence.
try {
    backend.setCheckpointId(txnId);
    backend.finish(txnId);
    backend.commit(txnId);
    backend.ackCommit(txnId, System.currentTimeMillis());
    
    System.out.println("All state changes committed atomically");
} catch (Exception e) {
    // A failure in any phase rolls back to txnId; note that `e` itself is not reported here.
    backend.rollBack(txnId);
    System.err.println("All state changes rolled back due to failure");
}

Error Handling and Recovery

The transaction system provides comprehensive error handling and recovery mechanisms:

// Custom exception handling with detailed recovery
/**
 * Drives a checkpoint through the finish, commit and ackCommit phases on a
 * {@code KeyStateBackend}, tracking the phase in progress so that a failure can be
 * reported precisely before the checkpoint is rolled back.
 */
public class StreamingStateTransactionManager {
    private final KeyStateBackend backend;
    private final Map<Long, TransactionState> activeTransactions = new ConcurrentHashMap<>();

    /**
     * Creates a transaction manager for the given backend.
     *
     * @param backend backend whose checkpoints are managed; must not be null
     * @throws NullPointerException if {@code backend} is null
     */
    public StreamingStateTransactionManager(KeyStateBackend backend) {
        // The final field previously had no initializer and no constructor assigned it.
        this.backend = java.util.Objects.requireNonNull(backend, "backend");
    }

    /**
     * Runs the full checkpoint sequence for {@code checkpointId}. Any exception thrown
     * by a phase is handled by rolling the checkpoint back; it is not rethrown.
     *
     * @param checkpointId identifier of the checkpoint to perform
     */
    public void performCheckpoint(long checkpointId) {
        TransactionState txnState = new TransactionState(checkpointId);
        activeTransactions.put(checkpointId, txnState);

        try {
            // Phase 1: Finish
            txnState.setPhase(Phase.FINISH);
            backend.finish(checkpointId);

            // Phase 2: Commit
            txnState.setPhase(Phase.COMMIT);
            backend.commit(checkpointId);

            // Phase 3: Acknowledge (same thread as commit, since the flow is sequential)
            txnState.setPhase(Phase.ACK_COMMIT);
            backend.ackCommit(checkpointId, System.currentTimeMillis());

            txnState.setPhase(Phase.COMPLETED);
            System.out.println("Transaction " + checkpointId + " completed successfully");

        } catch (Exception e) {
            handleTransactionFailure(checkpointId, txnState, e);
        } finally {
            // The transaction is no longer active whether it completed or was rolled back.
            activeTransactions.remove(checkpointId);
        }
    }

    /**
     * Reports the failed phase and attempts a rollback; a rollback failure is reported
     * as critical but not propagated.
     */
    private void handleTransactionFailure(long checkpointId, TransactionState txnState, Exception error) {
        System.err.println("Transaction " + checkpointId + " failed in phase " + txnState.getPhase() + ": " + error.getMessage());

        try {
            backend.rollBack(checkpointId);
            System.out.println("Successfully rolled back transaction " + checkpointId);
        } catch (Exception rollbackError) {
            System.err.println("CRITICAL: Rollback failed for transaction " + checkpointId + ": " + rollbackError.getMessage());
            // Additional recovery logic would go here
        }
    }

    /** Phases of a checkpoint transaction; the constant names are what gets printed on failure. */
    private enum Phase {
        INIT, FINISH, COMMIT, ACK_COMMIT, COMPLETED
    }

    /** Mutable record of a single in-flight checkpoint transaction. */
    private static class TransactionState {
        private final long checkpointId;
        private Phase phase;
        private final long startTime;

        public TransactionState(long checkpointId) {
            this.checkpointId = checkpointId;
            this.startTime = System.currentTimeMillis();
            this.phase = Phase.INIT;
        }

        public void setPhase(Phase phase) { this.phase = phase; }
        public Phase getPhase() { return phase; }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-ray--streaming-state

docs

backend-management.md

configuration-key-groups.md

index.md

key-state-management.md

serialization-framework.md

state-types-operations.md

transaction-management.md

tile.json