CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-wso2-siddhi--siddhi-core

Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.

Overview
Eval results
Files

event-handling.mddocs/

Event Handling

Event handling provides the core data structures and input mechanisms for processing streaming data in Siddhi. This includes event classes for external interaction, internal event processing structures, and input handlers for programmatic event injection.

Event Classes

Event

Main event class used external to Siddhi for data representation. This is the primary interface for sending and receiving data.

public class Event {
    // Constructors
    public Event();
    public Event(int dataSize);
    public Event(long timestamp, Object[] data);
    
    // Core Properties
    public long getTimestamp();
    public void setTimestamp(long timestamp);
    public Object[] getData();
    public void setData(Object[] data);
    public Object getData(int i);
    
    // State Management
    public boolean isExpired();
    public void setIsExpired(Boolean isExpired);
    
    // Utility Methods
    public void copyFrom(Event event);
    public void copyFrom(ComplexEvent complexEvent);
    public String toString();
    public boolean equals(Object o);
    public int hashCode();
}

Usage Example

// Create events with different constructors
Event event1 = new Event();
event1.setTimestamp(System.currentTimeMillis());
event1.setData(new Object[]{"IBM", 150.0, 1000L});

Event event2 = new Event(System.currentTimeMillis(), new Object[]{"MSFT", 120.0, 500L});

// Access event data
long timestamp = event2.getTimestamp();
Object[] data = event2.getData();
String symbol = (String) event2.getData(0);
Double price = (Double) event2.getData(1);

// Copy events
Event eventCopy = new Event();
eventCopy.copyFrom(event2);

// Check expiration status
if (!event2.isExpired()) {
    // Process active event
}

Internal Event Structures

ComplexEvent

Interface for complex events used internally within Siddhi processing. These events support advanced features like event chaining and state management.

public interface ComplexEvent {
    // Internal interface for complex event processing
    // Extended by internal event implementations
}

ComplexEventChunk

Container for chaining complex events together for efficient processing within the Siddhi execution pipeline.

public class ComplexEventChunk<T extends ComplexEvent> {
    // Container for chaining complex events
    // Provides efficient iteration and batch processing
    // Used internally by Siddhi processors
}

StreamEvent

Internal stream event implementation extending ComplexEvent, used for stream processing operations.

public class StreamEvent implements ComplexEvent {
    // Internal stream event implementation
    // Used for stream processing operations
    // Extends ComplexEvent with stream-specific functionality
}

StateEvent

Event for maintaining state in pattern queries and joins, providing correlation between multiple streams.

public class StateEvent implements ComplexEvent {
    // Event for maintaining state in pattern queries
    // Supports joins and correlations between streams
    // Used in complex event processing scenarios
}

Input Handling

InputHandler

Entry point for injecting events into Siddhi streams programmatically. Provides various methods for sending events with different data formats and timing options.

public class InputHandler {
    // Constructor (typically obtained from SiddhiAppRuntime)
    public InputHandler(String streamId, int streamIndex, InputProcessor inputProcessor, SiddhiAppContext siddhiAppContext);
    
    // Stream Information
    public String getStreamId();
    
    // Event Sending Methods
    public void send(Object[] data);
    public void send(long timestamp, Object[] data);
    public void send(Event event);
    public void send(Event[] events);
}

Usage Example

// Obtain input handler from SiddhiAppRuntime
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("StockStream");

// Send event with current timestamp
inputHandler.send(new Object[]{"IBM", 150.0f, 1000L});

// Send event with specific timestamp
long customTimestamp = System.currentTimeMillis() - 1000; // 1 second ago
inputHandler.send(customTimestamp, new Object[]{"MSFT", 120.0f, 500L});

// Send Event object
Event event = new Event(System.currentTimeMillis(), new Object[]{"GOOGL", 2500.0f, 200L});
inputHandler.send(event);

// Send batch of events
Event[] events = {
    new Event(System.currentTimeMillis(), new Object[]{"AAPL", 180.0f, 300L}),
    new Event(System.currentTimeMillis(), new Object[]{"TSLA", 800.0f, 150L})
};
inputHandler.send(events);

Stream Processing Components

InputManager

Management of input streams and coordination of input processing across multiple streams.

public class InputManager {
    // Manages input streams
    // Coordinates input processing
    // Handles stream routing and distribution
}

StreamJunction

Core stream routing and event distribution component that manages the flow of events between different processing components.

public class StreamJunction {
    // Core stream routing component
    // Manages event distribution
    // Connects input handlers to processors
}

Advanced Event Features

Event Processing Patterns

// Batch processing example
Event[] batchEvents = new Event[100];
for (int i = 0; i < 100; i++) {
    batchEvents[i] = new Event(System.currentTimeMillis(), generateData());
}
inputHandler.send(batchEvents);

// Timestamped event processing
long baseTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
    // Send events with incremental timestamps for replay scenarios
    inputHandler.send(baseTime + (i * 1000), new Object[]{"SYM" + i, 100.0 + i, 100L});
}

// Event state management
Event event = new Event(timestamp, data);
if (shouldExpire(event)) {
    event.setIsExpired(true);
}

Performance Considerations

// Efficient event creation for high-throughput scenarios
public class EventPool {
    private final Queue<Event> pool = new ConcurrentLinkedQueue<>();
    
    public Event getEvent() {
        Event event = pool.poll();
        if (event == null) {
            event = new Event();
        }
        return event;
    }
    
    public void returnEvent(Event event) {
        // Reset event state
        event.setTimestamp(0);
        event.setData(null);
        event.setIsExpired(false);
        pool.offer(event);
    }
}

// High-performance event sending
InputHandler handler = siddhiAppRuntime.getInputHandler("HighVolumeStream");
Object[] dataBuffer = new Object[3]; // Reuse data array
for (MarketData data : marketDataStream) {
    dataBuffer[0] = data.getSymbol();
    dataBuffer[1] = data.getPrice();
    dataBuffer[2] = data.getVolume();
    handler.send(data.getTimestamp(), dataBuffer);
}

Types

public interface InputProcessor {
    // Interface for processing input events
    void send(Event event, int streamIndex);
    void send(Event[] events, int streamIndex);
}

public interface SiddhiAppContext {
    // Application context for Siddhi runtime
    // Provides access to configuration and resources
}

Install with Tessl CLI

npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core

docs

aggregations.md

core-management.md

event-handling.md

exceptions.md

extensions.md

index.md

persistence.md

queries-and-callbacks.md

statistics.md

tile.json