CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-wso2-siddhi--siddhi-core

Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.

Overview
Eval results
Files

persistence.mddocs/

Persistence

State management and persistence capabilities provide fault tolerance and recovery scenarios for Siddhi applications. This includes interfaces for state persistence, snapshot management, and incremental persistence for high-performance scenarios.

Persistence Interfaces

PersistenceStore

Interface for state persistence providing basic save/load operations for Siddhi application state.

public interface PersistenceStore {
    // Core Persistence Operations
    void save(String siddhiAppName, String revision, byte[] snapshot);
    byte[] load(String siddhiAppName, String revision);
    
    // Revision Management
    String getLastRevision(String siddhiAppName);
    void clearRevision(String siddhiAppName, String revision);
    void clearAllRevisions(String siddhiAppName);
    
    // Store Management
    void setStatePersistenceConfigs(StatePersistenceConfig statePersistenceConfig);
}

IncrementalPersistenceStore

Interface for incremental state persistence, optimized for high-performance scenarios with selective state updates.

public interface IncrementalPersistenceStore {
    // Incremental Operations
    void save(IncrementalSnapshotInfo snapshotInfo, byte[] snapshot);
    byte[] load(IncrementalSnapshotInfo snapshotInfo);
    
    // Configuration Management
    void setProperties(Map properties);
    
    // Revision Management
    List<IncrementalSnapshotInfo> getListOfRevisionsToLoad(long restoreTime, String siddhiAppName);
    String getLastRevision(String siddhiAppId);
    void clearAllRevisions(String siddhiAppId);
}

PersistenceReference

Reference to persisted state providing metadata about persistence operations.

public interface PersistenceReference {
    String getRevision();
    long getTimestamp();
    String getSiddhiAppName();
}

Snapshotable

Interface for snapshot capability, enabling objects to provide their state for persistence.

public interface Snapshotable {
    byte[] getSnapshot();
    void restoreSnapshot(byte[] snapshot);
}

SiddhiAppRuntime Persistence Operations

Basic Persistence Operations

public class SiddhiAppRuntime {
    // State Management
    public PersistenceReference persist();
    public byte[] snapshot();
    public void restore(byte[] snapshot);
    public void restoreRevision(String revision);
    public void restoreLastRevision();
    public void clearAllRevisions();
}

Usage Examples

// Basic persistence workflow
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
runtime.start();

// Process events for some time...
InputHandler handler = runtime.getInputHandler("StockStream");
handler.send(new Object[]{"IBM", 150.0, 1000L});

// Persist current state
PersistenceReference ref = runtime.persist();
System.out.println("Persisted state with revision: " + ref.getRevision());

// Continue processing...
handler.send(new Object[]{"MSFT", 120.0, 500L});

// Take a snapshot
byte[] snapshot = runtime.snapshot();
saveSnapshotToFile(snapshot);

// Simulate restart - restore from snapshot
runtime.shutdown();
runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
runtime.restore(snapshot);
runtime.start();

// Or restore from specific revision
runtime.restoreRevision(ref.getRevision());

SiddhiManager Global Persistence

Manager-Level Persistence

public class SiddhiManager {
    // Global Persistence Operations
    public void persist();
    public void restoreLastState();
    public String getLastRevision(String siddhiAppName);
    
    // Persistence Store Configuration
    public void setPersistenceStore(PersistenceStore persistenceStore);
    public void setIncrementalPersistenceStore(IncrementalPersistenceStore incrementalPersistenceStore);
}

Usage Examples

// Configure persistence store
FilePersistenceStore persistenceStore = new FilePersistenceStore("./siddhi-state");
siddhiManager.setPersistenceStore(persistenceStore);

// Create and configure multiple applications
SiddhiAppRuntime app1 = siddhiManager.createSiddhiAppRuntime(tradingApp);
SiddhiAppRuntime app2 = siddhiManager.createSiddhiAppRuntime(alertingApp);

app1.start();
app2.start();

// Persist all applications at once
siddhiManager.persist();

// Simulate system restart
siddhiManager.shutdown();
siddhiManager = new SiddhiManager();
siddhiManager.setPersistenceStore(persistenceStore);

// Restore all applications
siddhiManager.restoreLastState();

// Get specific revision information
String lastRevision = siddhiManager.getLastRevision("TradingApp");
System.out.println("Last revision for TradingApp: " + lastRevision);

Persistence Store Implementations

File-Based Persistence

// Example file-based persistence store usage
public class FilePersistenceStore implements PersistenceStore {
    private final String baseDirectory;
    
    public FilePersistenceStore(String baseDirectory) {
        this.baseDirectory = baseDirectory;
        // Ensure directory exists
        new File(baseDirectory).mkdirs();
    }
    
    @Override
    public void save(String siddhiAppName, String revision, byte[] snapshot) {
        try {
            String filename = baseDirectory + "/" + siddhiAppName + "_" + revision + ".snapshot";
            Files.write(Paths.get(filename), snapshot);
        } catch (IOException e) {
            throw new PersistenceStoreException("Failed to save snapshot", e);
        }
    }
    
    @Override
    public byte[] load(String siddhiAppName, String revision) {
        try {
            String filename = baseDirectory + "/" + siddhiAppName + "_" + revision + ".snapshot";
            return Files.readAllBytes(Paths.get(filename));
        } catch (IOException e) {
            throw new PersistenceStoreException("Failed to load snapshot", e);
        }
    }
}

// Usage
FilePersistenceStore fileStore = new FilePersistenceStore("./siddhi-persistence");
siddhiManager.setPersistenceStore(fileStore);

Database-Based Persistence

// Example database persistence configuration
public class DatabasePersistenceStore implements PersistenceStore {
    private final DataSource dataSource;
    
    public DatabasePersistenceStore(DataSource dataSource) {
        this.dataSource = dataSource;
        initializeTables();
    }
    
    @Override
    public void save(String siddhiAppName, String revision, byte[] snapshot) {
        String sql = "INSERT INTO siddhi_snapshots (app_name, revision, snapshot_data, created_at) " +
                    "VALUES (?, ?, ?, ?)";
        
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            
            stmt.setString(1, siddhiAppName);
            stmt.setString(2, revision);
            stmt.setBytes(3, snapshot);
            stmt.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
            
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new PersistenceStoreException("Failed to save to database", e);
        }
    }
}

Incremental Persistence

High-Performance Incremental Persistence

// Configure incremental persistence for high-throughput scenarios
public class RedisIncrementalStore implements IncrementalPersistenceStore {
    private final RedisTemplate<String, byte[]> redisTemplate;
    
    @Override
    public void save(StatePersistenceConfig config, String siddhiAppName, byte[] configSnapshot) {
        String key = "siddhi:incremental:" + siddhiAppName;
        redisTemplate.opsForValue().set(key, configSnapshot);
        
        // Set TTL based on configuration
        redisTemplate.expire(key, config.getRetentionDuration(), TimeUnit.SECONDS);
    }
    
    @Override
    public byte[] load(StatePersistenceConfig config, String siddhiAppName) {
        String key = "siddhi:incremental:" + siddhiAppName;
        return redisTemplate.opsForValue().get(key);
    }
}

// Usage with incremental persistence
RedisIncrementalStore incrementalStore = new RedisIncrementalStore(redisTemplate);
siddhiManager.setIncrementalPersistenceStore(incrementalStore);

Advanced Persistence Patterns

Scheduled Persistence

// Automated periodic persistence
public class ScheduledPersistenceManager {
    private final SiddhiManager siddhiManager;
    private final ScheduledExecutorService scheduler;
    
    public ScheduledPersistenceManager(SiddhiManager siddhiManager) {
        this.siddhiManager = siddhiManager;
        this.scheduler = Executors.newScheduledThreadPool(1);
    }
    
    public void startPeriodicPersistence(long intervalMinutes) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                System.out.println("Starting scheduled persistence...");
                siddhiManager.persist();
                System.out.println("Scheduled persistence completed");
            } catch (Exception e) {
                System.err.println("Scheduled persistence failed: " + e.getMessage());
            }
        }, intervalMinutes, intervalMinutes, TimeUnit.MINUTES);
    }
    
    public void shutdown() {
        scheduler.shutdown();
    }
}

// Usage
ScheduledPersistenceManager persistenceManager = 
    new ScheduledPersistenceManager(siddhiManager);
persistenceManager.startPeriodicPersistence(15); // Every 15 minutes

Conditional Persistence

// Event-driven persistence based on conditions
public class ConditionalPersistenceHandler extends StreamCallback {
    private final SiddhiAppRuntime runtime;
    private final AtomicLong eventCount = new AtomicLong(0);
    private final long persistenceThreshold = 10000;
    
    @Override
    public void receive(Event[] events) {
        long count = eventCount.addAndGet(events.length);
        
        // Persist state after processing threshold number of events
        if (count >= persistenceThreshold) {
            try {
                PersistenceReference ref = runtime.persist();
                System.out.println("Auto-persisted after " + count + " events, revision: " + 
                                 ref.getRevision());
                eventCount.set(0); // Reset counter
            } catch (Exception e) {
                System.err.println("Auto-persistence failed: " + e.getMessage());
            }
        }
    }
}

Clustered Persistence

// Distributed persistence for cluster environments
public class ClusteredPersistenceCoordinator {
    private final SiddhiManager siddhiManager;
    private final ClusterCoordinator coordinator;
    
    public void coordinatedPersistence() {
        // Only leader node initiates persistence
        if (coordinator.isLeader()) {
            // Coordinate persistence across cluster
            coordinator.broadcast("PREPARE_PERSISTENCE");
            
            // Wait for all nodes to be ready
            coordinator.waitForAllNodesReady();
            
            // Execute persistence
            siddhiManager.persist();
            
            // Notify completion
            coordinator.broadcast("PERSISTENCE_COMPLETE");
        }
    }
}

Error Handling and Recovery

Persistence Exception Handling

public class RobustPersistenceManager {
    private final SiddhiAppRuntime runtime;
    private final List<PersistenceStore> backupStores;
    
    public void safePersist() {
        Exception lastException = null;
        
        // Try primary store first
        try {
            runtime.persist();
            return;
        } catch (PersistenceStoreException e) {
            lastException = e;
            System.err.println("Primary persistence failed: " + e.getMessage());
        }
        
        // Try backup stores
        for (PersistenceStore backupStore : backupStores) {
            try {
                // Switch to backup store and retry
                runtime.setPersistenceStore(backupStore);
                runtime.persist();
                System.out.println("Successfully persisted to backup store");
                return;
            } catch (Exception e) {
                lastException = e;
                System.err.println("Backup persistence failed: " + e.getMessage());
            }
        }
        
        // All stores failed
        throw new PersistenceStoreException("All persistence stores failed", lastException);
    }
}

Types

public interface StatePersistenceConfig {
    long getRetentionDuration();
    String getStorageLocation();
    Map<String, String> getProperties();
}

public interface IncrementalSnapshotInfo {
    String getId();
    String getSiddhiAppId();
    String getType();
    String getQueryName();
    String getElementId();
    long getTime();
    Map<String, Object> getPartitionKeyGroupMap();
}

public class PersistenceStoreException extends SiddhiException {
    public PersistenceStoreException(String message);
    public PersistenceStoreException(String message, Throwable cause);
}

public class CannotRestoreSiddhiAppStateException extends SiddhiException {
    public CannotRestoreSiddhiAppStateException(String message);
    public CannotRestoreSiddhiAppStateException(String message, Throwable cause);
}

public class CannotClearSiddhiAppStateException extends SiddhiException {
    public CannotClearSiddhiAppStateException(String message);
    public CannotClearSiddhiAppStateException(String message, Throwable cause);
}

public class NoPersistenceStoreException extends SiddhiException {
    public NoPersistenceStoreException(String message);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core

docs

aggregations.md

core-management.md

event-handling.md

exceptions.md

extensions.md

index.md

persistence.md

queries-and-callbacks.md

statistics.md

tile.json