Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.
Event handling provides the core data structures and input mechanisms for processing streaming data in Siddhi. This includes event classes for external interaction, internal event processing structures, and input handlers for programmatic event injection.
The main event class, used outside Siddhi to represent data. It is the primary interface for sending data to and receiving data from Siddhi.
// API summary of the Event class (signatures only; method bodies are omitted in this listing).
// Event is the class application code uses to hand data to Siddhi and to read data back.
public class Event {
// Constructors
// No-argument form; the usage example fills it in afterwards with setTimestamp(...) and setData(...).
public Event();
// NOTE(review): dataSize presumably pre-sizes the data array — confirm against the implementation.
public Event(int dataSize);
// Builds an event from a timestamp and its data values in one step.
public Event(long timestamp, Object[] data);
// Core Properties
// Timestamp accessors; the usage examples populate the timestamp from System.currentTimeMillis().
public long getTimestamp();
public void setTimestamp(long timestamp);
// Whole-array accessors for the event's data values.
public Object[] getData();
public void setData(Object[] data);
// Single-value accessor by position; it returns Object, so callers cast to the expected type.
public Object getData(int i);
// State Management
public boolean isExpired();
// Takes a boxed Boolean although isExpired() returns a primitive boolean.
// NOTE(review): passing null is presumably unsupported — confirm.
public void setIsExpired(Boolean isExpired);
// Utility Methods
// copyFrom is overloaded for both the external Event type and the internal ComplexEvent type.
public void copyFrom(Event event);
public void copyFrom(ComplexEvent complexEvent);
public String toString();
public boolean equals(Object o);
public int hashCode();
}
// Create events with different constructors
Event event1 = new Event();
event1.setTimestamp(System.currentTimeMillis());
event1.setData(new Object[]{"IBM", 150.0, 1000L});
Event event2 = new Event(System.currentTimeMillis(), new Object[]{"MSFT", 120.0, 500L});
// Access event data
long timestamp = event2.getTimestamp();
Object[] data = event2.getData();
String symbol = (String) event2.getData(0);
Double price = (Double) event2.getData(1);
// Copy events
Event eventCopy = new Event();
eventCopy.copyFrom(event2);
// Check expiration status
if (!event2.isExpired()) {
// Process active event
}
Interface for complex events used internally within Siddhi processing. These events support advanced features like event chaining and state management.
public interface ComplexEvent {
// Internal interface for complex event processing
// Extended by internal event implementations
}
Container for chaining complex events together for efficient processing within the Siddhi execution pipeline.
public class ComplexEventChunk<T extends ComplexEvent> {
// Container for chaining complex events
// Provides efficient iteration and batch processing
// Used internally by Siddhi processors
}
Internal stream event implementation of the ComplexEvent interface, used for stream processing operations.
public class StreamEvent implements ComplexEvent {
// Internal stream event implementation
// Used for stream processing operations
// Extends ComplexEvent with stream-specific functionality
}
Event for maintaining state in pattern queries and joins, providing correlation between multiple streams.
public class StateEvent implements ComplexEvent {
// Event for maintaining state in pattern queries
// Supports joins and correlations between streams
// Used in complex event processing scenarios
}
Entry point for injecting events into Siddhi streams programmatically. Provides various methods for sending events with different data formats and timing options.
// API summary of the InputHandler class (signatures only; method bodies are omitted in this listing).
// An InputHandler is the object application code uses to push events into one Siddhi stream.
public class InputHandler {
// Constructor. The usage example does not call it directly; it obtains an
// instance through siddhiAppRuntime.getInputHandler(...) instead.
public InputHandler(String streamId, int streamIndex, InputProcessor inputProcessor, SiddhiAppContext siddhiAppContext);
// Stream Information
public String getStreamId();
// Event Sending Methods
// NOTE(review): no checked exceptions are declared in this listing — confirm against the actual class.
// Sends one event from its data values only; the usage example describes this form as using the current timestamp.
public void send(Object[] data);
// Sends one event with an explicitly supplied timestamp.
public void send(long timestamp, Object[] data);
// Sends an already constructed Event.
public void send(Event event);
// Sends several events in a single call.
public void send(Event[] events);
}
// Obtain input handler from SiddhiAppRuntime
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("StockStream");
// Send event with current timestamp
inputHandler.send(new Object[]{"IBM", 150.0f, 1000L});
// Send event with specific timestamp
long customTimestamp = System.currentTimeMillis() - 1000; // 1 second ago
inputHandler.send(customTimestamp, new Object[]{"MSFT", 120.0f, 500L});
// Send Event object
Event event = new Event(System.currentTimeMillis(), new Object[]{"GOOGL", 2500.0f, 200L});
inputHandler.send(event);
// Send batch of events
Event[] events = {
new Event(System.currentTimeMillis(), new Object[]{"AAPL", 180.0f, 300L}),
new Event(System.currentTimeMillis(), new Object[]{"TSLA", 800.0f, 150L})
};
inputHandler.send(events);
Management of input streams and coordination of input processing across multiple streams.
public class InputManager {
// Manages input streams
// Coordinates input processing
// Handles stream routing and distribution
}
Core stream routing and event distribution component that manages the flow of events between different processing components.
// Placeholder listing for StreamJunction: no members are shown here, only a
// description of the component's role.
public class StreamJunction {
// Core stream routing component
// Manages event distribution
// Connects input handlers to processors
}
// Batch processing example
// Build a batch of events and send it with a single call.
Event[] batchEvents = new Event[100];
// The loop bound follows the array length so the batch size is stated only once.
for (int i = 0; i < batchEvents.length; i++) {
    batchEvents[i] = new Event(System.currentTimeMillis(), generateData());
}
inputHandler.send(batchEvents);
// Timestamped event processing
long baseTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
    // Send events with incremental timestamps for replay scenarios.
    // The offset is computed in long arithmetic, and the price is sent as a
    // float to match the other inputHandler.send(...) examples, which all
    // pass float prices.
    inputHandler.send(baseTime + (i * 1000L), new Object[]{"SYM" + i, 100.0f + i, 100L});
}
// Event state management
// timestamp and data are assumed to be defined by the surrounding code.
Event event = new Event(timestamp, data);
if (shouldExpire(event)) {
    event.setIsExpired(true);
}
// Efficient event creation for high-throughput scenarios
/**
 * Simple object pool that hands out reusable {@link Event} instances.
 *
 * <p>Idle events are held in a {@link ConcurrentLinkedQueue}; this class does
 * not enforce any upper bound on how many events the pool may hold.
 */
public class EventPool {
    private final Queue<Event> pool = new ConcurrentLinkedQueue<>();

    /**
     * Takes an event from the pool, or creates a new one when the pool is empty.
     *
     * @return a pooled event if one is available, otherwise a newly constructed one
     */
    public Event getEvent() {
        final Event recycled = pool.poll();
        return recycled != null ? recycled : new Event();
    }

    /**
     * Clears the given event and puts it back into the pool for later reuse.
     *
     * @param event the event to recycle; its timestamp, data and expired flag are reset
     */
    public void returnEvent(Event event) {
        // Wipe the previous contents before the instance becomes available again.
        event.setTimestamp(0);
        event.setData(null);
        event.setIsExpired(false);
        pool.offer(event);
    }
}
// High-performance event sending
final InputHandler highVolumeHandler = siddhiAppRuntime.getInputHandler("HighVolumeStream");
// A single three-slot array is refilled for every record instead of
// allocating a new array per event.
// NOTE(review): this is only safe if send() does not keep a reference to the
// array after it returns — confirm before copying this pattern.
final Object[] reusableRow = new Object[3];
for (MarketData tick : marketDataStream) {
    reusableRow[0] = tick.getSymbol();
    reusableRow[1] = tick.getPrice();
    reusableRow[2] = tick.getVolume();
    highVolumeHandler.send(tick.getTimestamp(), reusableRow);
}

/**
 * Interface for processing input events. The InputHandler constructor takes
 * an instance of this type.
 */
public interface InputProcessor {
    /**
     * Accepts a single event.
     *
     * @param event the event to process
     * @param streamIndex presumably identifies the target stream — confirm
     */
    void send(Event event, int streamIndex);

    /**
     * Accepts a batch of events.
     *
     * @param events the events to process
     * @param streamIndex presumably identifies the target stream — confirm
     */
    void send(Event[] events, int streamIndex);
}
public interface SiddhiAppContext {
// Application context for Siddhi runtime
// Provides access to configuration and resources
}
Install with Tessl CLI
npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core