# Event Handling

Siddhi's event handling layer provides the core data structures and input mechanisms for processing streaming data: event classes for exchanging data with external code, internal structures used during event processing, and input handlers for injecting events programmatically.

## Event Classes

### Event

The main event class, used outside Siddhi's internals to represent data. It is the primary type for sending data to and receiving data from Siddhi.

```java { .api }
public class Event {
    // Constructors
    public Event();
    public Event(int dataSize);
    public Event(long timestamp, Object[] data);

    // Core Properties
    public long getTimestamp();
    public void setTimestamp(long timestamp);
    public Object[] getData();
    public void setData(Object[] data);
    public Object getData(int i);

    // State Management
    public boolean isExpired();
    public void setIsExpired(Boolean isExpired);

    // Utility Methods
    public void copyFrom(Event event);
    public void copyFrom(ComplexEvent complexEvent);
    public String toString();
    public boolean equals(Object o);
    public int hashCode();
}
```

### Usage Example

```java
// Create events with different constructors
Event event1 = new Event();
event1.setTimestamp(System.currentTimeMillis());
event1.setData(new Object[]{"IBM", 150.0, 1000L});

Event event2 = new Event(System.currentTimeMillis(), new Object[]{"MSFT", 120.0, 500L});

// Access event data
long timestamp = event2.getTimestamp();
Object[] data = event2.getData();
String symbol = (String) event2.getData(0);
Double price = (Double) event2.getData(1);

// Copy events. The target is pre-sized here on the assumption that copyFrom
// fills the existing data array rather than allocating a new one.
Event eventCopy = new Event(event2.getData().length);
eventCopy.copyFrom(event2);

// Check expiration status
if (!event2.isExpired()) {
    // Process active event
}
```
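
Because `getData(int)` returns `Object`, the casts in the example above rely on each attribute's position and boxed type. One way to contain that is a small typed view built once per event. The sketch below is illustrative only: `SketchEvent` is a stand-in so the code compiles without Siddhi on the classpath, and `StockTick` with its (symbol, price, volume) layout is a hypothetical helper, not part of the Siddhi API.

```java
// Stand-in for Siddhi's Event (assumption: only timestamp and data matter here).
class SketchEvent {
    final long timestamp;
    final Object[] data;

    SketchEvent(long timestamp, Object[] data) {
        this.timestamp = timestamp;
        this.data = data;
    }
}

// Hypothetical typed view over a (symbol, price, volume) event layout.
final class StockTick {
    final String symbol;
    final double price;
    final long volume;

    private StockTick(String symbol, double price, long volume) {
        this.symbol = symbol;
        this.price = price;
        this.volume = volume;
    }

    // Validates the attribute count, then converts each position to its type.
    static StockTick from(SketchEvent event) {
        Object[] d = event.data;
        if (d == null || d.length != 3) {
            throw new IllegalArgumentException("expected 3 attributes");
        }
        // Going through Number accepts both Float and Double prices.
        return new StockTick((String) d[0],
                ((Number) d[1]).doubleValue(),
                ((Number) d[2]).longValue());
    }
}
```

Converting through `Number` lets the same helper accept the `Double` prices used here and the `Float` prices used in the `InputHandler` examples.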

## Internal Event Structures

### ComplexEvent

Interface for the events Siddhi uses internally during processing. Implementations support features such as event chaining and state management.

```java { .api }
public interface ComplexEvent {
    // Internal interface for complex event processing
    // Implemented by Siddhi's internal event classes
}
```

### ComplexEventChunk

Container that chains complex events together so they can be processed efficiently as a batch within the Siddhi execution pipeline.

```java { .api }
public class ComplexEventChunk<T extends ComplexEvent> {
    // Container for chaining complex events
    // Provides efficient iteration and batch processing
    // Used internally by Siddhi processors
}
```
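
To make "chaining" concrete, the sketch below models a chunk as a singly linked chain with head and tail pointers, which gives constant-time appends and a single pass for batch processing. It is a simplified, hypothetical model: `ChainedEvent` and `EventChain` are invented for illustration and do not reflect the actual `ComplexEventChunk` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event node; the `next` link is what forms the chain.
class ChainedEvent {
    final Object[] data;
    ChainedEvent next;

    ChainedEvent(Object... data) {
        this.data = data;
    }
}

// Hypothetical chunk: keeps the first and last node of the chain.
class EventChain {
    private ChainedEvent first;
    private ChainedEvent last;

    // Appends in O(1) by linking the new event onto the tail.
    void add(ChainedEvent event) {
        if (first == null) {
            first = event;
        } else {
            last.next = event;
        }
        last = event;
    }

    // Walks the links once, as a processor would when handling a batch.
    List<Object> firstAttributes() {
        List<Object> out = new ArrayList<>();
        for (ChainedEvent e = first; e != null; e = e.next) {
            out.add(e.data[0]);
        }
        return out;
    }
}
```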

### StreamEvent

Internal stream event implementation of `ComplexEvent`, used for stream processing operations.

```java { .api }
public class StreamEvent implements ComplexEvent {
    // Internal stream event implementation
    // Used for stream processing operations
    // Implements ComplexEvent with stream-specific functionality
}
```

### StateEvent

Event that maintains state in pattern queries and joins, correlating events from multiple streams.

```java { .api }
public class StateEvent implements ComplexEvent {
    // Event for maintaining state in pattern queries
    // Supports joins and correlations between streams
    // Used in complex event processing scenarios
}
```
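
One way to picture the correlation role described above is a holder with one slot per participating stream, where a pattern or join match is complete once every slot is filled. The sketch below follows that idea under stated assumptions: `SketchStateEvent` and its methods are hypothetical and are not the actual `StateEvent` API.

```java
// Hypothetical, simplified model of per-stream state; not Siddhi's StateEvent.
class SketchStateEvent {
    // One slot per participating stream; null means "no event seen yet".
    private final Object[][] slots;

    SketchStateEvent(int streamCount) {
        slots = new Object[streamCount][];
    }

    // Records the event data that arrived on the given stream position.
    void setEvent(int position, Object[] data) {
        slots[position] = data;
    }

    Object[] getEvent(int position) {
        return slots[position];
    }

    // A match is complete once every stream position holds an event.
    boolean isComplete() {
        for (Object[] slot : slots) {
            if (slot == null) {
                return false;
            }
        }
        return true;
    }
}
```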

## Input Handling

### InputHandler

Entry point for injecting events into Siddhi streams programmatically. It offers overloads for sending raw data arrays, explicitly timestamped data, single `Event` objects, and batches of events.

```java { .api }
public class InputHandler {
    // Constructor (instances are typically obtained from SiddhiAppRuntime
    // rather than constructed directly)
    public InputHandler(String streamId, int streamIndex, InputProcessor inputProcessor, SiddhiAppContext siddhiAppContext);

    // Stream Information
    public String getStreamId();

    // Event Sending Methods
    public void send(Object[] data);
    public void send(long timestamp, Object[] data);
    public void send(Event event);
    public void send(Event[] events);
}
```

### Usage Example

```java
// Obtain input handler from SiddhiAppRuntime
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("StockStream");

// Send event with current timestamp
inputHandler.send(new Object[]{"IBM", 150.0f, 1000L});

// Send event with specific timestamp
long customTimestamp = System.currentTimeMillis() - 1000; // 1 second ago
inputHandler.send(customTimestamp, new Object[]{"MSFT", 120.0f, 500L});

// Send Event object
Event event = new Event(System.currentTimeMillis(), new Object[]{"GOOGL", 2500.0f, 200L});
inputHandler.send(event);

// Send batch of events
Event[] events = {
    new Event(System.currentTimeMillis(), new Object[]{"AAPL", 180.0f, 300L}),
    new Event(System.currentTimeMillis(), new Object[]{"TSLA", 800.0f, 150L})
};
inputHandler.send(events);
```
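
The first two `send` overloads above differ only in who supplies the timestamp. The sketch below shows one way such overloads can funnel into a single delivery path, with the untimestamped variant reading a clock. All types here are hypothetical stand-ins (`TimedEvent`, `SketchInputHandler`, and the injected `LongSupplier` clock are assumptions made so the example is self-contained and deterministic); it does not describe how Siddhi implements `InputHandler`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Stand-in event type holding a timestamp and positional data.
class TimedEvent {
    final long timestamp;
    final Object[] data;

    TimedEvent(long timestamp, Object[] data) {
        this.timestamp = timestamp;
        this.data = data;
    }
}

// Hypothetical handler: both overloads end up in the same delivery path.
class SketchInputHandler {
    private final LongSupplier clock;
    final List<TimedEvent> delivered = new ArrayList<>();

    SketchInputHandler(LongSupplier clock) {
        this.clock = clock;
    }

    // No timestamp supplied: stamp the event with the current clock reading.
    void send(Object[] data) {
        send(clock.getAsLong(), data);
    }

    // Explicit timestamp: used unchanged, which suits replaying recorded data.
    void send(long timestamp, Object[] data) {
        delivered.add(new TimedEvent(timestamp, data));
    }
}
```

In production code the clock would typically be `System::currentTimeMillis`; injecting it keeps the behavior testable.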

## Stream Processing Components

### InputManager

Manages input streams and coordinates input processing across multiple streams.

```java { .api }
public class InputManager {
    // Manages input streams
    // Coordinates input processing
    // Handles stream routing and distribution
}
```

### StreamJunction

Core routing component that distributes events between processing components.

```java { .api }
public class StreamJunction {
    // Core stream routing component
    // Manages event distribution
    // Connects input handlers to processors
}
```
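
The routing role described above resembles a publish/subscribe fan-out: whatever is published on a stream is handed to every receiver registered for it. The sketch below illustrates that pattern only. `SketchJunction`, `subscribe`, and `publish` are hypothetical names, and the real `StreamJunction` API is not shown in this document.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified junction that fans events out to subscribers.
class SketchJunction {
    private final List<Consumer<Object[]>> receivers = new ArrayList<>();

    // Registers a receiver, for example a query processor.
    void subscribe(Consumer<Object[]> receiver) {
        receivers.add(receiver);
    }

    // Delivers the same event data to every receiver, in subscription order.
    void publish(Object[] data) {
        for (Consumer<Object[]> receiver : receivers) {
            receiver.accept(data);
        }
    }
}
```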

## Advanced Event Features

### Event Processing Patterns

```java
// Note: generateData(), shouldExpire(), timestamp, and data are
// application-defined placeholders, not part of the Siddhi API.

// Batch processing example
Event[] batchEvents = new Event[100];
for (int i = 0; i < 100; i++) {
    batchEvents[i] = new Event(System.currentTimeMillis(), generateData());
}
inputHandler.send(batchEvents);

// Timestamped event processing
long baseTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
    // Send events with incremental timestamps (1 second apart) for replay scenarios
    inputHandler.send(baseTime + (i * 1000L), new Object[]{"SYM" + i, 100.0 + i, 100L});
}

// Event state management
Event event = new Event(timestamp, data);
if (shouldExpire(event)) {
    event.setIsExpired(true);
}
```

### Performance Considerations

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Efficient event creation for high-throughput scenarios
public class EventPool {
    private final Queue<Event> pool = new ConcurrentLinkedQueue<>();

    // Returns a pooled event, or a new one if the pool is empty
    public Event getEvent() {
        Event event = pool.poll();
        if (event == null) {
            event = new Event();
        }
        return event;
    }

    // Only return events that nothing else still references
    public void returnEvent(Event event) {
        // Reset event state
        event.setTimestamp(0);
        event.setData(null);
        event.setIsExpired(false);
        pool.offer(event);
    }
}

// High-performance event sending
// (MarketData and marketDataStream are application-defined placeholders)
InputHandler handler = siddhiAppRuntime.getInputHandler("HighVolumeStream");
// Reusing one array avoids per-event allocation, but it is only safe if the
// values are copied before send() returns; otherwise allocate a fresh array
// for each event.
Object[] dataBuffer = new Object[3];
for (MarketData data : marketDataStream) {
    dataBuffer[0] = data.getSymbol();
    dataBuffer[1] = data.getPrice();
    dataBuffer[2] = data.getVolume();
    handler.send(data.getTimestamp(), dataBuffer);
}
```
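
Reusing one `Object[]` across `send` calls, as in the loop above, carries an aliasing risk whenever the receiver keeps a reference to the array instead of copying its values. The sketch below demonstrates the hazard with a hypothetical `BufferingSink` that simply stores what it is given. Whether Siddhi copies the values before `send` returns is not stated in this section, so treat buffer reuse as an assumption to verify for your configuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical consumer that keeps the array reference instead of copying it.
class BufferingSink {
    final List<Object[]> held = new ArrayList<>();

    void send(Object[] data) {
        held.add(data);
    }
}

class BufferReuseDemo {
    // Reuses one array for every send: all held entries alias the same array,
    // so earlier events appear to change when the buffer is overwritten.
    static List<Object[]> sendReusing(String[] symbols) {
        BufferingSink sink = new BufferingSink();
        Object[] buffer = new Object[1];
        for (String symbol : symbols) {
            buffer[0] = symbol;
            sink.send(buffer);
        }
        return sink.held;
    }

    // Allocates a fresh array per event: each held entry keeps its own value.
    static List<Object[]> sendFresh(String[] symbols) {
        BufferingSink sink = new BufferingSink();
        for (String symbol : symbols) {
            sink.send(new Object[]{symbol});
        }
        return sink.held;
    }
}
```

With the reused buffer, every stored entry ends up showing the last symbol sent; with per-event arrays, each entry retains its original value.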

## Types

```java { .api }
public interface InputProcessor {
    // Interface for processing input events
    void send(Event event, int streamIndex);
    void send(Event[] events, int streamIndex);
}

public interface SiddhiAppContext {
    // Application context for Siddhi runtime
    // Provides access to configuration and resources
}
```
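
As an illustration of the `InputProcessor` shape listed above, the sketch below implements a recording test double that logs each event with its stream index. `StubEvent` and `StubInputProcessor` are stand-ins that mirror only the two `send` methods shown, so the code compiles without Siddhi. `RecordingProcessor` is hypothetical, and treating a batch as repeated single sends is a simplification, not a statement about Siddhi's behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Event: just positional data.
class StubEvent {
    final Object[] data;

    StubEvent(Object... data) {
        this.data = data;
    }
}

// Stand-in mirroring the two send methods listed for InputProcessor.
interface StubInputProcessor {
    void send(StubEvent event, int streamIndex);

    void send(StubEvent[] events, int streamIndex);
}

// Hypothetical test double: records what was sent and on which stream index.
class RecordingProcessor implements StubInputProcessor {
    final List<String> log = new ArrayList<>();

    @Override
    public void send(StubEvent event, int streamIndex) {
        log.add(streamIndex + ":" + event.data[0]);
    }

    @Override
    public void send(StubEvent[] events, int streamIndex) {
        // Simplification: a batch is handled as repeated single sends.
        for (StubEvent event : events) {
            send(event, streamIndex);
        }
    }
}
```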