# Aggregations

Siddhi aggregations process streaming data incrementally across multiple time durations, with support for distributed processing and efficient data management. They enable real-time analytics over streaming data with automatic time-based grouping.

## AggregationRuntime

Runtime for managing aggregation operations and incremental processing. It handles the execution and lifecycle of aggregation queries across the supported time granularities.

```java { .api }
public class AggregationRuntime {
    // Core aggregation management
    // Manages incremental aggregations across different time durations
    // Supports distributed aggregation processing
    // Provides incremental data purging capabilities
    // Memory calculation for statistics
    // Handles external time processing
}
```

## Aggregation Concepts

### Time-Based Aggregation

Siddhi aggregations support automatic time-based grouping at different granularities:

- **sec** - Second-level aggregations
- **min** - Minute-level aggregations
- **hour** - Hour-level aggregations
- **day** - Daily aggregations
- **month** - Monthly aggregations
- **year** - Yearly aggregations
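
As a rough illustration of what grouping at these granularities means, the sketch below maps an event timestamp to the start of the bucket it would fall into at each level. `TimeBuckets`, `Granularity`, and `bucketStart` are hypothetical names introduced here, not Siddhi APIs, and UTC is assumed; Siddhi's own bucketing logic may differ.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper (not part of Siddhi): finds the start of the time
// bucket that contains a given event timestamp. UTC is assumed.
final class TimeBuckets {

    enum Granularity { SEC, MIN, HOUR, DAY, MONTH, YEAR }

    static long bucketStart(long epochMillis, Granularity granularity) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        switch (granularity) {
            case SEC:   t = t.truncatedTo(ChronoUnit.SECONDS); break;
            case MIN:   t = t.truncatedTo(ChronoUnit.MINUTES); break;
            case HOUR:  t = t.truncatedTo(ChronoUnit.HOURS); break;
            case DAY:   t = t.truncatedTo(ChronoUnit.DAYS); break;
            // Months and years have variable length, so snap to the first day instead
            case MONTH: t = t.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1); break;
            case YEAR:  t = t.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1); break;
            default: throw new IllegalArgumentException("Unknown granularity: " + granularity);
        }
        return t.toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2023-06-15T10:20:30.500Z").toEpochMilli();
        for (Granularity g : Granularity.values()) {
            System.out.println(g + " -> " + Instant.ofEpochMilli(bucketStart(ts, g)));
        }
    }
}
```

Every event whose timestamp maps to the same bucket start contributes to the same aggregated row at that granularity.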

### Incremental Processing

Aggregations in Siddhi use incremental processing to maintain efficiency:

- New events update existing aggregation values
- Historical data is preserved at different time granularities
- Automatic rollup from finer to coarser time units
- Support for out-of-order event processing
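
The points above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Siddhi's implementation: `IncrementalRollup` and `Partial` are hypothetical names, and only second-to-minute rollup of an average is shown. The key idea is to keep a mergeable partial state (sum and count) per bucket, keyed by event time, so that late events still land in the right bucket and coarser buckets can be derived by merging finer ones.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not Siddhi's implementation): per-second partial
// aggregates that are rolled up into per-minute aggregates.
final class IncrementalRollup {

    // Mergeable state for an average: keep sum and count, derive avg on read.
    static final class Partial {
        double sum;
        long count;

        void add(double value) { sum += value; count++; }
        void merge(Partial other) { sum += other.sum; count += other.count; }
        double avg() { return count == 0 ? Double.NaN : sum / count; }
    }

    private final TreeMap<Long, Partial> perSecond = new TreeMap<>();

    // Buckets are keyed by event time, so an out-of-order event simply
    // updates the bucket it belongs to.
    void onEvent(long epochMillis, double value) {
        long secondStart = Math.floorDiv(epochMillis, 1000L) * 1000L;
        perSecond.computeIfAbsent(secondStart, k -> new Partial()).add(value);
    }

    // Roll finer buckets up into coarser ones by merging partial states.
    TreeMap<Long, Partial> rollUpToMinutes() {
        TreeMap<Long, Partial> perMinute = new TreeMap<>();
        for (Map.Entry<Long, Partial> e : perSecond.entrySet()) {
            long minuteStart = Math.floorDiv(e.getKey(), 60_000L) * 60_000L;
            perMinute.computeIfAbsent(minuteStart, k -> new Partial()).merge(e.getValue());
        }
        return perMinute;
    }

    public static void main(String[] args) {
        IncrementalRollup rollup = new IncrementalRollup();
        rollup.onEvent(1_000L, 10.0);
        rollup.onEvent(1_500L, 20.0);
        rollup.onEvent(61_000L, 30.0);
        rollup.onEvent(2_000L, 60.0); // arrives late, still lands in the first minute
        rollup.rollUpToMinutes().forEach((start, p) ->
                System.out.println(start + " -> avg=" + p.avg() + ", count=" + p.count));
    }
}
```

Averaging the per-second averages directly would give a different (and generally wrong) result, which is why the sketch merges sums and counts instead.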

## Aggregation Definition Syntax

```sql
-- Basic aggregation definition
define aggregation StockAggregation
from StockStream
select symbol, avg(price) as avgPrice, sum(volume) as totalVolume
    group by symbol
    aggregate by timestamp every sec...year;

-- Advanced aggregation persisted to an RDBMS store
@store(type="rdbms", datasource="StockDB")
define aggregation PartitionedStockAgg
from StockStream#window.time(1 min)
select symbol, sector,
       avg(price) as avgPrice,
       max(price) as maxPrice,
       min(price) as minPrice,
       sum(volume) as totalVolume,
       count() as eventCount
    group by symbol, sector
    aggregate by timestamp every sec...day;
```

## Querying Aggregations

### Basic Aggregation Queries

```java
// Query the aggregation for a specific time range.
// siddhiAppRuntime is a started SiddhiAppRuntime for the app that defines StockAggregation.
String query = "from StockAggregation " +
        "within '2023-01-01 00:00:00', '2023-12-31 23:59:59' " +
        "per 'day' " +
        "select symbol, avgPrice, totalVolume";

Event[] results = siddhiAppRuntime.query(query);

// query() may return null when nothing matches, so guard before iterating
if (results != null) {
    for (Event event : results) {
        String symbol = (String) event.getData(0);
        Double avgPrice = (Double) event.getData(1);
        Long totalVolume = (Long) event.getData(2);

        System.out.printf("Symbol: %s, Avg Price: %.2f, Volume: %d%n",
                symbol, avgPrice, totalVolume);
    }
}
```
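
Because the `within` bounds are plain strings, a reversed or malformed range only surfaces when the query runs. The sketch below validates a range before the query string is built. `WithinRange` is a hypothetical helper, not a Siddhi class, and it assumes that timestamps without a zone offset are interpreted as UTC; check that assumption against your Siddhi version.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not part of Siddhi): validates the bounds of a
// `within` clause before the query string is assembled.
final class WithinRange {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Assumption: a timestamp without a zone offset is treated as UTC.
    static long toEpochMillis(String timestamp) {
        return LocalDateTime.parse(timestamp, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    // Returns the clause text, or throws if the range is empty or reversed.
    static String withinClause(String start, String end) {
        if (toEpochMillis(start) >= toEpochMillis(end)) {
            throw new IllegalArgumentException("Range start must be before end: " + start + " .. " + end);
        }
        return "within '" + start + "', '" + end + "' ";
    }

    public static void main(String[] args) {
        String query = "from StockAggregation " +
                withinClause("2023-01-01 00:00:00", "2023-12-31 23:59:59") +
                "per 'day' " +
                "select symbol, avgPrice, totalVolume";
        System.out.println(query);
    }
}
```

A malformed timestamp fails fast with a `DateTimeParseException` from `LocalDateTime.parse`, before anything is sent to the runtime.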

### Advanced Aggregation Queries

```java
// Query with filtering and ordering.
// Uses PartitionedStockAgg because it defines maxPrice and minPrice.
String complexQuery = "from PartitionedStockAgg " +
        "on symbol == 'IBM' " +
        "within '2023-06-01 00:00:00', '2023-06-30 23:59:59' " +
        "per 'hour' " +
        "select AGG_TIMESTAMP, avgPrice, maxPrice, minPrice " +
        "order by AGG_TIMESTAMP desc " +
        "limit 24";

Event[] hourlyData = siddhiAppRuntime.query(complexQuery);

// Query aggregation with grouping.
// Assumes a SectorAggregation that exposes sector, totalVolume, and avgPrice.
String groupedQuery = "from SectorAggregation " +
        "within '2023-01-01 00:00:00', '2023-01-31 23:59:59' " +
        "per 'day' " +
        "select sector, sum(totalVolume) as sectorVolume, avg(avgPrice) as sectorAvgPrice " +
        "group by sector " +
        "having sectorVolume > 1000000";

Event[] sectorData = siddhiAppRuntime.query(groupedQuery);
```

## Aggregation Functions

### Built-in Aggregation Functions

Common aggregation functions available in Siddhi:

```sql
-- Numeric aggregations
avg(attribute)            -- Average value
sum(attribute)            -- Sum of values
max(attribute)            -- Maximum value
min(attribute)            -- Minimum value
count()                   -- Count of events
stdDev(attribute)         -- Standard deviation

-- Distinct-value aggregations
distinctCount(attribute)  -- Count of distinct values

-- Time-based attributes
AGG_TIMESTAMP             -- Aggregation timestamp
AGG_START_TIMESTAMP       -- Aggregation window start
AGG_END_TIMESTAMP         -- Aggregation window end
```

### Custom Aggregation Functions

```java
// Example of using custom aggregation functions.
// custom:weightedAvg and custom:volatility are placeholder extension names;
// they must be implemented and registered before this definition can be deployed.
String customAggQuery = "define aggregation CustomStockAgg " +
        "from StockStream " +
        "select symbol, " +
        "  custom:weightedAvg(price, volume) as weightedAvgPrice, " +
        "  custom:volatility(price) as priceVolatility " +
        "group by symbol " +
        "aggregate by timestamp every min...hour;";
```
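
To make the idea of a custom aggregate concrete, here is a minimal sketch of the running state a volume-weighted average would need. `WeightedAverage` is a hypothetical plain-Java class, not Siddhi's extension interface; wiring it into Siddhi as an extension is not shown. The point is that the state must be mergeable so that values from finer time buckets can be combined into coarser ones.

```java
// Hypothetical sketch (not Siddhi's extension API): mergeable running state
// for a volume-weighted average price.
final class WeightedAverage {

    private double weightedSum; // running sum of price * volume
    private double totalWeight; // running sum of volume

    void add(double price, double volume) {
        weightedSum += price * volume;
        totalWeight += volume;
    }

    // Combines the state of another bucket into this one.
    void merge(WeightedAverage other) {
        weightedSum += other.weightedSum;
        totalWeight += other.totalWeight;
    }

    double value() {
        return totalWeight == 0 ? Double.NaN : weightedSum / totalWeight;
    }

    public static void main(String[] args) {
        WeightedAverage firstMinute = new WeightedAverage();
        firstMinute.add(100.0, 10.0);

        WeightedAverage secondMinute = new WeightedAverage();
        secondMinute.add(200.0, 30.0);

        // Rolling two minute-level states up into one coarser state
        WeightedAverage hour = new WeightedAverage();
        hour.merge(firstMinute);
        hour.merge(secondMinute);
        System.out.println("Weighted average: " + hour.value());
    }
}
```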

## Distributed Aggregation

### Multi-Node Aggregation Setup

```java
// Configure distributed aggregation
SiddhiManager siddhiManager = new SiddhiManager();

// Set up cluster configuration for distributed processing.
// ClusterConfig and setClusterConfig(...) are shown as this guide describes them;
// verify that your Siddhi version provides them before relying on this setup.
ClusterConfig clusterConfig = new ClusterConfig();
clusterConfig.setNodeId("node1");
clusterConfig.setClusterNodes(Arrays.asList("node1", "node2", "node3"));

siddhiManager.setClusterConfig(clusterConfig);

// Define distributed aggregation
String distributedAgg =
        "@store(type='rdbms', datasource='ClusterDB') " +
        "@PartitionById(enable='true') " +
        "define aggregation GlobalStockAgg " +
        "from StockStream " +
        "select symbol, avg(price) as avgPrice, sum(volume) as totalVolume " +
        "group by symbol " +
        "aggregate by timestamp every sec...day;";
```

## Data Purging and Retention

### Automatic Data Purging

```java
// Enable incremental data purging
siddhiAppRuntime.setPurgingEnabled(true);

// Configure retention policies through aggregation annotations.
// Retention is set per granularity with a nested @retentionPeriod annotation.
String retentionAgg =
        "@store(type='rdbms', datasource='StockDB') " +
        "@purge(enable='true', interval='1 hour', " +
        "@retentionPeriod(sec='120 sec', min='24 hours', hours='30 days', days='1 year')) " +
        "define aggregation RetentionStockAgg " +
        "from StockStream " +
        "select symbol, avg(price) as avgPrice " +
        "group by symbol " +
        "aggregate by timestamp every sec...day;";
```
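
Conceptually, a purge pass removes every bucket that is older than the retention period for its granularity. The sketch below shows that cutoff logic on an in-memory map. `RetentionPurger` is a hypothetical name and this is not how Siddhi purges a backing store; it only illustrates the arithmetic.

```java
import java.time.Duration;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch (not Siddhi's purging code): drops buckets whose start
// time falls before the retention cutoff.
final class RetentionPurger {

    // Removes all buckets that start strictly before (now - retention)
    // and returns how many were removed.
    static <V> int purge(NavigableMap<Long, V> bucketsByStartMillis,
                         long nowMillis, Duration retention) {
        long cutoff = nowMillis - retention.toMillis();
        // headMap is a live view, so clearing it removes entries from the source map
        NavigableMap<Long, V> expired = bucketsByStartMillis.headMap(cutoff, false);
        int removed = expired.size();
        expired.clear();
        return removed;
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        TreeMap<Long, String> hourlyBuckets = new TreeMap<>();
        for (long i = 0; i < 4; i++) {
            hourlyBuckets.put(i * hour, "bucket-" + i);
        }

        int removed = purge(hourlyBuckets, 3 * hour, Duration.ofHours(2));
        System.out.println("Removed " + removed + ", remaining " + hourlyBuckets.keySet());
    }
}
```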

### Manual Data Management

```java
// Query old data before purging
String oldDataQuery = "from StockAggregation " +
        "within '2022-01-01 00:00:00', '2022-12-31 23:59:59' " +
        "per 'month' " +
        "select symbol, avgPrice";

Event[] oldData = siddhiAppRuntime.query(oldDataQuery);

// archiveData(...) is an application-specific helper, not part of Siddhi.
// query() may return null when nothing matches.
if (oldData != null) {
    archiveData(oldData);
}

// Trigger manual purging (if supported)
// Note: Actual purging depends on store implementation
```

## Performance Optimization

### Indexing and Partitioning

```sql
-- Optimized aggregation with indexing hints
-- Note: whether @index and @partitionByKey are honored on aggregations depends
-- on the Siddhi version and the store; verify before relying on them.
@store(type='rdbms', datasource='StockDB')
@index('symbol', 'timestamp')
@partitionByKey('symbol')
define aggregation OptimizedStockAgg
from StockStream
select symbol, avg(price) as avgPrice, sum(volume) as totalVolume
    group by symbol
    aggregate by timestamp every sec...hour;
```

### Memory Management

```java
import java.util.Collection;

// Package names assume Siddhi 5.x (io.siddhi); Siddhi 4.x uses org.wso2.siddhi.
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.statistics.MemoryCalculable;

// Monitor memory usage of the tables registered in a Siddhi app
public class AggregationMonitor {
    public void monitorAggregation(SiddhiAppRuntime runtime) {
        // Report the size of every table that exposes memory statistics
        Collection<Table> tables = runtime.getTables();
        for (Table table : tables) {
            if (table instanceof MemoryCalculable) {
                long memoryUsage = ((MemoryCalculable) table).getSize();
                System.out.println("Table memory usage: " + memoryUsage + " bytes");
            }
        }
    }
}
```

## Real-time Analytics Examples

### Financial Market Analytics

```java
// Real-time stock market aggregation
String marketAgg =
        "define aggregation MarketAggregation " +
        "from StockStream " +
        "select symbol, sector, " +
        "  avg(price) as avgPrice, " +
        "  max(price) as dayHigh, " +
        "  min(price) as dayLow, " +
        "  sum(volume) as totalVolume, " +
        "  count() as tradeCount, " +
        "  stdDev(price) as volatility " +
        "group by symbol, sector " +
        "aggregate by timestamp every sec...day;";

// Query for market summary
String marketSummary = "from MarketAggregation " +
        "within '2023-06-01 00:00:00', '2023-06-01 23:59:59' " +
        "per 'day' " +
        "select sector, " +
        "  avg(avgPrice) as sectorAvgPrice, " +
        "  sum(totalVolume) as sectorVolume, " +
        "  avg(volatility) as sectorVolatility " +
        "group by sector";
```

### IoT Sensor Analytics

```java
// IoT device aggregation
String iotAgg =
        "define aggregation SensorAggregation " +
        "from SensorStream " +
        "select deviceId, location, " +
        "  avg(temperature) as avgTemp, " +
        "  max(temperature) as maxTemp, " +
        "  min(temperature) as minTemp, " +
        "  avg(humidity) as avgHumidity, " +
        "  count() as readingCount " +
        "group by deviceId, location " +
        "aggregate by timestamp every min...day;";

// Real-time monitoring query over the last hour.
// `within` takes timestamp strings or epoch-millisecond longs, not expressions,
// so the bounds are computed in Java.
long now = System.currentTimeMillis();
long oneHourAgo = now - 60 * 60 * 1000L;
String deviceStatus = "from SensorAggregation " +
        "within " + oneHourAgo + "L, " + now + "L " +
        "per 'min' " +
        "select deviceId, avgTemp, avgHumidity " +
        "having avgTemp > 30.0 or avgHumidity > 80.0";
```

## Types

```java { .api }
public class AggregationDefinition extends AbstractDefinition {
    // Definition of an aggregation
    public String getId();
    public List<Attribute> getAttributeList();
}

public interface MemoryCalculable {
    long getSize();
}

public abstract class Table implements MemoryCalculable {
    // Base class for table implementations
}

public interface ClusterConfig {
    void setNodeId(String nodeId);
    void setClusterNodes(List<String> nodes);
    String getNodeId();
    List<String> getClusterNodes();
}
```