Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.
Monitoring and statistics interfaces provide comprehensive tracking of performance, throughput, and resource usage in Siddhi applications. The statistics module enables integration with monitoring systems and provides runtime insights into application behavior.
Enumeration for statistics levels controlling the depth of monitoring and performance tracking.
/**
 * Statistics levels selecting how much monitoring data is collected.
 */
public enum Level {
    OFF, // No statistics collection
    BASIC, // Basic performance metrics
    DETAIL // Detailed statistics with comprehensive tracking
}

/**
 * Statistics-related part of the runtime API. Only signatures are listed;
 * other members of the class are not shown in this section.
 */
public class SiddhiAppRuntime {
    // Statistics Configuration

    /** Returns the statistics level currently in effect for this runtime. */
    public Level getRootMetricsLevel();

    /**
     * Switches statistics collection to the given level.
     * The usage example that follows passes {@code Level.OFF} to disable collection.
     */
    public void enableStats(Level level);
}

// Configure statistics at runtime
// Obtain the runtime whose statistics level is adjusted below
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

// Print the statistics level in effect before changing anything
System.out.println("Current stats level: " + runtime.getRootMetricsLevel());

// Switch on basic statistics, then start the runtime
runtime.enableStats(Level.BASIC);
runtime.start();

// Process events...
runtime.getInputHandler("StockStream").send(new Object[]{"IBM", 150.0, 1000L});

// Raise the level to DETAIL while troubleshooting
runtime.enableStats(Level.DETAIL);

// Turn statistics off again for production performance
runtime.enableStats(Level.OFF);

Configuration holder for Siddhi statistics module providing integration with monitoring systems.
/**
 * Holds the statistics settings handed to a {@code SiddhiManager}:
 * a tracker factory and a metric prefix. Only signatures are listed.
 */
public class StatisticsConfiguration {
    // Constructor

    /** Creates a configuration around the given tracker factory. */
    public StatisticsConfiguration(StatisticsTrackerFactory factory);

    // Configuration Access

    /** Returns the tracker factory (presumably the one passed to the constructor; confirm against the library). */
    public StatisticsTrackerFactory getFactory();

    /** Returns the metric prefix. */
    public String getMetricPrefix();

    /** Sets the metric prefix; the usage example passes a dotted name such as {@code "siddhi.trading.app"}. */
    public void setMetricPrefix(String metricPrefix);
}

/**
 * Statistics-related part of the manager API. Only the signature relevant to
 * this section is listed.
 */
public class SiddhiManager {
    // Statistics Configuration

    /** Installs the statistics configuration used by applications created from this manager. */
    public void setStatisticsConfiguration(StatisticsConfiguration statisticsConfiguration);
}

// Create custom statistics factory
// Build the statistics configuration around a custom tracker factory
StatisticsConfiguration statsConfig =
        new StatisticsConfiguration(new CustomStatisticsTrackerFactory());
statsConfig.setMetricPrefix("siddhi.trading.app");

// Register the configuration on a fresh SiddhiManager
SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.setStatisticsConfiguration(statsConfig);

// All applications created will use this configuration
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(tradingApp);
runtime.enableStats(Level.BASIC);

Interface for statistics management providing centralized control over metrics collection.
/**
 * Central control point for metrics collection: reporting lifecycle,
 * reporter registration, and configuration.
 *
 * NOTE(review): this document declares {@code StatisticsManager} a second time
 * near its end with a different member set (registerTracker / unregisterTracker /
 * getAllMetrics). Confirm against the library which declaration is accurate.
 */
public interface StatisticsManager {
    // Statistics lifecycle

    /** Starts metric reporting. */
    void startReporting();

    /** Stops metric reporting. */
    void stopReporting();

    // Metric registration

    /** Adds a reporter to this manager. */
    void registerStatisticsReporter(StatisticsReporter reporter);

    /** Removes a previously added reporter. */
    void unregisterStatisticsReporter(StatisticsReporter reporter);

    // Configuration

    /** Supplies the statistics configuration this manager should use. */
    void setStatisticsConfiguration(StatisticsConfiguration configuration);
}

Factory for creating statistics trackers for different types of metrics.
/**
 * Creates one tracker per metric type. Every factory method receives the
 * metric name and the {@code StatisticsManager} the tracker is created for.
 */
public interface StatisticsTrackerFactory {
    // Tracker creation

    /** Creates a latency tracker registered under {@code name}. */
    LatencyTracker createLatencyTracker(String name, StatisticsManager statisticsManager);

    /** Creates a throughput tracker registered under {@code name}. */
    ThroughputTracker createThroughputTracker(String name, StatisticsManager statisticsManager);

    /** Creates a memory-usage tracker registered under {@code name}. */
    MemoryUsageTracker createMemoryUsageTracker(String name, StatisticsManager statisticsManager);

    /** Creates a buffered-events tracker registered under {@code name}. */
    BufferedEventsTracker createBufferedEventsTracker(String name, StatisticsManager statisticsManager);
}

Interface for latency tracking measuring processing time and response times.
/**
 * Measures processing latency. Callers bracket the measured work with
 * {@code markIn()} before it and {@code markOut()} after it.
 */
public interface LatencyTracker {
    // Latency measurement

    /** Marks the start of a measured section. */
    void markIn();

    /** Marks the end of a measured section. */
    void markOut();

    // Batch latency tracking

    /** Batch variant of {@link #markIn()}; {@code count} is presumably the number of events in the batch (confirm). */
    void markIn(long count);

    /** Batch variant of {@link #markOut()}; {@code count} is presumably the number of events in the batch (confirm). */
    void markOut(long count);

    // Statistics retrieval

    /** Returns the average latency; the examples in this document print it as milliseconds. */
    double getAverageLatency();

    /** Returns the maximum latency (unit presumably the same as the average; confirm). */
    double getMaxLatency();

    /** Returns the minimum latency (unit presumably the same as the average; confirm). */
    double getMinLatency();
}

Interface for throughput tracking measuring events processed per unit time.
/**
 * Counts incoming events and exposes throughput figures derived from them.
 */
public interface ThroughputTracker {
    // Event counting

    /** Records a single event. */
    void eventIn();

    /** Records {@code count} events at once; the examples pass the length of a received event array. */
    void eventIn(long count);

    // Throughput measurement

    /** Returns the current throughput; the examples in this document print it as events per second. */
    double getThroughput();

    /** Returns the total number of events recorded. */
    long getTotalEvents();

    // Time-based statistics

    /** Returns the throughput restricted to the most recent {@code seconds} seconds. */
    double getThroughputForLastNSeconds(int seconds);
}

// Custom statistics tracking implementation
/**
 * Example tracker factory that publishes Siddhi latency metrics to Micrometer.
 *
 * NOTE(review): this example is abbreviated. {@code CustomThroughputTracker},
 * the memory/buffer factory methods, and the remaining {@code LatencyTracker}
 * members are not shown here and must be supplied for the class to compile
 * against the interfaces listed in this document.
 */
public class CustomStatisticsTrackerFactory implements StatisticsTrackerFactory {
    // Registry that receives every meter created by this factory
    private final MeterRegistry meterRegistry;

    /** Creates a factory that publishes to Micrometer's global registry. */
    public CustomStatisticsTrackerFactory() {
        this(io.micrometer.core.instrument.Metrics.globalRegistry);
    }

    /**
     * Creates a factory that publishes to the given registry.
     *
     * @param meterRegistry registry in which timers are registered
     */
    public CustomStatisticsTrackerFactory(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Override
    public LatencyTracker createLatencyTracker(String name, StatisticsManager manager) {
        return new CustomLatencyTracker(name);
    }

    @Override
    public ThroughputTracker createThroughputTracker(String name, StatisticsManager manager) {
        return new CustomThroughputTracker(name);
    }

    // Custom latency tracker with Micrometer integration
    private class CustomLatencyTracker implements LatencyTracker {
        private final Timer timer;
        // Sample opened by the most recent markIn(); null when no section is open.
        // A single field means overlapping sections from several threads are not
        // tracked independently.
        private Timer.Sample sample;

        public CustomLatencyTracker(String name) {
            this.timer = Timer.builder(name + ".latency")
                    .description("Processing latency")
                    .register(meterRegistry);
        }

        /** Opens a measured section by starting a new timer sample. */
        @Override
        public void markIn() {
            this.sample = Timer.start(meterRegistry);
        }

        /**
         * Closes the open section, if any, and records its duration.
         * The sample is cleared first so that an unmatched second call records nothing.
         */
        @Override
        public void markOut() {
            Timer.Sample started = this.sample;
            if (started != null) {
                this.sample = null;
                started.stop(timer);
            }
        }

        /** Returns the mean recorded latency in milliseconds. */
        @Override
        public double getAverageLatency() {
            return timer.mean(TimeUnit.MILLISECONDS);
        }
    }
}

Interface for memory usage tracking monitoring resource consumption.
/**
 * Tracks memory consumed by registered objects and exposes usage figures.
 */
public interface MemoryUsageTracker {
    // Memory measurement

    /** Starts tracking {@code object} with the given size (unit presumably bytes; confirm). */
    void registerObject(Object object, long size);

    /** Stops tracking {@code object}. */
    void unregisterObject(Object object);

    // Usage statistics

    /** Returns the current memory usage; the examples in this document format it as a byte count. */
    long getCurrentMemoryUsage();

    /** Returns the maximum memory usage (unit presumably the same as the current usage; confirm). */
    long getMaxMemoryUsage();

    // Reporting

    /** Reports the tracked memory usage. */
    void reportMemoryUsage();
}

Interface for buffered events tracking monitoring queue sizes and processing backlogs.
/**
 * Tracks how many events are currently buffered, for monitoring queue sizes
 * and processing backlogs.
 */
public interface BufferedEventsTracker {
    // Buffer monitoring

    /** Records that one event entered the buffer. */
    void eventBuffered();

    /** Records that one event left the buffer. */
    void eventRemoved();

    /** Records that {@code count} events entered the buffer. */
    void eventBuffered(long count);

    /** Records that {@code count} events left the buffer. */
    void eventRemoved(long count);

    // Buffer statistics

    /** Returns the current number of buffered events. */
    long getCurrentBufferSize();

    /** Returns the maximum buffer size. */
    long getMaxBufferSize();

    /** Returns the average buffer size. */
    double getAverageBufferSize();
}

Interface for memory calculation capability enabling objects to report their memory usage.
/**
 * Implemented by objects that can report their own memory usage.
 */
public interface MemoryCalculable {
    /** Returns this object's memory usage; the examples in this document treat the value as bytes. */
    long getSize();
}

// Complete monitoring setup with multiple trackers
/**
 * Example monitor that creates one tracker per metric type and prints their
 * values every ten seconds once {@link #startMonitoring()} has been called.
 * Call {@link #stopMonitoring()} to release the reporting thread.
 *
 * Instances are not designed for concurrent start/stop calls.
 */
public class ComprehensiveMonitoring {
    private final SiddhiAppRuntime runtime;
    private final LatencyTracker processingLatency;
    private final ThroughputTracker inputThroughput;
    private final MemoryUsageTracker memoryTracker;
    private final BufferedEventsTracker bufferTracker;
    // Reporting thread pool; non-null only between startMonitoring() and stopMonitoring()
    private ScheduledExecutorService scheduler;

    /**
     * @param runtime runtime whose statistics are enabled and reported
     * @param factory factory used to create the four trackers
     */
    public ComprehensiveMonitoring(SiddhiAppRuntime runtime, StatisticsTrackerFactory factory) {
        this.runtime = runtime;
        // Create trackers for different metrics
        // NOTE(review): getStatisticsManager is not defined in this example; supply it.
        StatisticsManager statsManager = getStatisticsManager(runtime);
        this.processingLatency = factory.createLatencyTracker("processing", statsManager);
        this.inputThroughput = factory.createThroughputTracker("input", statsManager);
        this.memoryTracker = factory.createMemoryUsageTracker("memory", statsManager);
        this.bufferTracker = factory.createBufferedEventsTracker("buffer", statsManager);
    }

    /**
     * Enables detailed statistics and starts periodic reporting.
     * Calling this while reporting is already running has no effect, so
     * repeated calls cannot pile up reporting threads.
     */
    public void startMonitoring() {
        if (scheduler != null) {
            return;
        }
        // Enable detailed statistics
        runtime.enableStats(Level.DETAIL);
        // Start periodic reporting
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(this::reportMetricsSafely, 10, 10, TimeUnit.SECONDS);
    }

    /** Stops periodic reporting and releases the reporting thread; safe to call when not started. */
    public void stopMonitoring() {
        if (scheduler == null) {
            return;
        }
        scheduler.shutdownNow();
        scheduler = null;
    }

    // scheduleAtFixedRate cancels all future runs once a run throws, so a
    // failing tracker must not be allowed to end reporting silently.
    private void reportMetricsSafely() {
        try {
            reportMetrics();
        } catch (RuntimeException e) {
            System.err.println("Metrics report failed: " + e);
        }
    }

    private void reportMetrics() {
        System.out.println("=== Siddhi App Metrics ===");
        System.out.println("Processing Latency: " + processingLatency.getAverageLatency() + " ms");
        System.out.println("Input Throughput: " + inputThroughput.getThroughput() + " events/sec");
        System.out.println("Memory Usage: " + formatBytes(memoryTracker.getCurrentMemoryUsage()));
        System.out.println("Buffer Size: " + bufferTracker.getCurrentBufferSize() + " events");
        System.out.println("========================");
    }

    // Renders a byte count as mebibytes with two decimals
    private String formatBytes(long bytes) {
        return String.format("%.2f MB", bytes / (1024.0 * 1024.0));
    }
}

// Performance monitoring with alerting
/**
 * Example stream callback that records throughput and latency for every
 * received batch and prints an alert when either crosses its threshold.
 */
public class PerformanceMonitor extends StreamCallback {
    private final ThroughputTracker throughputTracker;
    private final LatencyTracker latencyTracker;
    private final double throughputThreshold = 1000.0; // events/sec
    private final double latencyThreshold = 100.0; // milliseconds

    /**
     * @param throughputTracker tracker that counts received events
     * @param latencyTracker tracker that times the processing of each batch
     */
    public PerformanceMonitor(ThroughputTracker throughputTracker, LatencyTracker latencyTracker) {
        this.throughputTracker = throughputTracker;
        this.latencyTracker = latencyTracker;
    }

    /**
     * Counts the batch, times its processing, then checks both thresholds.
     *
     * @param events batch delivered by the stream
     */
    @Override
    public void receive(Event[] events) {
        // Track throughput
        throughputTracker.eventIn(events.length);
        // Measure processing latency; markOut runs even if processing throws
        // so the tracker is never left with an open section
        latencyTracker.markIn();
        try {
            // NOTE(review): processEvents is not defined in this example; supply it.
            processEvents(events);
        } finally {
            latencyTracker.markOut();
        }
        // Check performance thresholds
        checkPerformanceThresholds();
    }

    // Alerts when throughput is below, or average latency above, its threshold
    private void checkPerformanceThresholds() {
        double currentThroughput = throughputTracker.getThroughput();
        double currentLatency = latencyTracker.getAverageLatency();
        if (currentThroughput < throughputThreshold) {
            alertLowThroughput(currentThroughput);
        }
        if (currentLatency > latencyThreshold) {
            alertHighLatency(currentLatency);
        }
    }

    private void alertLowThroughput(double throughput) {
        System.err.println("ALERT: Low throughput detected: " + throughput + " events/sec");
        // Send alert to monitoring system
    }

    private void alertHighLatency(double latency) {
        System.err.println("ALERT: High latency detected: " + latency + " ms");
        // Send alert to monitoring system
    }
}

// Memory usage monitoring for Siddhi components
/**
 * Example monitor that sums the memory reported by the runtime's tables and
 * enables purging when the total exceeds a configured limit.
 */
public class MemoryMonitor {
    private final SiddhiAppRuntime runtime;
    // Upper bound for total table memory, in the unit returned by MemoryCalculable.getSize()
    private final long memoryLimit;

    /**
     * @param runtime runtime whose tables are inspected
     * @param memoryLimit total table memory above which an alert is raised
     */
    public MemoryMonitor(SiddhiAppRuntime runtime, long memoryLimit) {
        this.runtime = runtime;
        this.memoryLimit = memoryLimit;
    }

    /**
     * Prints the memory of every table that implements {@code MemoryCalculable},
     * prints the total, and raises an alert when the total exceeds the limit.
     * Tables that do not implement {@code MemoryCalculable} are skipped.
     */
    public void monitorMemoryUsage() {
        // Monitor table memory usage
        Collection<Table> tables = runtime.getTables();
        long totalTableMemory = 0;
        for (Table table : tables) {
            if (table instanceof MemoryCalculable) {
                long tableMemory = ((MemoryCalculable) table).getSize();
                totalTableMemory += tableMemory;
                System.out.println("Table memory: " + formatBytes(tableMemory));
            }
        }
        System.out.println("Total table memory: " + formatBytes(totalTableMemory));
        // Check memory usage against limits
        if (totalTableMemory > getMemoryLimit()) {
            triggerMemoryAlert(totalTableMemory);
        }
    }

    // Returns the limit supplied at construction time
    private long getMemoryLimit() {
        return memoryLimit;
    }

    private void triggerMemoryAlert(long memoryUsage) {
        System.err.println("MEMORY ALERT: Usage exceeds limit: " + formatBytes(memoryUsage));
        // Trigger data purging or scale-out
        triggerDataPurging();
    }

    private void triggerDataPurging() {
        // Enable purging to free memory
        runtime.setPurgingEnabled(true);
    }

    // Renders a byte count as mebibytes with two decimals
    private String formatBytes(long bytes) {
        return String.format("%.2f MB", bytes / (1024.0 * 1024.0));
    }
}

// Integration with Micrometer metrics registry
/**
 * Example tracker factory that backs Siddhi throughput tracking with
 * Micrometer meters registered in the supplied registry.
 *
 * NOTE(review): this example is abbreviated. The other factory methods and the
 * remaining {@code ThroughputTracker} members are not shown here.
 */
public class MicrometerStatisticsFactory implements StatisticsTrackerFactory {
    private final MeterRegistry meterRegistry;

    /**
     * @param meterRegistry registry in which all meters are registered
     */
    public MicrometerStatisticsFactory(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Override
    public ThroughputTracker createThroughputTracker(String name, StatisticsManager manager) {
        return new MicrometerThroughputTracker(name, meterRegistry);
    }

    private static class MicrometerThroughputTracker implements ThroughputTracker {
        private final Counter counter;
        private final Gauge throughputGauge;
        // Creation time of this tracker; throughput is averaged over the time since then
        private final long startNanos = System.nanoTime();

        public MicrometerThroughputTracker(String name, MeterRegistry registry) {
            this.counter = Counter.builder(name + ".events")
                    .description("Total events processed")
                    .register(registry);
            // The observed object and value function are arguments of builder();
            // register() takes only the registry.
            this.throughputGauge = Gauge
                    .builder(name + ".throughput", this, MicrometerThroughputTracker::calculateThroughput)
                    .description("Events per second")
                    .register(registry);
        }

        @Override
        public void eventIn() {
            counter.increment();
        }

        /** Records {@code count} events at once. */
        @Override
        public void eventIn(long count) {
            counter.increment(count);
        }

        @Override
        public double getThroughput() {
            return calculateThroughput();
        }

        /**
         * Returns events per second averaged over this tracker's lifetime,
         * or 0.0 when no time has elapsed yet (avoids NaN/Infinity).
         */
        private double calculateThroughput() {
            double elapsedSeconds = (System.nanoTime() - startNanos) / 1_000_000_000.0;
            if (elapsedSeconds <= 0.0) {
                return 0.0;
            }
            return counter.count() / elapsedSeconds;
        }
    }
}

/**
 * Sink for collected metrics with a start/stop lifecycle.
 */
public interface StatisticsReporter {
    /** Publishes the given metrics, keyed by metric name (key meaning presumed; confirm). */
    void report(Map<String, Object> metrics);

    /** Starts the reporter. */
    void start();

    /** Stops the reporter. */
    void stop();
}
/**
 * Registry view of the statistics manager: trackers are registered by name and
 * all metrics can be retrieved as a map.
 *
 * NOTE(review): {@code StatisticsManager} is also declared earlier in this
 * document with a different member set (startReporting / stopReporting /
 * reporter registration / setStatisticsConfiguration). Confirm against the
 * library which declaration is accurate.
 */
public interface StatisticsManager {
    /** Registers {@code tracker} under {@code name}. */
    void registerTracker(String name, Object tracker);

    /** Removes the tracker registered under {@code name}. */
    void unregisterTracker(String name);

    /** Returns all metrics as a map (presumably keyed by metric name; confirm). */
    Map<String, Object> getAllMetrics();
}

Install with Tessl CLI
npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core