or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregations.md · core-management.md · event-handling.md · exceptions.md · extensions.md · index.md · persistence.md · queries-and-callbacks.md · statistics.md

docs/persistence.md

0

# Persistence

1

2

State management and persistence capabilities give Siddhi applications fault tolerance and a way to recover their state after a failure or restart. This includes interfaces for state persistence, snapshot management, and incremental persistence for high-performance scenarios.

3

4

## Persistence Interfaces

5

6

### PersistenceStore

7

8

Interface for state persistence providing basic save/load operations for Siddhi application state.

9

10

```java { .api }

11

public interface PersistenceStore {

12

// Core Persistence Operations

13

void save(String siddhiAppName, String revision, byte[] snapshot);

14

byte[] load(String siddhiAppName, String revision);

15

16

// Revision Management

17

String getLastRevision(String siddhiAppName);

18

void clearRevision(String siddhiAppName, String revision);

19

void clearAllRevisions(String siddhiAppName);

20

21

// Store Management

22

void setStatePersistenceConfigs(StatePersistenceConfig statePersistenceConfig);

23

}

24

```

25

26

### IncrementalPersistenceStore

27

28

Interface for incremental state persistence, optimized for high-performance scenarios with selective state updates.

29

30

```java { .api }

31

public interface IncrementalPersistenceStore {

32

// Incremental Operations

33

void save(IncrementalSnapshotInfo snapshotInfo, byte[] snapshot);

34

byte[] load(IncrementalSnapshotInfo snapshotInfo);

35

36

// Configuration Management

37

void setProperties(Map properties);

38

39

// Revision Management

40

List<IncrementalSnapshotInfo> getListOfRevisionsToLoad(long restoreTime, String siddhiAppName);

41

String getLastRevision(String siddhiAppId);

42

void clearAllRevisions(String siddhiAppId);

43

}

44

```

45

46

### PersistenceReference

47

48

Reference to persisted state providing metadata about persistence operations.

49

50

```java { .api }

51

/** Metadata describing one persistence operation. */
public interface PersistenceReference {

    /** Returns the revision identifier of the persisted state. */
    String getRevision();

    /** Returns the timestamp associated with the persisted state. */
    long getTimestamp();

    /** Returns the name of the Siddhi application the state belongs to. */
    String getSiddhiAppName();
}

56

```

57

58

### Snapshotable

59

60

Interface for snapshot capability, enabling objects to provide their state for persistence.

61

62

```java { .api }

63

/** Implemented by objects that can hand out their state for persistence and take it back. */
public interface Snapshotable {

    /** Returns this object's state as bytes. */
    byte[] getSnapshot();

    /** Restores this object's state from {@code snapshot}. */
    void restoreSnapshot(byte[] snapshot);
}

67

```

68

69

## SiddhiAppRuntime Persistence Operations

70

71

### Basic Persistence Operations

72

73

```java { .api }

74

public class SiddhiAppRuntime {

75

// State Management

76

public PersistenceReference persist();

77

public byte[] snapshot();

78

public void restore(byte[] snapshot);

79

public void restoreRevision(String revision);

80

public void restoreLastRevision();

81

public void clearAllRevisions();

82

}

83

```

84

85

### Usage Examples

86

87

```java

88

// Basic persistence workflow

89

SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

90

runtime.start();

91

92

// Process events for some time...

93

InputHandler handler = runtime.getInputHandler("StockStream");

94

handler.send(new Object[]{"IBM", 150.0, 1000L});

95

96

// Persist current state

97

PersistenceReference ref = runtime.persist();

98

System.out.println("Persisted state with revision: " + ref.getRevision());

99

100

// Continue processing...

101

handler.send(new Object[]{"MSFT", 120.0, 500L});

102

103

// Take a snapshot

104

byte[] snapshot = runtime.snapshot();

105

saveSnapshotToFile(snapshot);

106

107

// Simulate restart - restore from snapshot

108

runtime.shutdown();

109

runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

110

runtime.restore(snapshot);

111

runtime.start();

112

113

// Or restore from specific revision

114

runtime.restoreRevision(ref.getRevision());

115

```

116

117

## SiddhiManager Global Persistence

118

119

### Manager-Level Persistence

120

121

```java { .api }

122

public class SiddhiManager {

123

// Global Persistence Operations

124

public void persist();

125

public void restoreLastState();

126

public String getLastRevision(String siddhiAppName);

127

128

// Persistence Store Configuration

129

public void setPersistenceStore(PersistenceStore persistenceStore);

130

public void setIncrementalPersistenceStore(IncrementalPersistenceStore incrementalPersistenceStore);

131

}

132

```

133

134

### Usage Examples

135

136

```java

137

// Configure persistence store

138

FilePersistenceStore persistenceStore = new FilePersistenceStore("./siddhi-state");

139

siddhiManager.setPersistenceStore(persistenceStore);

140

141

// Create and configure multiple applications

142

SiddhiAppRuntime app1 = siddhiManager.createSiddhiAppRuntime(tradingApp);

143

SiddhiAppRuntime app2 = siddhiManager.createSiddhiAppRuntime(alertingApp);

144

145

app1.start();

146

app2.start();

147

148

// Persist all applications at once

149

siddhiManager.persist();

150

151

// Simulate system restart

152

siddhiManager.shutdown();

153

siddhiManager = new SiddhiManager();

154

siddhiManager.setPersistenceStore(persistenceStore);

155

156

// Restore all applications

157

siddhiManager.restoreLastState();

158

159

// Get specific revision information

160

String lastRevision = siddhiManager.getLastRevision("TradingApp");

161

System.out.println("Last revision for TradingApp: " + lastRevision);

162

```

163

164

## Persistence Store Implementations

165

166

### File-Based Persistence

167

168

```java

169

// Example file-based persistence store usage

170

public class FilePersistenceStore implements PersistenceStore {

171

private final String baseDirectory;

172

173

public FilePersistenceStore(String baseDirectory) {

174

this.baseDirectory = baseDirectory;

175

// Ensure directory exists

176

new File(baseDirectory).mkdirs();

177

}

178

179

@Override

180

public void save(String siddhiAppName, String revision, byte[] snapshot) {

181

try {

182

String filename = baseDirectory + "/" + siddhiAppName + "_" + revision + ".snapshot";

183

Files.write(Paths.get(filename), snapshot);

184

} catch (IOException e) {

185

throw new PersistenceStoreException("Failed to save snapshot", e);

186

}

187

}

188

189

@Override

190

public byte[] load(String siddhiAppName, String revision) {

191

try {

192

String filename = baseDirectory + "/" + siddhiAppName + "_" + revision + ".snapshot";

193

return Files.readAllBytes(Paths.get(filename));

194

} catch (IOException e) {

195

throw new PersistenceStoreException("Failed to load snapshot", e);

196

}

197

}

198

}

199

200

// Usage

201

FilePersistenceStore fileStore = new FilePersistenceStore("./siddhi-persistence");

202

siddhiManager.setPersistenceStore(fileStore);

203

```

204

205

### Database-Based Persistence

206

207

```java

208

// Example database persistence configuration

209

public class DatabasePersistenceStore implements PersistenceStore {

210

private final DataSource dataSource;

211

212

public DatabasePersistenceStore(DataSource dataSource) {

213

this.dataSource = dataSource;

214

initializeTables();

215

}

216

217

@Override

218

public void save(String siddhiAppName, String revision, byte[] snapshot) {

219

String sql = "INSERT INTO siddhi_snapshots (app_name, revision, snapshot_data, created_at) " +

220

"VALUES (?, ?, ?, ?)";

221

222

try (Connection conn = dataSource.getConnection();

223

PreparedStatement stmt = conn.prepareStatement(sql)) {

224

225

stmt.setString(1, siddhiAppName);

226

stmt.setString(2, revision);

227

stmt.setBytes(3, snapshot);

228

stmt.setTimestamp(4, new Timestamp(System.currentTimeMillis()));

229

230

stmt.executeUpdate();

231

} catch (SQLException e) {

232

throw new PersistenceStoreException("Failed to save to database", e);

233

}

234

}

235

}

236

```

237

238

## Incremental Persistence

239

240

### High-Performance Incremental Persistence

241

242

```java

243

// Configure incremental persistence for high-throughput scenarios

244

public class RedisIncrementalStore implements IncrementalPersistenceStore {

245

private final RedisTemplate<String, byte[]> redisTemplate;

246

247

@Override

248

public void save(StatePersistenceConfig config, String siddhiAppName, byte[] configSnapshot) {

249

String key = "siddhi:incremental:" + siddhiAppName;

250

redisTemplate.opsForValue().set(key, configSnapshot);

251

252

// Set TTL based on configuration

253

redisTemplate.expire(key, config.getRetentionDuration(), TimeUnit.SECONDS);

254

}

255

256

@Override

257

public byte[] load(StatePersistenceConfig config, String siddhiAppName) {

258

String key = "siddhi:incremental:" + siddhiAppName;

259

return redisTemplate.opsForValue().get(key);

260

}

261

}

262

263

// Usage with incremental persistence

264

RedisIncrementalStore incrementalStore = new RedisIncrementalStore(redisTemplate);

265

siddhiManager.setIncrementalPersistenceStore(incrementalStore);

266

```

267

268

## Advanced Persistence Patterns

269

270

### Scheduled Persistence

271

272

```java

273

// Automated periodic persistence

274

public class ScheduledPersistenceManager {

275

private final SiddhiManager siddhiManager;

276

private final ScheduledExecutorService scheduler;

277

278

public ScheduledPersistenceManager(SiddhiManager siddhiManager) {

279

this.siddhiManager = siddhiManager;

280

this.scheduler = Executors.newScheduledThreadPool(1);

281

}

282

283

public void startPeriodicPersistence(long intervalMinutes) {

284

scheduler.scheduleAtFixedRate(() -> {

285

try {

286

System.out.println("Starting scheduled persistence...");

287

siddhiManager.persist();

288

System.out.println("Scheduled persistence completed");

289

} catch (Exception e) {

290

System.err.println("Scheduled persistence failed: " + e.getMessage());

291

}

292

}, intervalMinutes, intervalMinutes, TimeUnit.MINUTES);

293

}

294

295

public void shutdown() {

296

scheduler.shutdown();

297

}

298

}

299

300

// Usage

301

ScheduledPersistenceManager persistenceManager =

302

new ScheduledPersistenceManager(siddhiManager);

303

persistenceManager.startPeriodicPersistence(15); // Every 15 minutes

304

```

305

306

### Conditional Persistence

307

308

```java

309

// Event-driven persistence based on conditions

310

public class ConditionalPersistenceHandler extends StreamCallback {

311

private final SiddhiAppRuntime runtime;

312

private final AtomicLong eventCount = new AtomicLong(0);

313

private final long persistenceThreshold = 10000;

314

315

@Override

316

public void receive(Event[] events) {

317

long count = eventCount.addAndGet(events.length);

318

319

// Persist state after processing threshold number of events

320

if (count >= persistenceThreshold) {

321

try {

322

PersistenceReference ref = runtime.persist();

323

System.out.println("Auto-persisted after " + count + " events, revision: " +

324

ref.getRevision());

325

eventCount.set(0); // Reset counter

326

} catch (Exception e) {

327

System.err.println("Auto-persistence failed: " + e.getMessage());

328

}

329

}

330

}

331

}

332

```

333

334

### Clustered Persistence

335

336

```java

337

// Distributed persistence for cluster environments

338

public class ClusteredPersistenceCoordinator {

339

private final SiddhiManager siddhiManager;

340

private final ClusterCoordinator coordinator;

341

342

public void coordinatedPersistence() {

343

// Only leader node initiates persistence

344

if (coordinator.isLeader()) {

345

// Coordinate persistence across cluster

346

coordinator.broadcast("PREPARE_PERSISTENCE");

347

348

// Wait for all nodes to be ready

349

coordinator.waitForAllNodesReady();

350

351

// Execute persistence

352

siddhiManager.persist();

353

354

// Notify completion

355

coordinator.broadcast("PERSISTENCE_COMPLETE");

356

}

357

}

358

}

359

```

360

361

## Error Handling and Recovery

362

363

### Persistence Exception Handling

364

365

```java

366

public class RobustPersistenceManager {

367

private final SiddhiAppRuntime runtime;

368

private final List<PersistenceStore> backupStores;

369

370

public void safePersist() {

371

Exception lastException = null;

372

373

// Try primary store first

374

try {

375

runtime.persist();

376

return;

377

} catch (PersistenceStoreException e) {

378

lastException = e;

379

System.err.println("Primary persistence failed: " + e.getMessage());

380

}

381

382

// Try backup stores

383

for (PersistenceStore backupStore : backupStores) {

384

try {

385

// Switch to backup store and retry

386

runtime.setPersistenceStore(backupStore);

387

runtime.persist();

388

System.out.println("Successfully persisted to backup store");

389

return;

390

} catch (Exception e) {

391

lastException = e;

392

System.err.println("Backup persistence failed: " + e.getMessage());

393

}

394

}

395

396

// All stores failed

397

throw new PersistenceStoreException("All persistence stores failed", lastException);

398

}

399

}

400

```

401

402

## Types

403

404

```java { .api }

405

/** Configuration values made available to a persistence store. */
public interface StatePersistenceConfig {

    /** Returns the retention duration (unit not specified here). */
    long getRetentionDuration();

    /** Returns the configured storage location. */
    String getStorageLocation();

    /** Returns additional configuration properties. */
    Map<String, String> getProperties();
}

410

411

/** Descriptor identifying one incremental snapshot. */
public interface IncrementalSnapshotInfo {

    String getId();

    String getSiddhiAppId();

    String getType();

    String getQueryName();

    String getElementId();

    long getTime();

    Map<String, Object> getPartitionKeyGroupMap();
}

420

421

public class PersistenceStoreException extends SiddhiException {

422

public PersistenceStoreException(String message);

423

public PersistenceStoreException(String message, Throwable cause);

424

}

425

426

public class CannotRestoreSiddhiAppStateException extends SiddhiException {

427

public CannotRestoreSiddhiAppStateException(String message);

428

public CannotRestoreSiddhiAppStateException(String message, Throwable cause);

429

}

430

431

public class CannotClearSiddhiAppStateException extends SiddhiException {

432

public CannotClearSiddhiAppStateException(String message);

433

public CannotClearSiddhiAppStateException(String message, Throwable cause);

434

}

435

436

public class NoPersistenceStoreException extends SiddhiException {

437

public NoPersistenceStoreException(String message);

438

}

439

```