State management library for the Ray Streaming framework. It provides transactional state storage with checkpoint and rollback support for streaming data-processing applications.
—
A transaction system built on a four-phase protocol (finish, commit, ackCommit and rollBack) that keeps state consistent across failures and supports checkpoint-based recovery.
Core transaction interface defining the four-phase commit protocol for state management with checkpoint-based recovery support.
/**
 * Transactional contract for streaming state.
 *
 * <p>A successful checkpoint calls {@link #finish}, {@link #commit} and {@link #ackCommit}
 * in that order, all with the same checkpoint id. {@link #rollBack} restores state from a
 * checkpoint after a failure.
 */
public interface StateStoreManager {

    /**
     * Completes saving of the batched data for a checkpoint; serialization work is
     * normally done in this phase.
     *
     * @param checkpointId identifier of the checkpoint being taken
     */
    void finish(long checkpointId);

    /**
     * Persists the checkpoint's data to storage. May be invoked from another thread so
     * that the commit runs asynchronously.
     *
     * @param checkpointId identifier of the checkpoint being committed
     */
    void commit(long checkpointId);

    /**
     * Cleans up after a successful commit. Must be called after {@link #commit} and on the
     * same thread that ran it.
     *
     * @param checkpointId identifier of the committed checkpoint
     * @param timeStamp    timestamp of the acknowledgment
     */
    void ackCommit(long checkpointId, long timeStamp);

    /**
     * Recovers state from the given checkpoint after a failure.
     *
     * @param checkpointId identifier of the checkpoint to roll back to
     */
    void rollBack(long checkpointId);
}

Usage Examples:
// Example transaction flow for a successful checkpoint
StateStoreManager stateManager = keyStateBackend; // KeyStateBackend implements StateStoreManager
long checkpointId = 1001L;
try {
    // Phase 1: Finish - serialize and prepare data
    stateManager.finish(checkpointId);
    // Phase 2: Commit - persist data (may run on a separate thread; if it does,
    // ackCommit must then be called on that same thread)
    stateManager.commit(checkpointId);
    // Phase 3: Acknowledge - clean up after a successful commit
    long timestamp = System.currentTimeMillis();
    stateManager.ackCommit(checkpointId, timestamp);
    System.out.println("Checkpoint " + checkpointId + " completed successfully");
} catch (Exception e) {
    // Phase 4: Rollback - recover on failure, and report why the checkpoint failed
    // instead of silently discarding the exception.
    stateManager.rollBack(checkpointId);
    System.err.println("Checkpoint " + checkpointId + " failed, rolled back: " + e.getMessage());
}

Base implementation providing transaction operations and context management for key-based state backends.
/**
* Base class for key-based state backends: implements the {@link StateStoreManager}
* transaction phases and carries the checkpoint/key context used while processing.
*
* <p>Reference listing: only signatures are shown here; method bodies are omitted.
* The examples in this document call {@code setCheckpointId(id)} before
* {@code finish(id)}. NOTE(review): confirm whether the backend requires that order.
*/
public abstract class AbstractKeyStateBackend implements StateStoreManager {
/**
* Finish phase: completes batch data saving and serialization for the checkpoint.
* @param checkpointId Checkpoint identifier
*/
public void finish(long checkpointId);
/**
* Commit phase: persists the checkpoint's data; may run asynchronously on another thread.
* @param checkpointId Checkpoint identifier
*/
public void commit(long checkpointId);
/**
* Acknowledge phase: cleans up after a successful commit. Per the
* {@link StateStoreManager} contract, call it after {@code commit} on the same thread.
* @param checkpointId Checkpoint identifier
* @param timeStamp Timestamp of acknowledgment
*/
public void ackCommit(long checkpointId, long timeStamp);
/**
* Rollback phase: recovers state from the given checkpoint after a failure.
* @param checkpointId Checkpoint identifier to roll back to
*/
public void rollBack(long checkpointId);
/**
* Returns the checkpoint ID of the current transaction context.
* @return Current checkpoint ID (presumably the value last set via
* {@code setCheckpointId} or {@code setContext}; confirm)
*/
public long getCheckpointId();
/**
* Sets the checkpoint ID for the transaction context.
* @param checkpointId Checkpoint ID to set
*/
public void setCheckpointId(long checkpointId);
/**
* Sets the full processing context in one call: the checkpoint ID and the current key.
* @param checkpointId Checkpoint identifier
* @param currentKey Key of the record currently being processed
*/
public void setContext(long checkpointId, Object currentKey);
}Abstract proxy class supporting transaction state operations with strategy delegation for different storage backends.
/**
* Proxy that exposes a single state, described by its state descriptor, through the
* {@link StateStoreManager} transaction phases plus simple String-keyed get/put access.
*
* <p>Operations are delegated to a storage strategy (the strategies listed in this
* document are DualStateStoreManager and MVStateStoreManager). NOTE(review): how the
* strategy is chosen is not shown in this listing.
*
* <p>Reference listing: only signatures are shown; method bodies are omitted.
*/
public abstract class StateStoreManagerProxy<V> implements StateStoreManager {
/**
* Creates a proxy bound to a backend and a state descriptor.
* @param keyStateBackend Backend providing the transaction context
* @param stateDescriptor Descriptor defining the state managed by this proxy
*/
public StateStoreManagerProxy(AbstractKeyStateBackend keyStateBackend, AbstractStateDescriptor stateDescriptor);
/**
* Finish phase for this state.
* @param checkpointId Checkpoint identifier
*/
public void finish(long checkpointId);
/**
* Commit phase for this state; may run asynchronously.
* @param checkpointId Checkpoint identifier
*/
public void commit(long checkpointId);
/**
* Acknowledge phase for this state; called after commit on the same thread
* (per the {@link StateStoreManager} contract).
* @param checkpointId Checkpoint identifier
* @param timeStamp Timestamp of acknowledgment
*/
public void ackCommit(long checkpointId, long timeStamp);
/**
* Rollback phase for this state.
* @param checkpointId Checkpoint identifier to roll back to
*/
public void rollBack(long checkpointId);
/**
* Closes the proxy and releases its resources.
*/
public void close();
/**
* Returns the value stored under {@code key}.
* @param key State key
* @return Retrieved value
*/
public V get(String key);
/**
* Stores {@code value} under {@code key}.
* @param key State key
* @param value Value to store
*/
public void put(String key, V value);
}Abstract base class for state store managers implementing three-layer storage architecture (front, middle, remote) with serialization support.
/**
* Abstract base for state store managers that use a three-layer storage
* architecture (front, middle, remote) and serialize records to byte arrays.
*
* <p>Reference listing: only signatures are shown; method bodies are omitted.
*/
public abstract class AbstractStateStoreManager<V> {
/**
* Creates a state store manager on top of a backing key-value store.
* @param backStore Backing store used for persistence. It maps each String state key to a
* {@code Map<Long, byte[]>} (presumably checkpoint ID to serialized record; confirm)
*/
public AbstractStateStoreManager(KeyValueStore<String, Map<Long, byte[]>> backStore);
/**
* Serializes a storage record to bytes; counterpart of {@link #toStorageRecord(byte[])}.
* NOTE(review): the parameter is a raw {@code StorageRecord}, whereas
* toStorageRecord returns {@code StorageRecord<V>}.
* @param storageRecord Record to serialize
* @return Serialized bytes
*/
public byte[] toBytes(StorageRecord storageRecord);
/**
* Deserializes bytes (as produced by {@link #toBytes(StorageRecord)}) back into a storage record.
* @param data Serialized bytes
* @return Deserialized storage record
*/
public StorageRecord<V> toStorageRecord(byte[] data);
/**
* Reads the value stored for {@code key} in the given checkpoint; each storage strategy
* provides its own implementation.
* @param checkpointId Checkpoint identifier
* @param key State key
* @return Retrieved value
*/
public abstract V get(long checkpointId, String key);
/**
* Stores value {@code v} under key {@code k} for the given checkpoint.
* @param checkpointId Checkpoint identifier
* @param k State key
* @param v Value to store
*/
public void put(long checkpointId, String k, V v);
/**
* Acknowledges a commit with a timestamp (same shape as {@link StateStoreManager#ackCommit}).
* NOTE(review): presumably delegates to the abstract {@link #ackCommit(long)}; confirm.
* @param checkpointId Checkpoint identifier
* @param timeStamp Acknowledgment timestamp
*/
public void ackCommit(long checkpointId, long timeStamp);
/**
* Strategy-specific commit acknowledgment; each subclass provides its own implementation.
* @param checkpointId Checkpoint identifier
*/
public abstract void ackCommit(long checkpointId);
/**
* Sets the key group index used for partitioning.
* @param keyGroupIndex Key group index
*/
public void setKeyGroupIndex(int keyGroupIndex);
/**
* Closes the manager and releases its resources.
*/
public void close();
}The library provides two main storage strategies for different consistency and performance requirements.
/**
* Dual-version state store manager: supports rollback by keeping two versions of the
* data. Listed as the DUAL_VERSION strategy. (Reference listing; body omitted.)
*/
public class DualStateStoreManager<V> extends AbstractStateStoreManager<V> {
// Maintains two versions of data for rollback support
// Implements DUAL_VERSION strategy
}/**
* State store manager intended for MVCC storage systems. (Reference listing; body omitted.)
*
* <p>Despite the "MV" (multi-version) in its name, it is listed as the SINGLE_VERSION
* strategy. Presumably it keeps a single version itself and relies on the MVCC store
* for older versions. NOTE(review): confirm against the implementation.
*/
public class MVStateStoreManager<V> extends AbstractStateStoreManager<V> {
// Optimized for MVCC storage systems
// Implements SINGLE_VERSION strategy
}Transaction Flow Examples:
// Example 1: Basic transaction with error handling
KeyStateBackend backend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);
// Set up state
ValueStateDescriptor<String> desc = ValueStateDescriptor.build("test-state", String.class, "");
ValueState<String> state = backend.getValueState(desc);
backend.setCurrentKey("key1");
state.update("value1");
// Perform transaction
long checkpointId = System.currentTimeMillis();
try {
    backend.setCheckpointId(checkpointId);
    backend.finish(checkpointId);
    backend.commit(checkpointId);
    backend.ackCommit(checkpointId, System.currentTimeMillis());
    System.out.println("Transaction completed successfully");
} catch (Exception e) {
    backend.rollBack(checkpointId);
    System.err.println("Transaction failed, rolled back: " + e.getMessage());
}

// Example 2: Async commit pattern
ExecutorService executor = Executors.newSingleThreadExecutor();
backend.finish(checkpointId);
// commit may run on another thread, but ackCommit must follow it on the SAME thread.
// Both calls therefore run inside the async task. A whenComplete callback is not
// guaranteed to run on the executor thread: if the future has already completed when
// the callback is registered, the callback runs on the registering (caller) thread.
CompletableFuture<Void> commitFuture = CompletableFuture.runAsync(() -> {
    try {
        backend.commit(checkpointId);
    } catch (Exception e) {
        throw new RuntimeException("Commit failed", e);
    }
    backend.ackCommit(checkpointId, System.currentTimeMillis());
}, executor);
commitFuture.whenComplete((result, throwable) -> {
    if (throwable == null) {
        System.out.println("Async commit completed");
    } else {
        backend.rollBack(checkpointId);
        System.err.println("Async commit failed: " + throwable.getMessage());
    }
});
// No further tasks are submitted. shutdown() lets the already-submitted commit task run
// to completion and then releases the worker thread instead of leaking it.
executor.shutdown();

// Example 3: Multiple state operations in single transaction
// nameDesc, eventDesc and counterDesc are state descriptors assumed to be built
// beforehand, in the same way desc is built in Example 1.
backend.setCurrentKey("user123");
long txnId = 2001L;
// Modify multiple states
ValueState<String> nameState = backend.getValueState(nameDesc);
ListState<String> eventState = backend.getListState(eventDesc);
MapState<String, Integer> counterState = backend.getMapState(counterDesc);
nameState.update("Alice");
eventState.add("login");
counterState.put("sessions", 1);
// Commit all changes under one checkpoint; a failure rolls all of them back
try {
    backend.setCheckpointId(txnId);
    backend.finish(txnId);
    backend.commit(txnId);
    backend.ackCommit(txnId, System.currentTimeMillis());
    System.out.println("All state changes committed atomically");
} catch (Exception e) {
    backend.rollBack(txnId);
    System.err.println("All state changes rolled back due to failure");
}

The transaction system provides comprehensive error handling and recovery mechanisms:
// Custom exception handling with detailed recovery
/**
 * Drives the finish / commit / ackCommit sequence for a checkpoint on a
 * {@code KeyStateBackend}. Any failure triggers a rollback to that checkpoint.
 *
 * <p>commit and ackCommit are always invoked on the same thread, inside
 * {@link #performCheckpoint}, as the StateStoreManager contract requires.
 */
public class StreamingStateTransactionManager {

    /** Phases a checkpoint transaction moves through; names appear in failure messages. */
    private enum Phase { INIT, FINISH, COMMIT, ACK_COMMIT, COMPLETED }

    private final KeyStateBackend backend;
    // Transactions currently in flight, keyed by checkpoint id.
    private final Map<Long, TransactionState> activeTransactions = new ConcurrentHashMap<>();

    /**
     * Creates a manager that runs checkpoints against {@code backend}.
     *
     * @param backend state backend whose transaction phases are invoked
     * @throws NullPointerException if {@code backend} is null
     */
    public StreamingStateTransactionManager(KeyStateBackend backend) {
        // The final field was never assigned before; without this constructor the class
        // does not compile.
        this.backend = java.util.Objects.requireNonNull(backend, "backend");
    }

    /**
     * Runs finish, commit and ackCommit for {@code checkpointId}. On any exception it
     * reports the phase that failed and rolls the backend back to that checkpoint.
     *
     * @param checkpointId identifier of the checkpoint to take
     */
    public void performCheckpoint(long checkpointId) {
        TransactionState txnState = new TransactionState(checkpointId);
        activeTransactions.put(checkpointId, txnState);
        try {
            // Phase 1: Finish
            txnState.setPhase(Phase.FINISH);
            backend.finish(checkpointId);
            // Phase 2: Commit
            txnState.setPhase(Phase.COMMIT);
            backend.commit(checkpointId);
            // Phase 3: Acknowledge (same thread as commit)
            txnState.setPhase(Phase.ACK_COMMIT);
            backend.ackCommit(checkpointId, System.currentTimeMillis());
            txnState.setPhase(Phase.COMPLETED);
            System.out.println("Transaction " + checkpointId + " completed successfully");
        } catch (Exception e) {
            handleTransactionFailure(checkpointId, txnState, e);
        } finally {
            // Conditional remove: never evict an entry registered by a concurrent call
            // that used the same checkpoint id.
            activeTransactions.remove(checkpointId, txnState);
        }
    }

    /**
     * Reports the phase that failed and attempts a rollback to {@code checkpointId}.
     * A rollback failure is reported but not rethrown.
     */
    private void handleTransactionFailure(long checkpointId, TransactionState txnState, Exception error) {
        System.err.println("Transaction " + checkpointId + " failed in phase " + txnState.getPhase() + ": " + error.getMessage());
        try {
            backend.rollBack(checkpointId);
            System.out.println("Successfully rolled back transaction " + checkpointId);
        } catch (Exception rollbackError) {
            System.err.println("CRITICAL: Rollback failed for transaction " + checkpointId + ": " + rollbackError.getMessage());
            // Additional recovery logic would go here
        }
    }

    /** Mutable record of one in-flight checkpoint transaction. */
    private static final class TransactionState {
        // checkpointId and startTime are kept for diagnostics; they are not read in this class.
        private final long checkpointId;
        private final long startTime;
        private Phase phase;

        TransactionState(long checkpointId) {
            this.checkpointId = checkpointId;
            this.startTime = System.currentTimeMillis();
            this.phase = Phase.INIT;
        }

        void setPhase(Phase phase) {
            this.phase = phase;
        }

        Phase getPhase() {
            return phase;
        }
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-io-ray--streaming-state