Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.
npx @tessl/cli install tessl/maven-org-wso2-siddhi--siddhi-core@4.5.0

Siddhi Core is a high-performance Complex Event Processing (CEP) engine, packaged as a Java library, that provides comprehensive stream processing capabilities. It enables real-time processing of data streams, detection of complex conditions through Streaming SQL, and triggering of responsive actions. Built for high-performance scenarios, Siddhi Core can process 300,000+ events per second and is designed to be lightweight (<2MB) and embeddable across diverse platforms, from enterprise systems to Android and Raspberry Pi devices.

<dependency>
    <groupId>org.wso2.siddhi</groupId>
    <artifactId>siddhi-core</artifactId>
    <version>4.5.11</version>
</dependency>

import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.config.SiddhiContext;

// Create Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();

// Create Siddhi App with streaming SQL
String siddhiApp = "define stream StockStream (symbol string, price float, volume long); " +
        "from StockStream[price > 100] select symbol, price insert into HighPriceStocks;";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

// Add callback to receive results
siddhiAppRuntime.addCallback("HighPriceStocks", new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        for (Event event : events) {
            System.out.println("High price stock: " + event);
        }
    }
});

// Get input handler and start runtime
InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream");
siddhiAppRuntime.start();

// Send events
stockStream.send(new Object[]{"IBM", 150.0f, 100L});
stockStream.send(new Object[]{"MSFT", 75.0f, 200L});

// Shutdown
siddhiAppRuntime.shutdown();
siddhiManager.shutdown();

Siddhi Core follows a modular architecture with these key components:
Primary interfaces for creating and managing Siddhi applications with full lifecycle control.
public class SiddhiManager {
    public SiddhiManager();
    public SiddhiAppRuntime createSiddhiAppRuntime(String siddhiApp);
    public SiddhiAppRuntime getSiddhiAppRuntime(String siddhiAppName);
    public void shutdown();
}

public class SiddhiAppRuntime {
    public String getName();
    public void start();
    public void shutdown();
    public InputHandler getInputHandler(String streamId);
}

Event classes and input mechanisms for processing streaming data with high-performance capabilities.
public class Event {
    public Event(long timestamp, Object[] data);
    public long getTimestamp();
    public Object[] getData();
}

public class InputHandler {
    public void send(Object[] data);
    public void send(long timestamp, Object[] data);
    public void send(Event event);
}

Query processing and callback mechanisms for receiving processed results from streaming SQL operations.
public abstract class StreamCallback {
    public abstract void receive(Event[] events);
    public void setStreamId(String streamId);
}

public abstract class QueryCallback {
    public abstract void receive(long timestamp, Event[] inEvents, Event[] removeEvents);
}

Incremental aggregation processing with support for different time durations and distributed processing.
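
The general idea of keeping aggregates per time duration and purging old fine-grained data can be sketched in plain Java. This is only an illustration under simplifying assumptions: `IncrementalSumSketch` and its methods are hypothetical names, it tracks a single sum at two fixed granularities (second and minute), and it does not reflect how Siddhi's `AggregationRuntime` is actually implemented.

```java
import java.util.TreeMap;

// Hypothetical illustration only; this is not Siddhi's AggregationRuntime.
class IncrementalSumSketch {
    // Bucket start time (epoch millis) -> running sum, one map per duration.
    private final TreeMap<Long, Double> perSecond = new TreeMap<>();
    private final TreeMap<Long, Double> perMinute = new TreeMap<>();

    // Fold one event into the bucket it belongs to at each granularity.
    void add(long timestampMillis, double value) {
        perSecond.merge(timestampMillis / 1_000L * 1_000L, value, Double::sum);
        perMinute.merge(timestampMillis / 60_000L * 60_000L, value, Double::sum);
    }

    // Sum for the one-second bucket containing the given timestamp.
    double secondSum(long timestampMillis) {
        return perSecond.getOrDefault(timestampMillis / 1_000L * 1_000L, 0.0);
    }

    // Sum for the one-minute bucket containing the given timestamp.
    double minuteSum(long timestampMillis) {
        return perMinute.getOrDefault(timestampMillis / 60_000L * 60_000L, 0.0);
    }

    // Drop per-second buckets that start before the cutoff; minute sums stay intact.
    void purgeSecondsBefore(long cutoffMillis) {
        perSecond.headMap(cutoffMillis).clear();
    }

    public static void main(String[] args) {
        IncrementalSumSketch agg = new IncrementalSumSketch();
        agg.add(1_000L, 10.0);
        agg.add(1_500L, 5.0);
        agg.add(61_000L, 2.0);
        System.out.println(agg.secondSum(1_200L));
        System.out.println(agg.minuteSum(0L));
    }
}
```

Because every event is folded into its buckets on arrival, a query over a minute never has to rescan raw events, and fine-grained buckets can be purged without losing the coarser totals.
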
public class AggregationRuntime {
    // Manages incremental aggregations across time durations
    // Supports distributed aggregation processing
    // Provides incremental data purging capabilities
}

State management and persistence capabilities for fault tolerance and recovery scenarios.
public interface PersistenceStore {
    void save(String siddhiAppName, String revision, byte[] snapshot);
    byte[] load(String siddhiAppName, String revision);
}
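
To make the save/load contract above concrete, here is a minimal in-memory sketch. It is written against a local stand-in interface, `SnapshotStore`, that mirrors only the two methods listed here; the real `PersistenceStore` interface may declare more, so treat `SnapshotStore`, `InMemorySnapshotStore`, and `SnapshotStoreSketch` as hypothetical names rather than Siddhi classes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Local stand-in mirroring the two methods listed above; the real
// PersistenceStore interface may declare additional methods.
interface SnapshotStore {
    void save(String siddhiAppName, String revision, byte[] snapshot);
    byte[] load(String siddhiAppName, String revision);
}

// Hypothetical store that keeps every revision of every app in memory.
class InMemorySnapshotStore implements SnapshotStore {
    // App name -> (revision -> snapshot bytes).
    private final Map<String, Map<String, byte[]>> snapshots = new HashMap<>();

    @Override
    public synchronized void save(String siddhiAppName, String revision, byte[] snapshot) {
        // Copy the array so later changes by the caller cannot alter stored state.
        snapshots.computeIfAbsent(siddhiAppName, k -> new HashMap<>())
                 .put(revision, snapshot.clone());
    }

    @Override
    public synchronized byte[] load(String siddhiAppName, String revision) {
        Map<String, byte[]> revisions = snapshots.get(siddhiAppName);
        byte[] stored = (revisions == null) ? null : revisions.get(revision);
        // Return null when the app or revision is unknown.
        return (stored == null) ? null : stored.clone();
    }
}

class SnapshotStoreSketch {
    // Saves a snapshot and checks that loading the same revision returns equal bytes.
    static boolean roundTrips() {
        SnapshotStore store = new InMemorySnapshotStore();
        byte[] state = {1, 2, 3};
        store.save("StockApp", "rev-1", state);
        return Arrays.equals(state, store.load("StockApp", "rev-1"));
    }

    public static void main(String[] args) {
        System.out.println(roundTrips());
    }
}
```

An in-memory store like this loses its contents when the JVM exits, so it only suits tests; recovery after a crash needs a store backed by durable storage.
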
public class SiddhiAppRuntime {
    public PersistenceReference persist();
    public byte[] snapshot();
    public void restore(byte[] snapshot);
}

Extension points for creating custom sources, sinks, functions, and processors to extend Siddhi capabilities.
public interface Source {
    // Interface for creating custom input sources
}

public interface Sink {
    // Interface for creating custom output sinks
}

public interface FunctionExecutor {
    // Interface for custom functions
}

Monitoring and statistics interfaces for tracking performance, throughput, and resource usage.
public enum Level {
    OFF, BASIC, DETAIL
}

public class SiddhiAppRuntime {
    public Level getRootMetricsLevel();
    public void enableStats(Level level);
}

Comprehensive exception handling for various error scenarios in stream processing operations.
public class SiddhiAppCreationException extends SiddhiException {
    // Thrown during Siddhi app creation errors
}

public class SiddhiAppRuntimeException extends SiddhiException {
    // Runtime errors in Siddhi app execution
}

public interface ComplexEvent {
    // Interface for complex events used internally
}

public class ComplexEventChunk<T extends ComplexEvent> {
    // Container for chaining complex events together
}

public interface Processor {
    void process(ComplexEventChunk complexEventChunk);
    Processor getNextProcessor();
    void setNextProcessor(Processor processor);
}
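
The three `Processor` methods above suggest a chain in which each stage handles a chunk of events and hands the result to the next stage. The sketch below models that pattern in plain Java so it runs without Siddhi on the classpath. `Stage`, `AbstractStage`, `FilterStage`, `CollectStage`, and `ProcessorChainSketch` are hypothetical stand-ins, and a `List<Object[]>` takes the place of `ComplexEventChunk`; none of this is Siddhi's internal implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Stand-in for the Processor interface above; NOT Siddhi's own type.
interface Stage {
    void process(List<Object[]> chunk);
    Stage getNextProcessor();
    void setNextProcessor(Stage next);
}

// Shared link handling for the hypothetical stages below.
abstract class AbstractStage implements Stage {
    private Stage next;
    public Stage getNextProcessor() { return next; }
    public void setNextProcessor(Stage next) { this.next = next; }
}

// Keeps only rows matching the condition, then forwards the survivors.
class FilterStage extends AbstractStage {
    private final Predicate<Object[]> condition;
    FilterStage(Predicate<Object[]> condition) { this.condition = condition; }

    public void process(List<Object[]> chunk) {
        List<Object[]> kept = new ArrayList<>();
        for (Object[] row : chunk) {
            if (condition.test(row)) {
                kept.add(row);
            }
        }
        if (!kept.isEmpty() && getNextProcessor() != null) {
            getNextProcessor().process(kept);
        }
    }
}

// Terminal stage: records whatever reaches the end of the chain.
class CollectStage extends AbstractStage {
    final List<Object[]> received = new ArrayList<>();
    public void process(List<Object[]> chunk) { received.addAll(chunk); }
}

class ProcessorChainSketch {
    // Wires filter -> collect and returns the symbols whose price exceeds 100.
    static List<String> run() {
        FilterStage filter = new FilterStage(row -> (Float) row[1] > 100f);
        CollectStage sink = new CollectStage();
        filter.setNextProcessor(sink);

        List<Object[]> chunk = new ArrayList<>();
        chunk.add(new Object[]{"IBM", 150.0f, 100L});
        chunk.add(new Object[]{"MSFT", 75.0f, 200L});
        filter.process(chunk);

        List<String> symbols = new ArrayList<>();
        for (Object[] row : sink.received) {
            symbols.add((String) row[0]);
        }
        return symbols;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [IBM]
    }
}
```

The sample rows match the `StockStream` events from the quick start, so the chain behaves like the `price > 100` filter query: only the IBM row reaches the terminal stage.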