or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregations.md · core-management.md · event-handling.md · exceptions.md · extensions.md · index.md · persistence.md · queries-and-callbacks.md · statistics.md

docs/aggregations.md

0

# Aggregations

1

2

Aggregation functionality provides incremental aggregation processing with support for different time durations, distributed processing, and efficient data management. Siddhi's aggregation capabilities enable real-time analytics over streaming data with automatic time-based grouping.

3

4

## AggregationRuntime

5

6

`AggregationRuntime` is the runtime that manages aggregation operations and incremental processing. It handles the execution and lifecycle of aggregation queries, with support for various time granularities.

7

8

```java { .api }

9

public class AggregationRuntime {

10

// Core aggregation management

11

// Manages incremental aggregations across different time durations

12

// Supports distributed aggregation processing

13

// Provides incremental data purging capabilities

14

// Memory calculation for statistics

15

// Handles external time processing

16

}

17

```

18

19

## Aggregation Concepts

20

21

### Time-Based Aggregation

22

23

Siddhi aggregations support automatic time-based grouping at different granularities:

24

25

- **sec** - Second-level aggregations

26

- **min** - Minute-level aggregations

27

- **hour** - Hour-level aggregations

28

- **day** - Daily aggregations

29

- **month** - Monthly aggregations

30

- **year** - Yearly aggregations

31

32

### Incremental Processing

33

34

Aggregations in Siddhi use incremental processing to maintain efficiency:

35

36

- New events update existing aggregation values

37

- Historical data is preserved at different time granularities

38

- Automatic rollup from finer to coarser time units

39

- Support for out-of-order event processing

40

41

## Aggregation Definition Syntax

42

43

```sql

44

-- Basic aggregation definition

45

define aggregation StockAggregation

46

from StockStream

47

select symbol, avg(price) as avgPrice, sum(volume) as totalVolume

48

group by symbol

49

aggregate by timestamp every sec...year;

50

51

-- Advanced aggregation persisted to an RDBMS store, reading from a 1-minute time window

52

@store(type="rdbms", datasource="StockDB")

53

define aggregation PartitionedStockAgg

54

from StockStream#window.time(1 min)

55

select symbol, sector,

56

avg(price) as avgPrice,

57

max(price) as maxPrice,

58

min(price) as minPrice,

59

sum(volume) as totalVolume,

60

count() as eventCount

61

group by symbol, sector

62

aggregate by timestamp every sec...day;

63

```

64

65

## Querying Aggregations

66

67

### Basic Aggregation Queries

68

69

```java

70

// Query aggregation for specific time range

71

String query = "from StockAggregation " +

72

"within '2023-01-01 00:00:00', '2023-12-31 23:59:59' " +

73

"per 'day' " +

74

"select symbol, avgPrice, totalVolume";

75

76

Event[] results = siddhiAppRuntime.query(query);

77

78

for (Event event : results) {

79

String symbol = (String) event.getData(0);

80

Double avgPrice = (Double) event.getData(1);

81

Long totalVolume = (Long) event.getData(2);

82

83

System.out.println(String.format("Symbol: %s, Avg Price: %.2f, Volume: %d",

84

symbol, avgPrice, totalVolume));

85

}

86

```

87

88

### Advanced Aggregation Queries

89

90

```java

91

// Query with filtering and ordering

92

String complexQuery = "from StockAggregation " +

93

"on symbol == 'IBM' " +

94

"within '2023-06-01 00:00:00', '2023-06-30 23:59:59' " +

95

"per 'hour' " +

96

"select AGG_TIMESTAMP, avgPrice, maxPrice, minPrice " +

97

"order by AGG_TIMESTAMP desc " +

98

"limit 24";

99

100

Event[] hourlyData = siddhiAppRuntime.query(complexQuery);

101

102

// Query aggregation with grouping

103

String groupedQuery = "from SectorAggregation " +

104

"within '2023-01-01 00:00:00', '2023-01-31 23:59:59' " +

105

"per 'day' " +

106

"select sector, sum(totalVolume) as sectorVolume, avg(avgPrice) as sectorAvgPrice " +

107

"group by sector " +

108

"having sectorVolume > 1000000";

109

110

Event[] sectorData = siddhiAppRuntime.query(groupedQuery);

111

```

112

113

## Aggregation Functions

114

115

### Built-in Aggregation Functions

116

117

Common aggregation functions available in Siddhi:

118

119

```sql

120

-- Numeric aggregations

121

avg(attribute) -- Average value

122

sum(attribute) -- Sum of values

123

max(attribute) -- Maximum value

124

min(attribute) -- Minimum value

125

count() -- Count of events

126

stddev(attribute) -- Standard deviation

127

128

-- Distinct-value aggregations

129

distinctcount(attribute) -- Count of distinct values

130

131

-- Time-based attributes (selected by name, not called as functions)

132

AGG_TIMESTAMP -- Aggregation timestamp

133

AGG_START_TIMESTAMP -- Aggregation window start

134

AGG_END_TIMESTAMP -- Aggregation window end

135

```

136

137

### Custom Aggregation Functions

138

139

```java

140

// Example of using custom aggregation

141

String customAggQuery = "from StockStream " +

142

"select symbol, " +

143

" custom:weightedAvg(price, volume) as weightedAvgPrice, " +

144

" custom:volatility(price) as priceVolatility " +

145

"group by symbol " +

146

"aggregate by timestamp every min...hour";

147

```

148

149

## Distributed Aggregation

150

151

### Multi-Node Aggregation Setup

152

153

```java

154

// Configure distributed aggregation

155

SiddhiManager siddhiManager = new SiddhiManager();

156

157

// Set up cluster configuration for distributed processing

158

ClusterConfig clusterConfig = new ClusterConfig();

159

clusterConfig.setNodeId("node1");

160

clusterConfig.setClusterNodes(Arrays.asList("node1", "node2", "node3"));

161

162

siddhiManager.setClusterConfig(clusterConfig);

163

164

// Define distributed aggregation

165

String distributedAgg =

166

"@store(type='rdbms', datasource='ClusterDB') " +

167

"@PartitionById " +

168

"define aggregation GlobalStockAgg " +

169

"from StockStream " +

170

"select symbol, avg(price) as avgPrice, sum(volume) as totalVolume " +

171

"group by symbol " +

172

"aggregate by timestamp every sec...day;";

173

```

174

175

## Data Purging and Retention

176

177

### Automatic Data Purging

178

179

```java

180

// Enable incremental data purging

181

siddhiAppRuntime.setPurgingEnabled(true);

182

183

// Configure retention policies through aggregation annotations

184

String retentionAgg =

185

"@store(type='rdbms', datasource='StockDB') " +

186

"@purge(enable='true', interval='1 hour', retentionPeriod='30 days') " +

187

"define aggregation RetentionStockAgg " +

188

"from StockStream " +

189

"select symbol, avg(price) as avgPrice " +

190

"group by symbol " +

191

"aggregate by timestamp every sec...day;";

192

```

193

194

### Manual Data Management

195

196

```java

197

// Query old data before purging

198

String oldDataQuery = "from StockAggregation " +

199

"within '2022-01-01 00:00:00', '2022-12-31 23:59:59' " +

200

"per 'month' " +

201

"select symbol, avgPrice";

202

203

Event[] oldData = siddhiAppRuntime.query(oldDataQuery);

204

archiveData(oldData);

205

206

// Trigger manual purging (if supported)

207

// Note: Actual purging depends on store implementation

208

```

209

210

## Performance Optimization

211

212

### Indexing and Partitioning

213

214

```sql

215

-- Optimized aggregation with indexing hints

216

@store(type='rdbms', datasource='StockDB')

217

@index('symbol', 'timestamp')

218

@partitionByKey('symbol')

219

define aggregation OptimizedStockAgg

220

from StockStream

221

select symbol, avg(price) as avgPrice, sum(volume) as totalVolume

222

group by symbol

223

aggregate by timestamp every sec...hour;

224

```

225

226

### Memory Management

227

228

```java

229

// Monitor aggregation memory usage

230

public class AggregationMonitor {

231

public void monitorAggregation(SiddhiAppRuntime runtime) {

232

// Check memory usage of aggregations

233

Collection<Table> tables = runtime.getTables();

234

for (Table table : tables) {

235

if (table instanceof MemoryCalculable) {

236

long memoryUsage = ((MemoryCalculable) table).getSize();

237

System.out.println("Table memory usage: " + memoryUsage + " bytes");

238

}

239

}

240

}

241

}

242

```

243

244

## Real-time Analytics Examples

245

246

### Financial Market Analytics

247

248

```java

249

// Real-time stock market aggregation

250

String marketAgg =

251

"define aggregation MarketAggregation " +

252

"from StockStream " +

253

"select symbol, sector, " +

254

" avg(price) as avgPrice, " +

255

" max(price) as dayHigh, " +

256

" min(price) as dayLow, " +

257

" sum(volume) as totalVolume, " +

258

" count() as tradeCount, " +

259

" stddev(price) as volatility " +

260

"group by symbol, sector " +

261

"aggregate by timestamp every sec...day;";

262

263

// Query for market summary

264

String marketSummary = "from MarketAggregation " +

265

"within '2023-06-01 00:00:00', '2023-06-01 23:59:59' " +

266

"per 'day' " +

267

"select sector, " +

268

" avg(avgPrice) as sectorAvgPrice, " +

269

" sum(totalVolume) as sectorVolume, " +

270

" avg(volatility) as sectorVolatility " +

271

"group by sector";

272

```

273

274

### IoT Sensor Analytics

275

276

```java

277

// IoT device aggregation

278

String iotAgg =

279

"define aggregation SensorAggregation " +

280

"from SensorStream " +

281

"select deviceId, location, " +

282

" avg(temperature) as avgTemp, " +

283

" max(temperature) as maxTemp, " +

284

" min(temperature) as minTemp, " +

285

" avg(humidity) as avgHumidity, " +

286

" count() as readingCount " +

287

"group by deviceId, location " +

288

"aggregate by timestamp every min...day;";

289

290

// Real-time monitoring query

291

String deviceStatus = "from SensorAggregation " +

292

"within 'now() - 1 hour', 'now()' " +

293

"per 'min' " +

294

"select deviceId, avgTemp, avgHumidity " +

295

"having avgTemp > 30.0 or avgHumidity > 80.0";

296

```

297

298

## Types

299

300

```java { .api }

301

public interface AggregationDefinition extends AbstractDefinition {

302

// Definition of an aggregation

303

String getId();

304

List<Attribute> getAttributeList();

305

}

306

307

public interface MemoryCalculable {

308

long getSize();

309

}

310

311

public interface Table extends MemoryCalculable {

312

// Interface for table implementations

313

}

314

315

public interface ClusterConfig {

316

void setNodeId(String nodeId);

317

void setClusterNodes(List<String> nodes);

318

String getNodeId();

319

List<String> getClusterNodes();

320

}

321

```