Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.
State management and persistence capabilities provide fault tolerance and enable recovery for Siddhi applications. They include interfaces for state persistence, snapshot management, and incremental persistence for high-performance scenarios.
Interface for state persistence providing basic save/load operations for Siddhi application state.
/**
 * Contract for storing and retrieving snapshots of a Siddhi application's state.
 * A snapshot is addressed by the application name together with a revision identifier.
 */
public interface PersistenceStore {
// Core Persistence Operations
/**
 * Saves a snapshot under the given application name and revision.
 *
 * @param siddhiAppName name of the Siddhi application the snapshot belongs to
 * @param revision identifier of the revision being saved
 * @param snapshot serialized state to store
 */
void save(String siddhiAppName, String revision, byte[] snapshot);
/**
 * Loads the snapshot saved under the given application name and revision.
 *
 * @param siddhiAppName name of the Siddhi application
 * @param revision identifier of the revision to load
 * @return the stored snapshot bytes
 */
// NOTE(review): behaviour for an unknown revision is not specified in this listing — confirm.
byte[] load(String siddhiAppName, String revision);
// Revision Management
/**
 * Returns the identifier of the last revision saved for the application.
 *
 * @param siddhiAppName name of the Siddhi application
 */
// NOTE(review): the value returned when no revision exists is not specified here — confirm.
String getLastRevision(String siddhiAppName);
/** Removes a single revision of the given application from the store. */
void clearRevision(String siddhiAppName, String revision);
/** Removes every revision of the given application from the store. */
void clearAllRevisions(String siddhiAppName);
// Store Management
/** Hands the persistence configuration to this store. */
void setStatePersistenceConfigs(StatePersistenceConfig statePersistenceConfig);
}Interface for incremental state persistence, optimized for high-performance scenarios with selective state updates.
/**
 * Contract for incremental persistence: each snapshot is stored and retrieved through
 * the {@link IncrementalSnapshotInfo} that describes it.
 */
public interface IncrementalPersistenceStore {
// Incremental Operations
/**
 * Saves one incremental snapshot.
 *
 * @param snapshotInfo descriptor identifying the snapshot
 * @param snapshot serialized state to store
 */
void save(IncrementalSnapshotInfo snapshotInfo, byte[] snapshot);
/**
 * Loads the snapshot identified by the given descriptor.
 *
 * @param snapshotInfo descriptor identifying the snapshot
 * @return the stored snapshot bytes
 */
byte[] load(IncrementalSnapshotInfo snapshotInfo);
// Configuration Management
/** Supplies configuration properties to this store. */
// NOTE(review): raw Map — the key and value types are not specified in this listing.
void setProperties(Map properties);
// Revision Management
/**
 * Lists the snapshots that have to be loaded to restore the given application.
 *
 * @param restoreTime presumably the point in time to restore to — units and semantics to be confirmed
 * @param siddhiAppName name of the Siddhi application
 */
List<IncrementalSnapshotInfo> getListOfRevisionsToLoad(long restoreTime, String siddhiAppName);
// NOTE(review): the two methods below name their parameter siddhiAppId while the one above
// uses siddhiAppName; whether these denote the same identifier is not shown here.
/** Returns the identifier of the last revision saved for the application. */
String getLastRevision(String siddhiAppId);
/** Removes every revision of the given application from the store. */
void clearAllRevisions(String siddhiAppId);
}Reference to persisted state providing metadata about persistence operations.
/** Read-only metadata describing one persisted state: which application, which revision, and when. */
public interface PersistenceReference {
/** Returns the revision identifier of the persisted state. */
String getRevision();
/** Returns the timestamp associated with the persisted state. */
// NOTE(review): unit/epoch of the timestamp is not specified in this listing — confirm.
long getTimestamp();
/** Returns the name of the Siddhi application the state belongs to. */
String getSiddhiAppName();
}Interface for snapshot capability, enabling objects to provide their state for persistence.
/** Implemented by objects that can export their state as bytes and later be restored from such bytes. */
public interface Snapshotable {
/** Returns this object's current state in serialized form. */
byte[] getSnapshot();
/** Restores this object's state from the given serialized snapshot. */
void restoreSnapshot(byte[] snapshot);
}public class SiddhiAppRuntime {
// State Management
// (API summary: only the method signatures are listed here, bodies are not shown.)
/** Persists the current state and returns a reference describing what was persisted. */
public PersistenceReference persist();
/** Returns the current state as a byte array without going through a persistence store. */
public byte[] snapshot();
/** Restores state from a byte array previously obtained from {@link #snapshot()}. */
public void restore(byte[] snapshot);
/** Restores the state that was persisted under the given revision identifier. */
public void restoreRevision(String revision);
/** Restores the most recently persisted revision. */
public void restoreLastRevision();
/** Removes all persisted revisions of this application. */
public void clearAllRevisions();
}
// Basic persistence workflow
// NOTE(review): siddhiManager, siddhiApp and saveSnapshotToFile are defined outside this snippet.
// persist() below presumably needs a persistence store configured on siddhiManager — confirm.
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
runtime.start();
// Process events for some time...
InputHandler handler = runtime.getInputHandler("StockStream");
handler.send(new Object[]{"IBM", 150.0, 1000L});
// Persist current state
PersistenceReference ref = runtime.persist();
System.out.println("Persisted state with revision: " + ref.getRevision());
// Continue processing...
handler.send(new Object[]{"MSFT", 120.0, 500L});
// Take a snapshot
byte[] snapshot = runtime.snapshot();
saveSnapshotToFile(snapshot);
// Simulate restart - restore from snapshot
runtime.shutdown();
// A fresh runtime is created for the same application and the in-memory snapshot is applied to it.
// NOTE(review): handler still refers to the runtime that was shut down above; it is not
// re-fetched from the new runtime, so obtain a new InputHandler before sending more events.
runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
runtime.restore(snapshot);
runtime.start();
// Or restore from specific revision
// (uses the revision identifier returned by the earlier persist() call)
runtime.restoreRevision(ref.getRevision());public class SiddhiManager {
// Global Persistence Operations
// (API summary: only the method signatures are listed here, bodies are not shown.)
/** Persists the state of the applications managed by this manager. */
public void persist();
/** Restores the last persisted state of the applications managed by this manager. */
public void restoreLastState();
/** Returns the identifier of the last revision persisted for the named application. */
public String getLastRevision(String siddhiAppName);
// Persistence Store Configuration
/** Sets the store used for full-snapshot persistence. */
public void setPersistenceStore(PersistenceStore persistenceStore);
/** Sets the store used for incremental persistence. */
public void setIncrementalPersistenceStore(IncrementalPersistenceStore incrementalPersistenceStore);
}
// Configure persistence store
// Manager-level workflow: persist every application through one shared store, then
// restore them all after a simulated restart.
// NOTE(review): siddhiManager, tradingApp and alertingApp are defined outside this snippet.
FilePersistenceStore persistenceStore = new FilePersistenceStore("./siddhi-state");
siddhiManager.setPersistenceStore(persistenceStore);
// Create and start multiple applications
SiddhiAppRuntime app1 = siddhiManager.createSiddhiAppRuntime(tradingApp);
SiddhiAppRuntime app2 = siddhiManager.createSiddhiAppRuntime(alertingApp);
app1.start();
app2.start();
// Persist all applications at once
siddhiManager.persist();
// Simulate system restart
siddhiManager.shutdown();
siddhiManager = new SiddhiManager();
siddhiManager.setPersistenceStore(persistenceStore);
// A brand-new manager has no applications registered, so there is nothing for
// restoreLastState() to restore into until the applications are created again.
app1 = siddhiManager.createSiddhiAppRuntime(tradingApp);
app2 = siddhiManager.createSiddhiAppRuntime(alertingApp);
app1.start();
app2.start();
// Restore all applications
siddhiManager.restoreLastState();
// Get specific revision information
// NOTE(review): assumes tradingApp declares the application name "TradingApp" — confirm.
String lastRevision = siddhiManager.getLastRevision("TradingApp");
System.out.println("Last revision for TradingApp: " + lastRevision);
// Example file-based persistence store usage
/**
 * Example {@link PersistenceStore} that keeps every snapshot in its own file, named
 * {@code <siddhiAppName>_<revision>.snapshot}, directly under a base directory.
 *
 * <p>All six operations of {@link PersistenceStore} are implemented so that the class
 * satisfies the interface.
 */
public class FilePersistenceStore implements PersistenceStore {
    private static final String SNAPSHOT_SUFFIX = ".snapshot";

    private final String baseDirectory;

    /**
     * @param baseDirectory directory in which snapshot files are kept; created if missing
     * @throws PersistenceStoreException if the directory does not exist and cannot be created
     */
    public FilePersistenceStore(String baseDirectory) {
        this.baseDirectory = baseDirectory;
        File directory = new File(baseDirectory);
        // mkdirs() returns false both on failure and when the directory already exists,
        // so it is only an error if the directory is still missing afterwards.
        if (!directory.mkdirs() && !directory.isDirectory()) {
            throw new PersistenceStoreException(
                    "Failed to create snapshot directory: " + baseDirectory);
        }
    }

    /**
     * Writes the snapshot to the file for the given application and revision,
     * replacing any existing file.
     *
     * @throws PersistenceStoreException if the file cannot be written
     */
    @Override
    public void save(String siddhiAppName, String revision, byte[] snapshot) {
        try {
            Files.write(snapshotFile(siddhiAppName, revision).toPath(), snapshot);
        } catch (IOException e) {
            throw new PersistenceStoreException("Failed to save snapshot", e);
        }
    }

    /**
     * Reads the snapshot stored for the given application and revision.
     *
     * @return the bytes previously passed to {@link #save}
     * @throws PersistenceStoreException if the file is missing or cannot be read
     */
    @Override
    public byte[] load(String siddhiAppName, String revision) {
        try {
            return Files.readAllBytes(snapshotFile(siddhiAppName, revision).toPath());
        } catch (IOException e) {
            throw new PersistenceStoreException("Failed to load snapshot", e);
        }
    }

    /**
     * Returns the revision of the most recently modified snapshot file of the application.
     *
     * @return the revision identifier, or {@code null} when the application has no snapshot
     */
    @Override
    public String getLastRevision(String siddhiAppName) {
        File newest = null;
        for (File candidate : snapshotFilesOf(siddhiAppName)) {
            if (newest == null || candidate.lastModified() > newest.lastModified()) {
                newest = candidate;
            }
        }
        if (newest == null) {
            return null;
        }
        // Strip "<siddhiAppName>_" from the front and ".snapshot" from the end.
        String fileName = newest.getName();
        return fileName.substring(siddhiAppName.length() + 1,
                fileName.length() - SNAPSHOT_SUFFIX.length());
    }

    /**
     * Deletes the snapshot file of one revision; a revision that does not exist is ignored.
     *
     * @throws PersistenceStoreException if the file exists but cannot be deleted
     */
    @Override
    public void clearRevision(String siddhiAppName, String revision) {
        delete(snapshotFile(siddhiAppName, revision));
    }

    /**
     * Deletes every snapshot file of the application.
     *
     * @throws PersistenceStoreException if a file cannot be deleted
     */
    @Override
    public void clearAllRevisions(String siddhiAppName) {
        for (File snapshot : snapshotFilesOf(siddhiAppName)) {
            delete(snapshot);
        }
    }

    /** This store takes all of its settings from the constructor, so the configuration is not used. */
    @Override
    public void setStatePersistenceConfigs(StatePersistenceConfig statePersistenceConfig) {
        // Intentionally empty.
    }

    /** Maps an application name and revision to the file that holds that snapshot. */
    private File snapshotFile(String siddhiAppName, String revision) {
        return new File(baseDirectory, siddhiAppName + "_" + revision + SNAPSHOT_SUFFIX);
    }

    /**
     * Lists the snapshot files whose name starts with {@code <siddhiAppName>_}.
     * NOTE(review): an application named "A" also matches files of an application named
     * "A_B"; use one sub-directory per application if such names can occur.
     */
    private File[] snapshotFilesOf(String siddhiAppName) {
        String prefix = siddhiAppName + "_";
        File[] matches = new File(baseDirectory).listFiles(
                (dir, name) -> name.startsWith(prefix) && name.endsWith(SNAPSHOT_SUFFIX));
        // listFiles() yields null when the directory cannot be read.
        return matches != null ? matches : new File[0];
    }

    /** Deletes the file if present, translating I/O failures into the store's exception type. */
    private void delete(File snapshot) {
        try {
            Files.deleteIfExists(snapshot.toPath());
        } catch (IOException e) {
            throw new PersistenceStoreException(
                    "Failed to delete snapshot " + snapshot.getName(), e);
        }
    }
}
// Usage: register the file-backed store with the manager
FilePersistenceStore fileStore = new FilePersistenceStore("./siddhi-persistence");
siddhiManager.setPersistenceStore(fileStore);
// Example database persistence configuration
/**
 * Example {@link PersistenceStore} backed by a relational table {@code siddhi_snapshots}
 * with the columns {@code app_name}, {@code revision}, {@code snapshot_data} and
 * {@code created_at}.
 *
 * <p>All six operations of {@link PersistenceStore} are implemented so that the class
 * satisfies the interface.
 */
public class DatabasePersistenceStore implements PersistenceStore {
    private final DataSource dataSource;

    /**
     * @param dataSource source of JDBC connections to the snapshot database
     */
    public DatabasePersistenceStore(DataSource dataSource) {
        this.dataSource = dataSource;
        // NOTE(review): initializeTables() is not shown in this example; it is presumably
        // where the siddhi_snapshots table is created — supply it for your database.
        initializeTables();
    }

    /**
     * Inserts the snapshot as a new row stamped with the current time.
     *
     * @throws PersistenceStoreException if the insert fails
     */
    @Override
    public void save(String siddhiAppName, String revision, byte[] snapshot) {
        String sql = "INSERT INTO siddhi_snapshots (app_name, revision, snapshot_data, created_at) " +
                "VALUES (?, ?, ?, ?)";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, siddhiAppName);
            stmt.setString(2, revision);
            stmt.setBytes(3, snapshot);
            stmt.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new PersistenceStoreException("Failed to save to database", e);
        }
    }

    /**
     * Reads the snapshot stored for the given application and revision. When several rows
     * share the same application and revision, the most recently created one is returned.
     *
     * @throws PersistenceStoreException if no such row exists or the query fails
     */
    @Override
    public byte[] load(String siddhiAppName, String revision) {
        String sql = "SELECT snapshot_data FROM siddhi_snapshots " +
                "WHERE app_name = ? AND revision = ? ORDER BY created_at DESC";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setMaxRows(1); // only the newest row is needed
            stmt.setString(1, siddhiAppName);
            stmt.setString(2, revision);
            try (java.sql.ResultSet rows = stmt.executeQuery()) {
                if (rows.next()) {
                    return rows.getBytes(1);
                }
            }
        } catch (SQLException e) {
            throw new PersistenceStoreException("Failed to load from database", e);
        }
        throw new PersistenceStoreException(
                "No snapshot found for " + siddhiAppName + " revision " + revision);
    }

    /**
     * Returns the revision of the most recently created row of the application.
     *
     * @return the revision identifier, or {@code null} when the application has no snapshot
     * @throws PersistenceStoreException if the query fails
     */
    @Override
    public String getLastRevision(String siddhiAppName) {
        String sql = "SELECT revision FROM siddhi_snapshots " +
                "WHERE app_name = ? ORDER BY created_at DESC";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setMaxRows(1); // only the newest row is needed
            stmt.setString(1, siddhiAppName);
            try (java.sql.ResultSet rows = stmt.executeQuery()) {
                return rows.next() ? rows.getString(1) : null;
            }
        } catch (SQLException e) {
            throw new PersistenceStoreException("Failed to read last revision from database", e);
        }
    }

    /**
     * Deletes the rows of one revision of the application.
     *
     * @throws PersistenceStoreException if the delete fails
     */
    @Override
    public void clearRevision(String siddhiAppName, String revision) {
        executeDelete("DELETE FROM siddhi_snapshots WHERE app_name = ? AND revision = ?",
                siddhiAppName, revision);
    }

    /**
     * Deletes every row of the application.
     *
     * @throws PersistenceStoreException if the delete fails
     */
    @Override
    public void clearAllRevisions(String siddhiAppName) {
        executeDelete("DELETE FROM siddhi_snapshots WHERE app_name = ?", siddhiAppName);
    }

    /** This store takes all of its settings from the DataSource, so the configuration is not used. */
    @Override
    public void setStatePersistenceConfigs(StatePersistenceConfig statePersistenceConfig) {
        // Intentionally empty.
    }

    /** Runs a parameterized DELETE, binding the given values to its placeholders in order. */
    private void executeDelete(String sql, String... parameters) {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            for (int i = 0; i < parameters.length; i++) {
                stmt.setString(i + 1, parameters[i]); // JDBC placeholders are 1-based
            }
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new PersistenceStoreException("Failed to delete from database", e);
        }
    }
}
// Configure incremental persistence for high-throughput scenarios
/**
 * Example {@link IncrementalPersistenceStore} that keeps each incremental snapshot as a
 * Redis value, optionally with a time-to-live.
 *
 * <p>{@code save} and {@code load} use the signatures declared by
 * {@link IncrementalPersistenceStore}, and every snapshot gets its own key so that a new
 * incremental snapshot does not overwrite the previous one.
 *
 * <p>NOTE(review): the remaining interface methods ({@code setProperties},
 * {@code getListOfRevisionsToLoad}, {@code getLastRevision}, {@code clearAllRevisions})
 * are omitted from this example and must be implemented before the class can be used;
 * they need an index of the stored snapshots, which this example does not maintain.
 */
public class RedisIncrementalStore implements IncrementalPersistenceStore {
    private static final String KEY_PREFIX = "siddhi:incremental:";

    private final RedisTemplate<String, byte[]> redisTemplate;
    /** Time-to-live applied to every saved snapshot, in seconds; values <= 0 disable expiry. */
    private final long retentionSeconds;

    /**
     * Creates a store whose snapshots never expire.
     *
     * @param redisTemplate template used for all Redis access
     */
    public RedisIncrementalStore(RedisTemplate<String, byte[]> redisTemplate) {
        this(redisTemplate, 0L);
    }

    /**
     * @param redisTemplate template used for all Redis access
     * @param retentionSeconds time-to-live for saved snapshots in seconds; {@code <= 0} for none
     */
    public RedisIncrementalStore(RedisTemplate<String, byte[]> redisTemplate, long retentionSeconds) {
        this.redisTemplate = redisTemplate;
        this.retentionSeconds = retentionSeconds;
    }

    /** Stores the snapshot under the key derived from its descriptor and applies the TTL, if any. */
    @Override
    public void save(IncrementalSnapshotInfo snapshotInfo, byte[] snapshot) {
        String key = keyFor(snapshotInfo);
        redisTemplate.opsForValue().set(key, snapshot);
        if (retentionSeconds > 0) {
            redisTemplate.expire(key, retentionSeconds, TimeUnit.SECONDS);
        }
    }

    /** Returns the snapshot stored for the descriptor, as returned by the Redis lookup. */
    @Override
    public byte[] load(IncrementalSnapshotInfo snapshotInfo) {
        return redisTemplate.opsForValue().get(keyFor(snapshotInfo));
    }

    /**
     * Builds the Redis key {@code siddhi:incremental:<appId>:<id>}.
     * NOTE(review): assumes getId() is unique per snapshot within an application — confirm.
     */
    private static String keyFor(IncrementalSnapshotInfo snapshotInfo) {
        return KEY_PREFIX + snapshotInfo.getSiddhiAppId() + ":" + snapshotInfo.getId();
    }
}
// Usage with incremental persistence: register the Redis-backed store with the manager
// NOTE(review): redisTemplate is defined outside this snippet.
RedisIncrementalStore incrementalStore = new RedisIncrementalStore(redisTemplate);
siddhiManager.setIncrementalPersistenceStore(incrementalStore);
// Automated periodic persistence
/**
 * Triggers {@code SiddhiManager.persist()} at a fixed rate on a scheduler thread pool of size one.
 */
public class ScheduledPersistenceManager {
    private final SiddhiManager siddhiManager;
    private final ScheduledExecutorService scheduler;

    /**
     * @param siddhiManager manager whose applications are persisted on every run
     */
    public ScheduledPersistenceManager(SiddhiManager siddhiManager) {
        this.siddhiManager = siddhiManager;
        this.scheduler = Executors.newScheduledThreadPool(1);
    }

    /**
     * Schedules persistence every {@code intervalMinutes} minutes; the first run happens
     * after one full interval.
     *
     * @param intervalMinutes initial delay and period, in minutes
     */
    public void startPeriodicPersistence(long intervalMinutes) {
        scheduler.scheduleAtFixedRate(
                this::runPersistence, intervalMinutes, intervalMinutes, TimeUnit.MINUTES);
    }

    /** Shuts the scheduler down. */
    public void shutdown() {
        scheduler.shutdown();
    }

    /**
     * One scheduled run. Failures are reported on stderr and not rethrown: an exception
     * escaping a fixed-rate task would suppress its subsequent executions.
     */
    private void runPersistence() {
        try {
            System.out.println("Starting scheduled persistence...");
            siddhiManager.persist();
            System.out.println("Scheduled persistence completed");
        } catch (Exception failure) {
            System.err.println("Scheduled persistence failed: " + failure.getMessage());
        }
    }
}
// Usage: persist on a fixed schedule
ScheduledPersistenceManager persistenceManager =
new ScheduledPersistenceManager(siddhiManager);
// The argument is used as both the initial delay and the period, so the first
// persistence happens 15 minutes after this call.
persistenceManager.startPeriodicPersistence(15); // Every 15 minutes
// Event-driven persistence based on conditions
/**
 * Stream callback that persists the application's state once a given number of events
 * has been received since the last successful persistence.
 */
public class ConditionalPersistenceHandler extends StreamCallback {
    private static final long DEFAULT_PERSISTENCE_THRESHOLD = 10000;

    private final SiddhiAppRuntime runtime;
    private final AtomicLong eventCount = new AtomicLong(0);
    private final long persistenceThreshold;

    /**
     * Creates a handler that persists after every 10000 events.
     *
     * @param runtime runtime whose state is persisted
     */
    public ConditionalPersistenceHandler(SiddhiAppRuntime runtime) {
        this(runtime, DEFAULT_PERSISTENCE_THRESHOLD);
    }

    /**
     * @param runtime runtime whose state is persisted
     * @param persistenceThreshold number of events after which state is persisted; must be positive
     * @throws IllegalArgumentException if {@code runtime} is null or the threshold is not positive
     */
    public ConditionalPersistenceHandler(SiddhiAppRuntime runtime, long persistenceThreshold) {
        if (runtime == null) {
            throw new IllegalArgumentException("runtime must not be null");
        }
        if (persistenceThreshold <= 0) {
            throw new IllegalArgumentException(
                    "persistenceThreshold must be positive: " + persistenceThreshold);
        }
        this.runtime = runtime;
        this.persistenceThreshold = persistenceThreshold;
    }

    /**
     * Counts the received events and persists the runtime's state when the threshold is reached.
     *
     * @param events batch of events delivered to this callback
     */
    @Override
    public void receive(Event[] events) {
        long count = eventCount.addAndGet(events.length);
        // Persist state after processing threshold number of events
        if (count >= persistenceThreshold) {
            try {
                PersistenceReference ref = runtime.persist();
                System.out.println("Auto-persisted after " + count + " events, revision: " +
                        ref.getRevision());
                // Reset only after a successful persist, so a failed attempt is retried
                // on the next batch.
                eventCount.set(0);
            } catch (Exception e) {
                System.err.println("Auto-persistence failed: " + e.getMessage());
            }
        }
    }
}
// Distributed persistence for cluster environments
/**
 * Example of leader-driven persistence in a cluster: only the leader node triggers
 * persistence, after announcing it and waiting for the other nodes.
 * NOTE(review): ClusterCoordinator is not defined in this document; it stands in for
 * whatever cluster-coordination facility the deployment provides.
 */
public class ClusteredPersistenceCoordinator {
    private final SiddhiManager siddhiManager;
    private final ClusterCoordinator coordinator;

    /**
     * @param siddhiManager manager whose applications are persisted
     * @param coordinator cluster-coordination facility used for leadership and messaging
     */
    public ClusteredPersistenceCoordinator(SiddhiManager siddhiManager,
                                           ClusterCoordinator coordinator) {
        this.siddhiManager = siddhiManager;
        this.coordinator = coordinator;
    }

    /** Runs one coordinated persistence round; does nothing on non-leader nodes. */
    public void coordinatedPersistence() {
        // Only leader node initiates persistence
        if (coordinator.isLeader()) {
            // Coordinate persistence across cluster
            coordinator.broadcast("PREPARE_PERSISTENCE");
            // Wait for all nodes to be ready
            coordinator.waitForAllNodesReady();
            // Execute persistence
            siddhiManager.persist();
            // Notify completion
            coordinator.broadcast("PERSISTENCE_COMPLETE");
        }
    }
}

/**
 * Example of persistence with fallback: tries the currently configured store first and,
 * if that fails, retries against each backup store in turn.
 */
public class RobustPersistenceManager {
    private final SiddhiManager siddhiManager;
    private final SiddhiAppRuntime runtime;
    private final List<PersistenceStore> backupStores;

    /**
     * @param siddhiManager manager on which the active persistence store is configured
     * @param runtime runtime whose state is persisted
     * @param backupStores stores to fall back to, in the order they should be tried
     */
    public RobustPersistenceManager(SiddhiManager siddhiManager, SiddhiAppRuntime runtime,
                                    List<PersistenceStore> backupStores) {
        this.siddhiManager = siddhiManager;
        this.runtime = runtime;
        this.backupStores = backupStores;
    }

    /**
     * Persists the runtime's state, falling back to the backup stores on failure.
     *
     * @throws PersistenceStoreException if the primary store and every backup store fail;
     *         the cause is the last failure encountered
     */
    public void safePersist() {
        Exception lastException = null;
        // Try primary store first
        try {
            runtime.persist();
            return;
        } catch (PersistenceStoreException e) {
            lastException = e;
            System.err.println("Primary persistence failed: " + e.getMessage());
        }
        // Try backup stores
        for (PersistenceStore backupStore : backupStores) {
            try {
                // Switch to backup store and retry. The store is configured on the
                // SiddhiManager; SiddhiAppRuntime has no setter for it.
                // NOTE(review): assumes the runtime uses the manager's current store when
                // persist() is called — confirm. The backup store stays active afterwards.
                siddhiManager.setPersistenceStore(backupStore);
                runtime.persist();
                System.out.println("Successfully persisted to backup store");
                return;
            } catch (Exception e) {
                lastException = e;
                System.err.println("Backup persistence failed: " + e.getMessage());
            }
        }
        // All stores failed
        throw new PersistenceStoreException("All persistence stores failed", lastException);
    }
}

/** Configuration handed to a persistence store. */
public interface StatePersistenceConfig {
    /** Returns how long persisted state is retained. NOTE(review): unit not specified here — confirm. */
    long getRetentionDuration();

    /** Returns the location at which state is stored. */
    String getStorageLocation();

    /** Returns additional store-specific properties. */
    Map<String, String> getProperties();
}
/**
 * Descriptor of one incremental snapshot, used as the key for saving and loading it.
 * NOTE(review): the meaning of the individual fields below is inferred from their names
 * only; confirm against the Siddhi sources before relying on them.
 */
public interface IncrementalSnapshotInfo {
/** Returns the identifier of this snapshot. */
String getId();
/** Returns the identifier of the Siddhi application the snapshot belongs to. */
String getSiddhiAppId();
/** Returns the type of this snapshot; the possible values are not listed here. */
String getType();
/** Returns the name of the query the snapshot belongs to. */
String getQueryName();
/** Returns the identifier of the element the snapshot belongs to. */
String getElementId();
/** Returns the time associated with this snapshot; unit not specified here. */
long getTime();
/** Returns the partition key group map of this snapshot. */
Map<String, Object> getPartitionKeyGroupMap();
}
/**
 * Signals a failure inside a persistence store; the example stores in this document
 * throw it when saving or loading a snapshot fails.
 */
public class PersistenceStoreException extends SiddhiException {
/** Creates the exception with a description of the failure. */
public PersistenceStoreException(String message);
/** Creates the exception with a description of the failure and its underlying cause. */
public PersistenceStoreException(String message, Throwable cause);
}
/** Presumably signals that an application's state could not be restored — verify against the throwing code. */
public class CannotRestoreSiddhiAppStateException extends SiddhiException {
/** Creates the exception with a description of the failure. */
public CannotRestoreSiddhiAppStateException(String message);
/** Creates the exception with a description of the failure and its underlying cause. */
public CannotRestoreSiddhiAppStateException(String message, Throwable cause);
}
/** Presumably signals that an application's persisted state could not be cleared — verify against the throwing code. */
public class CannotClearSiddhiAppStateException extends SiddhiException {
/** Creates the exception with a description of the failure. */
public CannotClearSiddhiAppStateException(String message);
/** Creates the exception with a description of the failure and its underlying cause. */
public CannotClearSiddhiAppStateException(String message, Throwable cause);
}
/** Presumably signals that a persistence operation was requested while no store is configured — verify against the throwing code. */
public class NoPersistenceStoreException extends SiddhiException {
/** Creates the exception with a description of the failure. */
public NoPersistenceStoreException(String message);
}Install with Tessl CLI
npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core