Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.
Aggregation functionality provides incremental aggregation processing with support for different time durations, distributed processing, and efficient data management. Siddhi's aggregation capabilities enable real-time analytics over streaming data with automatic time-based grouping.
Runtime for managing aggregation operations and incremental processing. Handles the execution and lifecycle of aggregation queries with support for various time granularities.
/**
 * Runtime skeleton for managing aggregation operations and incremental
 * processing. Outline only — member implementations are elided in this
 * overview; the bullet comments below list the responsibilities covered.
 */
public class AggregationRuntime {
// Core aggregation management
// Manages incremental aggregations across different time durations
// Supports distributed aggregation processing
// Provides incremental data purging capabilities
// Memory calculation for statistics
// Handles external time processing
}

Siddhi aggregations support automatic time-based grouping at different granularities:
Aggregations in Siddhi use incremental processing to maintain efficiency:
-- Basic aggregation definition
define aggregation StockAggregation
from StockStream
select symbol, avg(price) as avgPrice, sum(volume) as totalVolume
group by symbol
aggregate by timestamp every sec...year;
-- Advanced aggregation with partitioning
@store(type="rdbms", datasource="StockDB")
define aggregation PartitionedStockAgg
from StockStream#window.time(1 min)
select symbol, sector,
avg(price) as avgPrice,
max(price) as maxPrice,
min(price) as minPrice,
sum(volume) as totalVolume,
count() as eventCount
group by symbol, sector
aggregate by timestamp every sec...day;

// Query aggregation for specific time range
String query = "from StockAggregation " +
"within '2023-01-01 00:00:00', '2023-12-31 23:59:59' " +
"per 'day' " +
"select symbol, avgPrice, totalVolume";
Event[] results = siddhiAppRuntime.query(query);
for (Event event : results) {
String symbol = (String) event.getData(0);
Double avgPrice = (Double) event.getData(1);
Long totalVolume = (Long) event.getData(2);
System.out.println(String.format("Symbol: %s, Avg Price: %.2f, Volume: %d",
symbol, avgPrice, totalVolume));
}

// Query with filtering and ordering
String complexQuery = "from StockAggregation " +
"on symbol == 'IBM' " +
"within '2023-06-01 00:00:00', '2023-06-30 23:59:59' " +
"per 'hour' " +
"select AGG_TIMESTAMP, avgPrice, maxPrice, minPrice " +
"order by AGG_TIMESTAMP desc " +
"limit 24";
Event[] hourlyData = siddhiAppRuntime.query(complexQuery);
// Query aggregation with grouping
String groupedQuery = "from SectorAggregation " +
"within '2023-01-01 00:00:00', '2023-01-31 23:59:59' " +
"per 'day' " +
"select sector, sum(totalVolume) as sectorVolume, avg(avgPrice) as sectorAvgPrice " +
"group by sector " +
"having sectorVolume > 1000000";
Event[] sectorData = siddhiAppRuntime.query(groupedQuery);

Common aggregation functions available in Siddhi:
-- Numeric aggregations
avg(attribute) -- Average value
sum(attribute) -- Sum of values
max(attribute) -- Maximum value
min(attribute) -- Minimum value
count() -- Count of events
stddev(attribute) -- Standard deviation
-- String aggregations
distinctcount(attribute) -- Count of distinct values
-- Time-based functions
AGG_TIMESTAMP -- Aggregation timestamp
AGG_START_TIMESTAMP -- Aggregation window start
AGG_END_TIMESTAMP -- Aggregation window end

// Example of using custom aggregation
String customAggQuery = "from StockStream " +
"select symbol, " +
" custom:weightedAvg(price, volume) as weightedAvgPrice, " +
" custom:volatility(price) as priceVolatility " +
"group by symbol " +
"aggregate by timestamp every min...hour";

// Configure distributed aggregation
SiddhiManager siddhiManager = new SiddhiManager();
// Set up cluster configuration for distributed processing
ClusterConfig clusterConfig = new ClusterConfig();
clusterConfig.setNodeId("node1");
clusterConfig.setClusterNodes(Arrays.asList("node1", "node2", "node3"));
siddhiManager.setClusterConfig(clusterConfig);
// Define distributed aggregation
String distributedAgg =
"@store(type='rdbms', datasource='ClusterDB') " +
"@PartitionById " +
"define aggregation GlobalStockAgg " +
"from StockStream " +
"select symbol, avg(price) as avgPrice, sum(volume) as totalVolume " +
"group by symbol " +
"aggregate by timestamp every sec...day;";

// Enable incremental data purging
siddhiAppRuntime.setPurgingEnabled(true);
// Configure retention policies through aggregation annotations
String retentionAgg =
"@store(type='rdbms', datasource='StockDB') " +
"@purge(enable='true', interval='1 hour', retentionPeriod='30 days') " +
"define aggregation RetentionStockAgg " +
"from StockStream " +
"select symbol, avg(price) as avgPrice " +
"group by symbol " +
"aggregate by timestamp every sec...day;";

// Query old data before purging
String oldDataQuery = "from StockAggregation " +
"within '2022-01-01 00:00:00', '2022-12-31 23:59:59' " +
"per 'month' " +
"select symbol, avgPrice";
Event[] oldData = siddhiAppRuntime.query(oldDataQuery);
archiveData(oldData);
// Trigger manual purging (if supported)
// Note: Actual purging depends on store implementation

-- Optimized aggregation with indexing hints
@store(type='rdbms', datasource='StockDB')
@index('symbol', 'timestamp')
@partitionByKey('symbol')
define aggregation OptimizedStockAgg
from StockStream
select symbol, avg(price) as avgPrice, sum(volume) as totalVolume
group by symbol
aggregate by timestamp every sec...hour;

// Monitor aggregation memory usage
/**
 * Utility that reports the in-memory footprint of a Siddhi app's tables.
 */
public class AggregationMonitor {

    /**
     * Prints the size, in bytes, of every table belonging to the given
     * runtime that can report its memory usage via {@code MemoryCalculable}.
     *
     * @param runtime the Siddhi app runtime whose tables are inspected
     */
    public void monitorAggregation(SiddhiAppRuntime runtime) {
        for (Table candidate : runtime.getTables()) {
            // Skip tables that cannot report a memory figure.
            if (!(candidate instanceof MemoryCalculable)) {
                continue;
            }
            long bytes = ((MemoryCalculable) candidate).getSize();
            System.out.println("Table memory usage: " + bytes + " bytes");
        }
    }
}

// Real-time stock market aggregation
String marketAgg =
"define aggregation MarketAggregation " +
"from StockStream " +
"select symbol, sector, " +
" avg(price) as avgPrice, " +
" max(price) as dayHigh, " +
" min(price) as dayLow, " +
" sum(volume) as totalVolume, " +
" count() as tradeCount, " +
" stddev(price) as volatility " +
"group by symbol, sector " +
"aggregate by timestamp every sec...day;";
// Query for market summary
String marketSummary = "from MarketAggregation " +
"within '2023-06-01 00:00:00', '2023-06-01 23:59:59' " +
"per 'day' " +
"select sector, " +
" avg(avgPrice) as sectorAvgPrice, " +
" sum(totalVolume) as sectorVolume, " +
" avg(volatility) as sectorVolatility " +
"group by sector";

// IoT device aggregation
String iotAgg =
"define aggregation SensorAggregation " +
"from SensorStream " +
"select deviceId, location, " +
" avg(temperature) as avgTemp, " +
" max(temperature) as maxTemp, " +
" min(temperature) as minTemp, " +
" avg(humidity) as avgHumidity, " +
" count() as readingCount " +
"group by deviceId, location " +
"aggregate by timestamp every min...day;";
// Real-time monitoring query
String deviceStatus = "from SensorAggregation " +
"within 'now() - 1 hour', 'now()' " +
"per 'min' " +
"select deviceId, avgTemp, avgHumidity " +
"having avgTemp > 30.0 or avgHumidity > 80.0";

/**
 * Definition of an aggregation: its identifier plus the attributes it emits.
 */
public interface AggregationDefinition extends AbstractDefinition {
/** Unique identifier of this aggregation definition. */
String getId();
/** Attributes (name/type pairs) produced by this aggregation. */
List<Attribute> getAttributeList();
}
/** Implemented by components that can report their in-memory footprint. */
public interface MemoryCalculable {
/** Returns the approximate memory usage of this component, in bytes. */
long getSize();
}
/**
 * Interface for table implementations; extends {@link MemoryCalculable}
 * so every table can report its memory footprint.
 */
public interface Table extends MemoryCalculable {
// Interface for table implementations
}
/** Cluster membership configuration for distributed aggregation processing. */
public interface ClusterConfig {
/** Sets the identifier of the local node. */
void setNodeId(String nodeId);
/** Sets the identifiers of all nodes participating in the cluster. */
void setClusterNodes(List<String> nodes);
/** Returns the identifier of the local node. */
String getNodeId();
/** Returns the identifiers of all nodes participating in the cluster. */
List<String> getClusterNodes();
}

Install with Tessl CLI
npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core