CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-wso2-siddhi--siddhi-core

Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.

Overview
Eval results
Files

statistics.mddocs/

Statistics

Monitoring and statistics interfaces provide comprehensive tracking of performance, throughput, and resource usage in Siddhi applications. The statistics module enables integration with monitoring systems and provides runtime insights into application behavior.

Statistics Levels

Level

Enumeration for statistics levels controlling the depth of monitoring and performance tracking.

public enum Level {
    OFF,     // No statistics collection
    BASIC,   // Basic performance metrics
    DETAIL   // Detailed statistics with comprehensive tracking
}

Runtime Statistics Management

SiddhiAppRuntime Statistics

public class SiddhiAppRuntime {
    // Statistics Configuration
    public Level getRootMetricsLevel();
    public void enableStats(Level level);
}

Usage Examples

// Configure statistics at runtime
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

// Check current statistics level
Level currentLevel = runtime.getRootMetricsLevel();
System.out.println("Current stats level: " + currentLevel);

// Enable basic statistics
runtime.enableStats(Level.BASIC);

// Start runtime with statistics enabled
runtime.start();

// Process events...
InputHandler handler = runtime.getInputHandler("StockStream");
handler.send(new Object[]{"IBM", 150.0, 1000L});

// Enable detailed statistics for troubleshooting
runtime.enableStats(Level.DETAIL);

// Disable statistics for production performance
runtime.enableStats(Level.OFF);

Statistics Configuration

StatisticsConfiguration

Configuration holder for Siddhi statistics module providing integration with monitoring systems.

public class StatisticsConfiguration {
    // Constructor
    public StatisticsConfiguration(StatisticsTrackerFactory factory);
    
    // Configuration Access
    public StatisticsTrackerFactory getFactory();
    public String getMetricPrefix();
    public void setMetricPrefix(String metricPrefix);
}

SiddhiManager Statistics Setup

public class SiddhiManager {
    // Statistics Configuration
    public void setStatisticsConfiguration(StatisticsConfiguration statisticsConfiguration);
}

Usage Examples

// Create custom statistics factory
StatisticsTrackerFactory customFactory = new CustomStatisticsTrackerFactory();

// Configure statistics
StatisticsConfiguration statsConfig = new StatisticsConfiguration(customFactory);
statsConfig.setMetricPrefix("siddhi.trading.app");

// Apply to SiddhiManager
SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.setStatisticsConfiguration(statsConfig);

// All applications created will use this configuration
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(tradingApp);
runtime.enableStats(Level.BASIC);

Statistics Interfaces

StatisticsManager

Interface for statistics management providing centralized control over metrics collection.

public interface StatisticsManager {
    // Statistics lifecycle
    void startReporting();
    void stopReporting();
    
    // Metric registration
    void registerStatisticsReporter(StatisticsReporter reporter);
    void unregisterStatisticsReporter(StatisticsReporter reporter);
    
    // Configuration
    void setStatisticsConfiguration(StatisticsConfiguration configuration);
}

StatisticsTrackerFactory

Factory for creating statistics trackers for different types of metrics.

public interface StatisticsTrackerFactory {
    // Tracker creation
    LatencyTracker createLatencyTracker(String name, StatisticsManager statisticsManager);
    ThroughputTracker createThroughputTracker(String name, StatisticsManager statisticsManager);
    MemoryUsageTracker createMemoryUsageTracker(String name, StatisticsManager statisticsManager);
    BufferedEventsTracker createBufferedEventsTracker(String name, StatisticsManager statisticsManager);
}

Performance Tracking

LatencyTracker

Interface for latency tracking measuring processing time and response times.

public interface LatencyTracker {
    // Latency measurement
    void markIn();
    void markOut();
    
    // Batch latency tracking
    void markIn(long count);
    void markOut(long count);
    
    // Statistics retrieval
    double getAverageLatency();
    double getMaxLatency();
    double getMinLatency();
}

ThroughputTracker

Interface for throughput tracking measuring events processed per unit time.

public interface ThroughputTracker {
    // Event counting
    void eventIn();
    void eventIn(long count);
    
    // Throughput measurement
    double getThroughput();
    long getTotalEvents();
    
    // Time-based statistics
    double getThroughputForLastNSeconds(int seconds);
}

Usage Examples

// Custom statistics tracking implementation
public class CustomStatisticsTrackerFactory implements StatisticsTrackerFactory {
    
    @Override
    public LatencyTracker createLatencyTracker(String name, StatisticsManager manager) {
        return new CustomLatencyTracker(name);
    }
    
    @Override
    public ThroughputTracker createThroughputTracker(String name, StatisticsManager manager) {
        return new CustomThroughputTracker(name);
    }
    
    // Custom latency tracker with Micrometer integration
    private class CustomLatencyTracker implements LatencyTracker {
        private final Timer timer;
        private Timer.Sample sample;
        
        public CustomLatencyTracker(String name) {
            this.timer = Timer.builder(name + ".latency")
                    .description("Processing latency")
                    .register(meterRegistry);
        }
        
        @Override
        public void markIn() {
            this.sample = Timer.start(meterRegistry);
        }
        
        @Override
        public void markOut() {
            if (sample != null) {
                sample.stop(timer);
            }
        }
        
        @Override
        public double getAverageLatency() {
            return timer.mean(TimeUnit.MILLISECONDS);
        }
    }
}

Resource Monitoring

MemoryUsageTracker

Interface for memory usage tracking monitoring resource consumption.

public interface MemoryUsageTracker {
    // Memory measurement
    void registerObject(Object object, long size);
    void unregisterObject(Object object);
    
    // Usage statistics
    long getCurrentMemoryUsage();
    long getMaxMemoryUsage();
    
    // Reporting
    void reportMemoryUsage();
}

BufferedEventsTracker

Interface for buffered events tracking monitoring queue sizes and processing backlogs.

public interface BufferedEventsTracker {
    // Buffer monitoring
    void eventBuffered();
    void eventRemoved();
    void eventBuffered(long count);
    void eventRemoved(long count);
    
    // Buffer statistics
    long getCurrentBufferSize();
    long getMaxBufferSize();
    double getAverageBufferSize();
}

MemoryCalculable

Interface for memory calculation capability enabling objects to report their memory usage.

public interface MemoryCalculable {
    long getSize();
}

Advanced Statistics Examples

Comprehensive Monitoring Setup

// Complete monitoring setup with multiple trackers
public class ComprehensiveMonitoring {
    private final SiddhiAppRuntime runtime;
    private final LatencyTracker processingLatency;
    private final ThroughputTracker inputThroughput;
    private final MemoryUsageTracker memoryTracker;
    private final BufferedEventsTracker bufferTracker;
    
    public ComprehensiveMonitoring(SiddhiAppRuntime runtime, StatisticsTrackerFactory factory) {
        this.runtime = runtime;
        
        // Create trackers for different metrics
        StatisticsManager statsManager = getStatisticsManager(runtime);
        this.processingLatency = factory.createLatencyTracker("processing", statsManager);
        this.inputThroughput = factory.createThroughputTracker("input", statsManager);
        this.memoryTracker = factory.createMemoryUsageTracker("memory", statsManager);
        this.bufferTracker = factory.createBufferedEventsTracker("buffer", statsManager);
    }
    
    public void startMonitoring() {
        // Enable detailed statistics
        runtime.enableStats(Level.DETAIL);
        
        // Start periodic reporting
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(this::reportMetrics, 10, 10, TimeUnit.SECONDS);
    }
    
    private void reportMetrics() {
        System.out.println("=== Siddhi App Metrics ===");
        System.out.println("Processing Latency: " + processingLatency.getAverageLatency() + " ms");
        System.out.println("Input Throughput: " + inputThroughput.getThroughput() + " events/sec");
        System.out.println("Memory Usage: " + formatBytes(memoryTracker.getCurrentMemoryUsage()));
        System.out.println("Buffer Size: " + bufferTracker.getCurrentBufferSize() + " events");
        System.out.println("========================");
    }
    
    private String formatBytes(long bytes) {
        return String.format("%.2f MB", bytes / (1024.0 * 1024.0));
    }
}

Performance Alerting

// Performance monitoring with alerting
public class PerformanceMonitor extends StreamCallback {
    private final ThroughputTracker throughputTracker;
    private final LatencyTracker latencyTracker;
    private final double throughputThreshold = 1000.0; // events/sec
    private final double latencyThreshold = 100.0;     // milliseconds
    
    @Override
    public void receive(Event[] events) {
        // Track throughput
        throughputTracker.eventIn(events.length);
        
        // Measure processing latency
        latencyTracker.markIn();
        processEvents(events);
        latencyTracker.markOut();
        
        // Check performance thresholds
        checkPerformanceThresholds();
    }
    
    private void checkPerformanceThresholds() {
        double currentThroughput = throughputTracker.getThroughput();
        double currentLatency = latencyTracker.getAverageLatency();
        
        if (currentThroughput < throughputThreshold) {
            alertLowThroughput(currentThroughput);
        }
        
        if (currentLatency > latencyThreshold) {
            alertHighLatency(currentLatency);
        }
    }
    
    private void alertLowThroughput(double throughput) {
        System.err.println("ALERT: Low throughput detected: " + throughput + " events/sec");
        // Send alert to monitoring system
    }
    
    private void alertHighLatency(double latency) {
        System.err.println("ALERT: High latency detected: " + latency + " ms");
        // Send alert to monitoring system
    }
}

Memory Monitoring

// Memory usage monitoring for Siddhi components
public class MemoryMonitor {
    private final SiddhiAppRuntime runtime;
    
    public void monitorMemoryUsage() {
        // Monitor table memory usage
        Collection<Table> tables = runtime.getTables();
        long totalTableMemory = 0;
        
        for (Table table : tables) {
            if (table instanceof MemoryCalculable) {
                long tableMemory = ((MemoryCalculable) table).getSize();
                totalTableMemory += tableMemory;
                System.out.println("Table memory: " + formatBytes(tableMemory));
            }
        }
        
        System.out.println("Total table memory: " + formatBytes(totalTableMemory));
        
        // Check memory usage against limits
        if (totalTableMemory > getMemoryLimit()) {
            triggerMemoryAlert(totalTableMemory);
        }
    }
    
    private void triggerMemoryAlert(long memoryUsage) {
        System.err.println("MEMORY ALERT: Usage exceeds limit: " + formatBytes(memoryUsage));
        
        // Trigger data purging or scale-out
        triggerDataPurging();
    }
    
    private void triggerDataPurging() {
        // Enable purging to free memory
        runtime.setPurgingEnabled(true);
    }
}

Integration Examples

Metrics Registry Integration

// Integration with Micrometer metrics registry
public class MicrometerStatisticsFactory implements StatisticsTrackerFactory {
    private final MeterRegistry meterRegistry;
    
    public MicrometerStatisticsFactory(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    @Override
    public ThroughputTracker createThroughputTracker(String name, StatisticsManager manager) {
        return new MicrometerThroughputTracker(name, meterRegistry);
    }
    
    private static class MicrometerThroughputTracker implements ThroughputTracker {
        private final Counter counter;
        private final Gauge throughputGauge;
        
        public MicrometerThroughputTracker(String name, MeterRegistry registry) {
            this.counter = Counter.builder(name + ".events")
                    .description("Total events processed")
                    .register(registry);
            
            this.throughputGauge = Gauge.builder(name + ".throughput")
                    .description("Events per second")
                    .register(registry, this, MicrometerThroughputTracker::calculateThroughput);
        }
        
        @Override
        public void eventIn() {
            counter.increment();
        }
        
        @Override
        public double getThroughput() {
            return calculateThroughput();
        }
        
        private double calculateThroughput() {
            // Calculate throughput based on counter and time
            return counter.count() / getUptimeSeconds();
        }
    }
}

Types

public interface StatisticsReporter {
    void report(Map<String, Object> metrics);
    void start();
    void stop();
}

public interface StatisticsManager {
    void registerTracker(String name, Object tracker);
    void unregisterTracker(String name);
    Map<String, Object> getAllMetrics();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core

docs

aggregations.md

core-management.md

event-handling.md

exceptions.md

extensions.md

index.md

persistence.md

queries-and-callbacks.md

statistics.md

tile.json