
tessl/maven-org-wso2-siddhi--siddhi-core

Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.


Aggregations

Siddhi's incremental aggregation computes summaries of a stream (such as sums, counts, averages, minima and maxima) at several time granularities at once, updating them as each event arrives. Aggregates can be persisted in an external store, shared across nodes, and purged once they are older than a retention period, which enables real-time analytics over streaming data with automatic time-based grouping.

AggregationRuntime

Runtime for managing aggregation operations and incremental processing. Handles the execution and lifecycle of aggregation queries with support for various time granularities.

public class AggregationRuntime {
    // Core aggregation management
    // Manages incremental aggregations across different time durations
    // Supports distributed aggregation processing  
    // Provides incremental data purging capabilities
    // Memory calculation for statistics
    // Handles external time processing
}

Aggregation Concepts

Time-Based Aggregation

Siddhi aggregations support automatic time-based grouping at different granularities:

  • sec - Second-level aggregations
  • min - Minute-level aggregations
  • hour - Hour-level aggregations
  • day - Daily aggregations
  • month - Monthly aggregations
  • year - Yearly aggregations
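The grouping itself amounts to flooring each event timestamp to the start of its bucket at every granularity. The following plain-Java sketch illustrates that idea; it is not Siddhi's internal code, it assumes epoch-millisecond timestamps interpreted in UTC, and `TimeBuckets` is a hypothetical helper.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

/** Floors an epoch-millisecond timestamp to the start of its bucket (UTC assumed). */
final class TimeBuckets {
    enum Granularity { SEC, MIN, HOUR, DAY, MONTH, YEAR }

    static long bucketStart(long epochMillis, Granularity g) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        ZonedDateTime start;
        switch (g) {
            case SEC:   start = t.truncatedTo(ChronoUnit.SECONDS); break;
            case MIN:   start = t.truncatedTo(ChronoUnit.MINUTES); break;
            case HOUR:  start = t.truncatedTo(ChronoUnit.HOURS); break;
            case DAY:   start = t.truncatedTo(ChronoUnit.DAYS); break;
            // Months and years have variable length, so truncate to the day and reset the date part
            case MONTH: start = t.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1); break;
            case YEAR:  start = t.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1); break;
            default: throw new IllegalArgumentException("Unknown granularity: " + g);
        }
        return start.toInstant().toEpochMilli();
    }
}
```

An event at 2023-06-15 10:42:37.500 UTC falls into the buckets starting at 10:42:37, 10:42:00, 10:00:00, 2023-06-15, 2023-06-01 and 2023-01-01.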

Incremental Processing

Aggregations in Siddhi use incremental processing to maintain efficiency:

  • New events update existing aggregation values
  • Historical data is preserved at different time granularities
  • Automatic rollup from finer to coarser time units
  • Support for out-of-order event processing
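Rollup from finer to coarser units stays exact when each bucket keeps mergeable partial state (for an average, a running sum and count) instead of the final value. A minimal sketch of that technique, assuming epoch-millisecond bucket keys; `AvgState` and `Rollup` are hypothetical names, not Siddhi classes:

```java
import java.util.Map;
import java.util.TreeMap;

/** Running sum and count for one bucket; the average is derived, never stored. */
final class AvgState {
    double sum;
    long count;

    void add(double value) { sum += value; count++; }

    void merge(AvgState other) { sum += other.sum; count += other.count; }

    double avg() { return count == 0 ? Double.NaN : sum / count; }
}

final class Rollup {
    /** Merges per-second states into per-minute states by flooring each bucket key to its minute. */
    static Map<Long, AvgState> secondsToMinutes(Map<Long, AvgState> perSecond) {
        Map<Long, AvgState> perMinute = new TreeMap<>();
        for (Map.Entry<Long, AvgState> e : perSecond.entrySet()) {
            long minuteStart = e.getKey() - Math.floorMod(e.getKey(), 60_000L);
            perMinute.computeIfAbsent(minuteStart, k -> new AvgState()).merge(e.getValue());
        }
        return perMinute;
    }
}
```

Merging the readings 10 and 20 (second 0) with 30 (second 1) gives a minute average of 20.0, whereas averaging the two per-second averages (15 and 30) would give 22.5.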

Aggregation Definition Syntax

-- Basic aggregation definition
define aggregation StockAggregation
from StockStream
select symbol, avg(price) as avgPrice, sum(volume) as totalVolume
    group by symbol
    aggregate by timestamp every sec...year;

-- Aggregation persisted in an RDBMS store
-- (the input of an aggregation is a plain stream; windows are not applied here)
@store(type="rdbms", datasource="StockDB")
define aggregation PartitionedStockAgg
from StockStream
select symbol, sector, 
       avg(price) as avgPrice, 
       max(price) as maxPrice,
       min(price) as minPrice,
       sum(volume) as totalVolume,
       count() as eventCount
    group by symbol, sector
    aggregate by timestamp every sec...day;

Querying Aggregations

Basic Aggregation Queries

// Query aggregation for specific time range
String query = "from StockAggregation " +
               "within '2023-01-01 00:00:00', '2023-12-31 23:59:59' " +
               "per 'day' " +
               "select symbol, avgPrice, totalVolume";

Event[] results = siddhiAppRuntime.query(query);

for (Event event : results) {
    String symbol = (String) event.getData(0);
    Double avgPrice = (Double) event.getData(1);
    Long totalVolume = (Long) event.getData(2);
    
    System.out.println(String.format("Symbol: %s, Avg Price: %.2f, Volume: %d", 
                       symbol, avgPrice, totalVolume));
}

Advanced Aggregation Queries

// Query with filtering and ordering (PartitionedStockAgg defines maxPrice and minPrice)
String complexQuery = "from PartitionedStockAgg " +
                     "on symbol == 'IBM' " +
                     "within '2023-06-01 00:00:00', '2023-06-30 23:59:59' " +
                     "per 'hour' " +
                     "select AGG_TIMESTAMP, avgPrice, maxPrice, minPrice " +
                     "order by AGG_TIMESTAMP desc " +
                     "limit 24";

Event[] hourlyData = siddhiAppRuntime.query(complexQuery);

// Re-group per-symbol aggregation results by sector
String groupedQuery = "from PartitionedStockAgg " +
                     "within '2023-01-01 00:00:00', '2023-01-31 23:59:59' " +
                     "per 'day' " +
                     "select sector, sum(totalVolume) as sectorVolume, avg(avgPrice) as sectorAvgPrice " +
                     "group by sector " +
                     "having sectorVolume > 1000000";

Event[] sectorData = siddhiAppRuntime.query(groupedQuery);
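The grouped query re-aggregates rows that are already aggregated per symbol, so `avg(avgPrice)` is an unweighted mean of per-symbol averages, not a volume-weighted sector price. The sketch below reproduces the `group by sector ... having sectorVolume > 1000000` step in plain Java to make that explicit; `DailyRow`, `SectorSummary` and `SectorRollup` are hypothetical types standing in for the result rows.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** One per-symbol daily row from the aggregation (hypothetical shape). */
final class DailyRow {
    final String sector;
    final double avgPrice;
    final long totalVolume;

    DailyRow(String sector, double avgPrice, long totalVolume) {
        this.sector = sector;
        this.avgPrice = avgPrice;
        this.totalVolume = totalVolume;
    }
}

/** Per-sector result: sum(totalVolume) and an unweighted avg(avgPrice). */
final class SectorSummary {
    long sectorVolume;
    double priceSum;
    int rows;

    double sectorAvgPrice() { return priceSum / rows; }
}

final class SectorRollup {
    static Map<String, SectorSummary> summarize(List<DailyRow> rows, long minVolume) {
        Map<String, SectorSummary> bySector = new LinkedHashMap<>();
        for (DailyRow r : rows) {
            SectorSummary s = bySector.computeIfAbsent(r.sector, k -> new SectorSummary());
            s.sectorVolume += r.totalVolume;
            s.priceSum += r.avgPrice; // every row counts once, regardless of its volume
            s.rows++;
        }
        // HAVING runs after grouping: drop sectors at or below the volume threshold
        bySector.values().removeIf(s -> s.sectorVolume <= minVolume);
        return bySector;
    }
}
```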

Aggregation Functions

Built-in Aggregation Functions

Aggregation functions supported in incremental aggregation definitions:

-- Numeric aggregations
avg(attribute)           -- Average value
sum(attribute)           -- Sum of values
max(attribute)           -- Maximum value
min(attribute)           -- Minimum value
count()                  -- Count of events
-- Note: stdDev(attribute) works in regular queries but is not an incremental aggregator

-- Distinct values (any attribute type)
distinctCount(attribute) -- Count of distinct values

-- Time attributes available when querying an aggregation
AGG_TIMESTAMP            -- Aggregation timestamp (start of the time bucket)
AGG_START_TIMESTAMP      -- Aggregation window start
AGG_END_TIMESTAMP        -- Aggregation window end
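`distinctCount` needs different partial state from `sum` or `count`: per-bucket distinct counts cannot simply be added during rollup, because the same value may appear in several buckets. One mergeable representation keeps an occurrence count per value, as in this sketch (`DistinctCountState` is hypothetical and not necessarily how Siddhi stores it):

```java
import java.util.HashMap;
import java.util.Map;

/** Mergeable distinct-count state: value -> number of occurrences in this bucket. */
final class DistinctCountState {
    private final Map<Object, Long> occurrences = new HashMap<>();

    void add(Object value) { occurrences.merge(value, 1L, Long::sum); }

    /** Folds another bucket's occurrences into this one; shared values are counted once. */
    void merge(DistinctCountState other) {
        other.occurrences.forEach((value, n) -> occurrences.merge(value, n, Long::sum));
    }

    int distinctCount() { return occurrences.size(); }
}
```

Two buckets holding {IBM, WSO2} and {IBM, ORCL} have two distinct symbols each, but their merged distinct count is 3, not 4.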

Custom Aggregation Functions

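// Example of using custom aggregation functions in an aggregation definition.
// custom:weightedAvg and custom:volatility are hypothetical extensions; a function used
// here must support incremental aggregation so results can roll up across durations.
String customAggQuery = "define aggregation CustomStockAgg " +
                       "from StockStream " +
                       "select symbol, " +
                       "       custom:weightedAvg(price, volume) as weightedAvgPrice, " +
                       "       custom:volatility(price) as priceVolatility " +
                       "group by symbol " +
                       "aggregate by timestamp every min...hour;";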
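Whatever `custom:weightedAvg` does internally (it is hypothetical here), a weighted average can roll up across durations because it decomposes into two sums, `sum(price * volume)` and `sum(volume)`. A minimal sketch of that mergeable state in plain Java, with `WeightedAvgState` as an illustrative name:

```java
/** Mergeable state for a volume-weighted average price. */
final class WeightedAvgState {
    double weightedSum; // sum(price * volume)
    double weightSum;   // sum(volume)

    void add(double price, double volume) {
        weightedSum += price * volume;
        weightSum += volume;
    }

    /** Combining two buckets only adds their sums, so no raw events are needed. */
    void merge(WeightedAvgState other) {
        weightedSum += other.weightedSum;
        weightSum += other.weightSum;
    }

    double value() { return weightSum == 0 ? Double.NaN : weightedSum / weightSum; }
}
```

A minute with trades (10, 100) and (20, 300) has a weighted average of 17.5; merging it with a minute holding (30, 100) gives an hourly value of 20.0.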

Distributed Aggregation

Multi-Node Aggregation Setup

// Configure distributed aggregation
SiddhiManager siddhiManager = new SiddhiManager();

// Each node writing to the shared store needs its own shard ID. Siddhi reads it from the
// "shardId" configuration property; how that property is supplied depends on your
// deployment and ConfigManager, so verify this against your Siddhi version.

// Define distributed aggregation: every node uses the same store and the same definition
String distributedAgg = 
    "define stream StockStream (symbol string, price double, volume long, timestamp long); " +
    "@store(type='rdbms', datasource='ClusterDB') " +
    "@PartitionById(enable='true') " +
    "define aggregation GlobalStockAgg " +
    "from StockStream " +
    "select symbol, avg(price) as avgPrice, sum(volume) as totalVolume " +
    "group by symbol " +
    "aggregate by timestamp every sec...day;";

SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(distributedAgg);
siddhiAppRuntime.start();
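Conceptually, each node holds partial aggregates for the events it processed, and a combined view adds the partial sums and counts per key before deriving the average. The sketch below shows only that merge step in plain Java; it is not Siddhi's coordination logic, and `Partial`, `Merged` and `ShardMerge` are hypothetical types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Partial aggregate one node holds for a (symbol, bucket) key; hypothetical shape. */
final class Partial {
    final String symbol;
    final long bucketStart;
    final double priceSum;
    final long count;
    final long volume;

    Partial(String symbol, long bucketStart, double priceSum, long count, long volume) {
        this.symbol = symbol;
        this.bucketStart = bucketStart;
        this.priceSum = priceSum;
        this.count = count;
        this.volume = volume;
    }
}

/** Combined state for one (symbol, bucket) key across all nodes. */
final class Merged {
    double priceSum;
    long count;
    long volume;

    double avgPrice() { return priceSum / count; }
}

final class ShardMerge {
    /** Keys are "symbol|bucketStart"; sums and counts are added, the average is derived last. */
    static Map<String, Merged> merge(List<Partial> partials) {
        Map<String, Merged> result = new HashMap<>();
        for (Partial p : partials) {
            Merged m = result.computeIfAbsent(p.symbol + "|" + p.bucketStart, k -> new Merged());
            m.priceSum += p.priceSum;
            m.count += p.count;
            m.volume += p.volume;
        }
        return result;
    }
}
```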

Data Purging and Retention

Automatic Data Purging

// Enable incremental data purging
siddhiAppRuntime.setPurgingEnabled(true);

// Configure retention per granularity through the @purge annotation
String retentionAgg = 
    "@store(type='rdbms', datasource='StockDB') " +
    "@purge(enable='true', interval='1 hour', " +
    "       @retentionPeriod(sec='120 sec', min='24 hours', hours='30 days', days='1 year')) " +
    "define aggregation RetentionStockAgg " +
    "from StockStream " +
    "select symbol, avg(price) as avgPrice " +
    "group by symbol " +
    "aggregate by timestamp every sec...day;";
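Retention applies per granularity: buckets whose start time is older than that granularity's retention period are deleted, while coarser granularities can be kept longer. A minimal sketch of one purge pass over a single granularity, assuming buckets keyed by epoch-millisecond start time; `RetentionPurger` is hypothetical:

```java
import java.time.Duration;
import java.util.NavigableMap;

final class RetentionPurger {
    /** Removes buckets that start before (nowMillis - retention); returns how many were removed. */
    static int purge(NavigableMap<Long, ?> bucketsByStart, long nowMillis, Duration retention) {
        long cutoff = nowMillis - retention.toMillis();
        NavigableMap<Long, ?> expired = bucketsByStart.headMap(cutoff, false);
        int removed = expired.size();
        expired.clear(); // clearing the view deletes the entries from the backing map
        return removed;
    }
}
```

Running it once per granularity, each with its own retention period, mimics the per-duration behaviour of the annotation.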

Manual Data Management

// Query old data before purging
String oldDataQuery = "from StockAggregation " +
                     "within '2022-01-01 00:00:00', '2022-12-31 23:59:59' " +
                     "per 'month' " +
                     "select symbol, avgPrice";

Event[] oldData = siddhiAppRuntime.query(oldDataQuery);
archiveData(oldData); // hypothetical helper that copies the rows to long-term storage

// Trigger manual purging (if supported)
// Note: Actual purging depends on store implementation
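`archiveData` above is a hypothetical helper. One simple shape for it, sketched here as an assumption, renders each row's data array (for example the result of `event.getData()`) as CSV text that can then be written to long-term storage; `CsvArchiver` is an illustrative name.

```java
import java.util.List;
import java.util.StringJoiner;

final class CsvArchiver {
    /** Renders a header plus one CSV line per row. */
    static String toCsv(List<String> header, List<Object[]> rows) {
        StringBuilder sb = new StringBuilder(String.join(",", header)).append('\n');
        for (Object[] row : rows) {
            StringJoiner line = new StringJoiner(",");
            for (Object field : row) {
                line.add(escape(field));
            }
            sb.append(line).append('\n');
        }
        return sb.toString();
    }

    /** Quotes a field containing a comma, quote or newline, doubling embedded quotes. */
    private static String escape(Object field) {
        String s = String.valueOf(field);
        if (s.contains(",") || s.contains("\"") || s.contains("\n")) {
            return "\"" + s.replace("\"", "\"\"") + "\"";
        }
        return s;
    }
}
```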

Performance Optimization

Indexing and Partitioning

-- Optimized aggregation with indexing hints
@store(type='rdbms', datasource='StockDB')
@index('symbol', 'timestamp')
@partitionByKey('symbol')
define aggregation OptimizedStockAgg
from StockStream
select symbol, avg(price) as avgPrice, sum(volume) as totalVolume
    group by symbol
    aggregate by timestamp every sec...hour;

Memory Management

// Report memory usage of the app's tables
public class AggregationMonitor {
    public void monitorAggregation(SiddhiAppRuntime runtime) {
        // Print the size of each table that supports memory calculation
        Collection<Table> tables = runtime.getTables();
        for (Table table : tables) {
            if (table instanceof MemoryCalculable) {
                long memoryUsage = ((MemoryCalculable) table).getSize();
                System.out.println("Table memory usage: " + memoryUsage + " bytes");
            }
        }
    }
}

Real-time Analytics Examples

Financial Market Analytics

// Real-time stock market aggregation
// (stdDev() is not an incremental aggregator, so volatility is not aggregated here)
String marketAgg = 
    "define aggregation MarketAggregation " +
    "from StockStream " +
    "select symbol, sector, " +
    "       avg(price) as avgPrice, " +
    "       max(price) as dayHigh, " +
    "       min(price) as dayLow, " +
    "       sum(volume) as totalVolume, " +
    "       count() as tradeCount " +
    "group by symbol, sector " +
    "aggregate by timestamp every sec...day;";

// Query for market summary
String marketSummary = "from MarketAggregation " +
                      "within '2023-06-01 00:00:00', '2023-06-01 23:59:59' " +
                      "per 'day' " +
                      "select sector, " +
                      "       avg(avgPrice) as sectorAvgPrice, " +
                      "       sum(totalVolume) as sectorVolume " +
                      "group by sector";
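If volatility is needed, one option is to aggregate quantities that do roll up, namely `count()`, `sum(price)` and a sum of squared prices (the squared price could be produced as a derived attribute by an upstream query), and derive the standard deviation at read time. A sketch of that formula in plain Java; `VolatilityState` is hypothetical and computes the population standard deviation.

```java
/** Mergeable state from which a standard deviation can be derived. */
final class VolatilityState {
    long count;
    double sum;
    double sumOfSquares;

    void add(double price) {
        count++;
        sum += price;
        sumOfSquares += price * price;
    }

    void merge(VolatilityState other) {
        count += other.count;
        sum += other.sum;
        sumOfSquares += other.sumOfSquares;
    }

    /** Population standard deviation: sqrt(E[x^2] - E[x]^2). */
    double stdDev() {
        if (count == 0) return Double.NaN;
        double mean = sum / count;
        double variance = sumOfSquares / count - mean * mean;
        return Math.sqrt(Math.max(variance, 0.0)); // clamp small negative rounding errors
    }
}
```

The sum-of-squares form can lose precision when prices are large relative to their spread; for such data a numerically stabler pairwise variance-merging formula is preferable.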

IoT Sensor Analytics

// IoT device aggregation
String iotAgg = 
    "define aggregation SensorAggregation " +
    "from SensorStream " +
    "select deviceId, location, " +
    "       avg(temperature) as avgTemp, " +
    "       max(temperature) as maxTemp, " +
    "       min(temperature) as minTemp, " +
    "       avg(humidity) as avgHumidity, " +
    "       count() as readingCount " +
    "group by deviceId, location " +
    "aggregate by timestamp every min...day;";

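// Real-time monitoring query over the last hour, using epoch-millisecond bounds
long end = System.currentTimeMillis();
long start = end - 60 * 60 * 1000L;
String deviceStatus = "from SensorAggregation " +
                     "within " + start + "L, " + end + "L " +
                     "per 'min' " +
                     "select deviceId, avgTemp, avgHumidity " +
                     "having avgTemp > 30.0 or avgHumidity > 80.0";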

Types

public class AggregationDefinition extends AbstractDefinition {
    // Definition of an aggregation.
    // Inherits String getId() and List<Attribute> getAttributeList() from AbstractDefinition.
}

public interface MemoryCalculable {
    long getSize();
}

public abstract class Table implements MemoryCalculable {
    // Base class for table implementations
}

public interface ClusterConfig {
    void setNodeId(String nodeId);
    void setClusterNodes(List<String> nodes);
    String getNodeId();
    List<String> getClusterNodes();
}
