0
# Persistence
1
2
State management and persistence capabilities provide fault tolerance and enable state recovery for Siddhi applications. This includes interfaces for state persistence, snapshot management, and incremental persistence for high-performance scenarios.
3
4
## Persistence Interfaces
5
6
### PersistenceStore
7
8
Interface for state persistence providing basic save/load operations for Siddhi application state.
9
10
```java { .api }
11
public interface PersistenceStore {
12
// Core Persistence Operations
13
void save(String siddhiAppName, String revision, byte[] snapshot);
14
byte[] load(String siddhiAppName, String revision);
15
16
// Revision Management
17
String getLastRevision(String siddhiAppName);
18
void clearRevision(String siddhiAppName, String revision);
19
void clearAllRevisions(String siddhiAppName);
20
21
// Store Management
22
void setStatePersistenceConfigs(StatePersistenceConfig statePersistenceConfig);
23
}
24
```
25
26
### IncrementalPersistenceStore
27
28
Interface for incremental state persistence, optimized for high-performance scenarios with selective state updates.
29
30
```java { .api }
31
public interface IncrementalPersistenceStore {
32
// Incremental Operations
33
void save(IncrementalSnapshotInfo snapshotInfo, byte[] snapshot);
34
byte[] load(IncrementalSnapshotInfo snapshotInfo);
35
36
// Configuration Management
37
void setProperties(Map properties);
38
39
// Revision Management
40
List<IncrementalSnapshotInfo> getListOfRevisionsToLoad(long restoreTime, String siddhiAppName);
41
String getLastRevision(String siddhiAppId);
42
void clearAllRevisions(String siddhiAppId);
43
}
44
```
45
46
### PersistenceReference
47
48
Reference to a persisted state, providing metadata about the persistence operation that produced it.
49
50
```java { .api }
51
public interface PersistenceReference {
52
String getRevision();
53
long getTimestamp();
54
String getSiddhiAppName();
55
}
56
```
57
58
### Snapshotable
59
60
Interface for snapshot capability, enabling objects to provide their state for persistence.
61
62
```java { .api }
63
public interface Snapshotable {
64
byte[] getSnapshot();
65
void restoreSnapshot(byte[] snapshot);
66
}
67
```
68
69
## SiddhiAppRuntime Persistence Operations
70
71
### Basic Persistence Operations
72
73
```java { .api }
74
public class SiddhiAppRuntime {
75
// State Management
76
public PersistenceReference persist();
77
public byte[] snapshot();
78
public void restore(byte[] snapshot);
79
public void restoreRevision(String revision);
80
public void restoreLastRevision();
81
public void clearAllRevisions();
82
}
83
```
84
85
### Usage Examples
86
87
```java
88
// Basic persistence workflow
89
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
90
runtime.start();
91
92
// Process events for some time...
93
InputHandler handler = runtime.getInputHandler("StockStream");
94
handler.send(new Object[]{"IBM", 150.0, 1000L});
95
96
// Persist current state
97
PersistenceReference ref = runtime.persist();
98
System.out.println("Persisted state with revision: " + ref.getRevision());
99
100
// Continue processing...
101
handler.send(new Object[]{"MSFT", 120.0, 500L});
102
103
// Take a snapshot
104
byte[] snapshot = runtime.snapshot();
105
saveSnapshotToFile(snapshot);
106
107
// Simulate restart - restore from snapshot
108
runtime.shutdown();
109
runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
110
runtime.restore(snapshot);
111
runtime.start();
112
113
// Or restore from specific revision
114
runtime.restoreRevision(ref.getRevision());
115
```
116
117
## SiddhiManager Global Persistence
118
119
### Manager-Level Persistence
120
121
```java { .api }
122
public class SiddhiManager {
123
// Global Persistence Operations
124
public void persist();
125
public void restoreLastState();
126
public String getLastRevision(String siddhiAppName);
127
128
// Persistence Store Configuration
129
public void setPersistenceStore(PersistenceStore persistenceStore);
130
public void setIncrementalPersistenceStore(IncrementalPersistenceStore incrementalPersistenceStore);
131
}
132
```
133
134
### Usage Examples
135
136
```java
137
// Configure persistence store
138
FilePersistenceStore persistenceStore = new FilePersistenceStore("./siddhi-state");
139
siddhiManager.setPersistenceStore(persistenceStore);
140
141
// Create and configure multiple applications
142
SiddhiAppRuntime app1 = siddhiManager.createSiddhiAppRuntime(tradingApp);
143
SiddhiAppRuntime app2 = siddhiManager.createSiddhiAppRuntime(alertingApp);
144
145
app1.start();
146
app2.start();
147
148
// Persist all applications at once
149
siddhiManager.persist();
150
151
// Simulate system restart
152
siddhiManager.shutdown();
153
siddhiManager = new SiddhiManager();
154
siddhiManager.setPersistenceStore(persistenceStore);
155
156
// Restore all applications
157
siddhiManager.restoreLastState();
158
159
// Get specific revision information
160
String lastRevision = siddhiManager.getLastRevision("TradingApp");
161
System.out.println("Last revision for TradingApp: " + lastRevision);
162
```
163
164
## Persistence Store Implementations
165
166
### File-Based Persistence
167
168
```java
169
// Example file-based persistence store usage
170
public class FilePersistenceStore implements PersistenceStore {
171
private final String baseDirectory;
172
173
public FilePersistenceStore(String baseDirectory) {
174
this.baseDirectory = baseDirectory;
175
// Ensure directory exists
176
new File(baseDirectory).mkdirs();
177
}
178
179
@Override
180
public void save(String siddhiAppName, String revision, byte[] snapshot) {
181
try {
182
String filename = baseDirectory + "/" + siddhiAppName + "_" + revision + ".snapshot";
183
Files.write(Paths.get(filename), snapshot);
184
} catch (IOException e) {
185
throw new PersistenceStoreException("Failed to save snapshot", e);
186
}
187
}
188
189
@Override
190
public byte[] load(String siddhiAppName, String revision) {
191
try {
192
String filename = baseDirectory + "/" + siddhiAppName + "_" + revision + ".snapshot";
193
return Files.readAllBytes(Paths.get(filename));
194
} catch (IOException e) {
195
throw new PersistenceStoreException("Failed to load snapshot", e);
196
}
197
}
198
}
199
200
// Usage
201
FilePersistenceStore fileStore = new FilePersistenceStore("./siddhi-persistence");
202
siddhiManager.setPersistenceStore(fileStore);
203
```
204
205
### Database-Based Persistence
206
207
```java
208
// Example database persistence configuration
209
public class DatabasePersistenceStore implements PersistenceStore {
210
private final DataSource dataSource;
211
212
public DatabasePersistenceStore(DataSource dataSource) {
213
this.dataSource = dataSource;
214
initializeTables();
215
}
216
217
@Override
218
public void save(String siddhiAppName, String revision, byte[] snapshot) {
219
String sql = "INSERT INTO siddhi_snapshots (app_name, revision, snapshot_data, created_at) " +
220
"VALUES (?, ?, ?, ?)";
221
222
try (Connection conn = dataSource.getConnection();
223
PreparedStatement stmt = conn.prepareStatement(sql)) {
224
225
stmt.setString(1, siddhiAppName);
226
stmt.setString(2, revision);
227
stmt.setBytes(3, snapshot);
228
stmt.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
229
230
stmt.executeUpdate();
231
} catch (SQLException e) {
232
throw new PersistenceStoreException("Failed to save to database", e);
233
}
234
}
235
}
236
```
237
238
## Incremental Persistence
239
240
### High-Performance Incremental Persistence
241
242
```java
243
// Configure incremental persistence for high-throughput scenarios
244
public class RedisIncrementalStore implements IncrementalPersistenceStore {
245
private final RedisTemplate<String, byte[]> redisTemplate;
246
247
@Override
248
public void save(StatePersistenceConfig config, String siddhiAppName, byte[] configSnapshot) {
249
String key = "siddhi:incremental:" + siddhiAppName;
250
redisTemplate.opsForValue().set(key, configSnapshot);
251
252
// Set TTL based on configuration
253
redisTemplate.expire(key, config.getRetentionDuration(), TimeUnit.SECONDS);
254
}
255
256
@Override
257
public byte[] load(StatePersistenceConfig config, String siddhiAppName) {
258
String key = "siddhi:incremental:" + siddhiAppName;
259
return redisTemplate.opsForValue().get(key);
260
}
261
}
262
263
// Usage with incremental persistence
264
RedisIncrementalStore incrementalStore = new RedisIncrementalStore(redisTemplate);
265
siddhiManager.setIncrementalPersistenceStore(incrementalStore);
266
```
267
268
## Advanced Persistence Patterns
269
270
### Scheduled Persistence
271
272
```java
273
// Automated periodic persistence
274
public class ScheduledPersistenceManager {
275
private final SiddhiManager siddhiManager;
276
private final ScheduledExecutorService scheduler;
277
278
public ScheduledPersistenceManager(SiddhiManager siddhiManager) {
279
this.siddhiManager = siddhiManager;
280
this.scheduler = Executors.newScheduledThreadPool(1);
281
}
282
283
public void startPeriodicPersistence(long intervalMinutes) {
284
scheduler.scheduleAtFixedRate(() -> {
285
try {
286
System.out.println("Starting scheduled persistence...");
287
siddhiManager.persist();
288
System.out.println("Scheduled persistence completed");
289
} catch (Exception e) {
290
System.err.println("Scheduled persistence failed: " + e.getMessage());
291
}
292
}, intervalMinutes, intervalMinutes, TimeUnit.MINUTES);
293
}
294
295
public void shutdown() {
296
scheduler.shutdown();
297
}
298
}
299
300
// Usage
301
ScheduledPersistenceManager persistenceManager =
302
new ScheduledPersistenceManager(siddhiManager);
303
persistenceManager.startPeriodicPersistence(15); // Every 15 minutes
304
```
305
306
### Conditional Persistence
307
308
```java
309
// Event-driven persistence based on conditions
310
public class ConditionalPersistenceHandler extends StreamCallback {
311
private final SiddhiAppRuntime runtime;
312
private final AtomicLong eventCount = new AtomicLong(0);
313
private final long persistenceThreshold = 10000;
314
315
@Override
316
public void receive(Event[] events) {
317
long count = eventCount.addAndGet(events.length);
318
319
// Persist state after processing threshold number of events
320
if (count >= persistenceThreshold) {
321
try {
322
PersistenceReference ref = runtime.persist();
323
System.out.println("Auto-persisted after " + count + " events, revision: " +
324
ref.getRevision());
325
eventCount.set(0); // Reset counter
326
} catch (Exception e) {
327
System.err.println("Auto-persistence failed: " + e.getMessage());
328
}
329
}
330
}
331
}
332
```
333
334
### Clustered Persistence
335
336
```java
337
// Distributed persistence for cluster environments
338
public class ClusteredPersistenceCoordinator {
339
private final SiddhiManager siddhiManager;
340
private final ClusterCoordinator coordinator;
341
342
public void coordinatedPersistence() {
343
// Only leader node initiates persistence
344
if (coordinator.isLeader()) {
345
// Coordinate persistence across cluster
346
coordinator.broadcast("PREPARE_PERSISTENCE");
347
348
// Wait for all nodes to be ready
349
coordinator.waitForAllNodesReady();
350
351
// Execute persistence
352
siddhiManager.persist();
353
354
// Notify completion
355
coordinator.broadcast("PERSISTENCE_COMPLETE");
356
}
357
}
358
}
359
```
360
361
## Error Handling and Recovery
362
363
### Persistence Exception Handling
364
365
```java
366
public class RobustPersistenceManager {
367
private final SiddhiAppRuntime runtime;
368
private final List<PersistenceStore> backupStores;
369
370
public void safePersist() {
371
Exception lastException = null;
372
373
// Try primary store first
374
try {
375
runtime.persist();
376
return;
377
} catch (PersistenceStoreException e) {
378
lastException = e;
379
System.err.println("Primary persistence failed: " + e.getMessage());
380
}
381
382
// Try backup stores
383
for (PersistenceStore backupStore : backupStores) {
384
try {
385
// Switch to backup store and retry
386
runtime.setPersistenceStore(backupStore);
387
runtime.persist();
388
System.out.println("Successfully persisted to backup store");
389
return;
390
} catch (Exception e) {
391
lastException = e;
392
System.err.println("Backup persistence failed: " + e.getMessage());
393
}
394
}
395
396
// All stores failed
397
throw new PersistenceStoreException("All persistence stores failed", lastException);
398
}
399
}
400
```
401
402
## Types
403
404
```java { .api }
405
public interface StatePersistenceConfig {
406
long getRetentionDuration();
407
String getStorageLocation();
408
Map<String, String> getProperties();
409
}
410
411
public interface IncrementalSnapshotInfo {
412
String getId();
413
String getSiddhiAppId();
414
String getType();
415
String getQueryName();
416
String getElementId();
417
long getTime();
418
Map<String, Object> getPartitionKeyGroupMap();
419
}
420
421
public class PersistenceStoreException extends SiddhiException {
422
public PersistenceStoreException(String message);
423
public PersistenceStoreException(String message, Throwable cause);
424
}
425
426
public class CannotRestoreSiddhiAppStateException extends SiddhiException {
427
public CannotRestoreSiddhiAppStateException(String message);
428
public CannotRestoreSiddhiAppStateException(String message, Throwable cause);
429
}
430
431
public class CannotClearSiddhiAppStateException extends SiddhiException {
432
public CannotClearSiddhiAppStateException(String message);
433
public CannotClearSiddhiAppStateException(String message, Throwable cause);
434
}
435
436
public class NoPersistenceStoreException extends SiddhiException {
437
public NoPersistenceStoreException(String message);
438
}
439
```