0
# Extensions
1
2
Extension points provide a pluggable architecture for creating custom sources, sinks, functions, and processors to extend Siddhi capabilities. The extension system enables developers to add domain-specific functionality while maintaining integration with the core Siddhi processing engine.
3
4
## Core Extension Interfaces
5
6
### Source
7
8
Abstract class for creating custom input sources that can feed data into Siddhi streams from external systems.
9
10
```java { .api }
11
// Base class for custom input sources: subclasses feed data from an external
// system into a Siddhi stream. This listing shows signatures only.
public abstract class Source {
12
// Abstract methods that must be implemented
13
// init receives the SourceEventListener (declared under Types with two onEvent
// overloads) together with the option/config holders for this source.
public abstract void init(SourceEventListener sourceEventListener, OptionHolder optionHolder,
14
String[] requestedTransportPropertyNames, ConfigReader configReader,
15
SiddhiAppContext siddhiAppContext);
16
public abstract Class[] getOutputEventClasses();
17
// ConnectionCallback (declared under Types) exposes onConnect() and onError(Exception).
public abstract void connect(ConnectionCallback connectionCallback);
18
public abstract void disconnect();
19
public abstract void destroy();
20
public abstract void pause();
21
public abstract void resume();
22
23
// Concrete methods available
// (provided by the base class; bodies are omitted in this listing)
24
public void connectWithRetry();
25
public SourceMapper getMapper();
26
public void shutdown();
27
public String getType();
28
public StreamDefinition getStreamDefinition();
29
}
30
```
31
32
### Sink
33
34
Abstract class for creating custom output sinks that can send processed data to external systems.
35
36
```java { .api }
37
// Base class for custom output sinks: subclasses send processed data to an
// external system. This listing shows signatures only.
public abstract class Sink {
38
// Abstract methods that must be implemented
39
public abstract Class[] getSupportedInputEventClasses();
40
public abstract String[] getSupportedDynamicOptions();
41
public abstract void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder,
42
ConfigReader sinkConfigReader, SiddhiAppContext siddhiAppContext);
43
// Abstract publish overload: takes the payload plus DynamicOptions
// (declared under Types with a single get(String key) accessor).
public abstract void publish(Object payload, DynamicOptions transportOptions);
44
public abstract void connect();
45
public abstract void disconnect();
46
public abstract void destroy();
47
48
// Concrete methods available
// (provided by the base class; bodies are omitted in this listing)
49
// Concrete publish overload: payload only, no DynamicOptions argument.
public void publish(Object payload);
50
public void connectWithRetry();
51
public void shutdown();
52
public String getType();
53
public SinkMapper getMapper();
54
public SinkHandler getHandler();
55
public StreamDefinition getStreamDefinition();
56
public boolean isConnected();
57
}
58
```
59
60
### FunctionExecutor
61
62
Abstract class for custom functions that can be used in Siddhi queries for data transformation and computation.
63
64
```java { .api }
65
// Base class for custom functions usable inside Siddhi queries.
// NOTE(review): ExpressionExecutor (see Types) also declares getReturnType(),
// which is not listed among the concrete methods below; the
// FactorialFunctionExecutor example overrides it, so subclasses presumably
// have to implement it as well - confirm against the library.
public abstract class FunctionExecutor implements ExpressionExecutor {
66
// Abstract methods that must be implemented
67
public abstract void init(ExpressionExecutor[] attributeExpressionExecutors,
68
ConfigReader configReader, SiddhiAppContext siddhiAppContext);
69
// Two execute overloads must be implemented: one taking an Object[] and one
// taking a single Object.
public abstract Object execute(Object[] data);
70
public abstract Object execute(Object data);
71
72
// Concrete methods available from ExpressionExecutor
73
public void initExecutor(ExpressionExecutor[] attributeExpressionExecutors,
74
SiddhiAppContext siddhiAppContext, String queryName,
75
ConfigReader configReader);
76
public Object execute(ComplexEvent event);
77
public ExpressionExecutor cloneExecutor(String key);
78
public String getElementId();
79
public void clean();
80
}
81
```
82
83
### StreamProcessor
84
85
Abstract class for custom stream processing that can transform or filter events in the processing pipeline.
86
87
```java { .api }
88
// Base class for custom stream processors. It is declared as an abstract class,
// so implementations extend it and provide every method below.
public abstract class StreamProcessor {
89
// Initialization
90
public abstract void init(MetaStreamEvent metaStreamEvent,
91
AbstractDefinition inputDefinition,
92
ExpressionExecutor[] attributeExpressionExecutors,
93
ConfigReader configReader,
94
StreamEventClonerHolder streamEventClonerHolder,
95
boolean outputExpectsExpiredEvents,
96
boolean findToBeExecuted,
97
SiddhiAppContext siddhiAppContext);
98
99
// Processing
// Receives a chunk of stream events along with the next Processor in the chain.
100
public abstract void process(ComplexEventChunk<StreamEvent> streamEventChunk,
101
Processor nextProcessor,
102
StreamEventCloner streamEventCloner,
103
ComplexEventPopulater complexEventPopulater);
104
105
// Lifecycle
106
public abstract void start();
107
public abstract void stop();
108
109
// Configuration
// Returns the attribute list for this processor (presumably the attributes it
// adds to each event - TODO confirm).
110
public abstract List<Attribute> getReturnAttributes();
111
}
112
```
113
114
## Extension Registration
115
116
### SiddhiManager Extension Registration
117
118
```java { .api }
119
// Extension-related subset of SiddhiManager (signatures only).
public class SiddhiManager {
120
// Extension Management
// Extensions are keyed by a String name and map to the implementing Class;
// the usage examples register names such as "math:factorial".
121
public void setExtension(String name, Class clazz);
122
public Map<String, Class> getExtensions();
123
public void removeExtension(String name);
124
}
125
```
126
127
### Usage Examples
128
129
```java
130
// Register custom extensions
131
SiddhiManager siddhiManager = new SiddhiManager();
132
133
// Register custom function
134
siddhiManager.setExtension("math:factorial", FactorialFunctionExecutor.class);
135
136
// Register custom source
137
siddhiManager.setExtension("kafka", KafkaSource.class);
138
139
// Register custom sink
140
siddhiManager.setExtension("elasticsearch", ElasticsearchSink.class);
141
142
// Register custom stream processor
143
siddhiManager.setExtension("ml:predict", MLPredictionProcessor.class);
144
145
// Use extensions in Siddhi app
146
String siddhiApp =
147
"@source(type='kafka', topic='stock-data', bootstrap.servers='localhost:9092', " +
148
" @map(type='json')) " +
149
"define stream StockStream (symbol string, price double, volume long); " +
150
151
"@sink(type='elasticsearch', hostname='localhost', port='9200', " +
152
" index.name='stock-analysis', @map(type='json')) " +
153
"define stream ProcessedStream (symbol string, processedPrice double, prediction string); " +
154
155
"from StockStream " +
156
"select symbol, " +
157
" math:factorial(volume % 10) as processedPrice, " +
158
" ml:predict(price, volume) as prediction " +
159
"insert into ProcessedStream;";
160
```
161
162
## Built-in Extensions
163
164
### InMemorySource
165
166
Built-in in-memory event source for testing and development.
167
168
```java { .api }
169
public class InMemorySource implements Source {
170
// Built-in source for in-memory event generation
171
// Useful for testing and development scenarios
172
}
173
```
174
175
### InMemorySink
176
177
Built-in in-memory event sink for collecting results during testing.
178
179
```java { .api }
180
public class InMemorySink implements Sink {
181
// Built-in sink for in-memory event collection
182
// Useful for testing and result collection
183
}
184
```
185
186
### LogSink
187
188
Built-in logging sink for debugging and monitoring.
189
190
```java { .api }
191
public class LogSink implements Sink {
192
// Built-in sink for logging events
193
// Useful for debugging and monitoring
194
}
195
```
196
197
## Extension Examples
198
199
### Custom Function Example
200
201
```java
202
// Custom mathematical function
203
public class FactorialFunctionExecutor extends FunctionExecutor {
204
205
@Override
206
public void init(AttributeExpressionExecutor[] attributeExpressionExecutors,
207
ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
208
if (attributeExpressionExecutors.length != 1) {
209
throw new SiddhiAppValidationException("Factorial function requires exactly one parameter");
210
}
211
212
if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.INT &&
213
attributeExpressionExecutors[0].getReturnType() != Attribute.Type.LONG) {
214
throw new SiddhiAppValidationException("Factorial function requires integer input");
215
}
216
}
217
218
@Override
219
public Object execute(Object[] data) {
220
if (data[0] == null) {
221
return null;
222
}
223
224
int n = ((Number) data[0]).intValue();
225
if (n < 0) {
226
throw new SiddhiAppRuntimeException("Factorial not defined for negative numbers");
227
}
228
229
long result = 1;
230
for (int i = 2; i <= n; i++) {
231
result *= i;
232
}
233
return result;
234
}
235
236
@Override
237
public Attribute.Type getReturnType() {
238
return Attribute.Type.LONG;
239
}
240
}
241
```
242
243
### Custom Source Example
244
245
```java
246
// Custom HTTP source
247
public class HttpSource implements Source {
248
private SourceEventListener sourceEventListener;
249
private String url;
250
private int pollInterval;
251
private HttpClient httpClient;
252
private ScheduledExecutorService scheduler;
253
254
@Override
255
public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder,
256
String[] requestedTransportPropertyNames, ConfigReader configReader,
257
SiddhiAppContext siddhiAppContext) {
258
this.sourceEventListener = sourceEventListener;
259
this.url = optionHolder.validateAndGetStaticValue("url");
260
this.pollInterval = Integer.parseInt(optionHolder.validateAndGetStaticValue("poll.interval", "5000"));
261
this.httpClient = HttpClient.newHttpClient();
262
this.scheduler = Executors.newScheduledThreadPool(1);
263
}
264
265
@Override
266
public void connect(ConnectionCallback connectionCallback, State state) {
267
scheduler.scheduleAtFixedRate(() -> {
268
try {
269
HttpRequest request = HttpRequest.newBuilder()
270
.uri(URI.create(url))
271
.build();
272
273
HttpResponse<String> response = httpClient.send(request,
274
HttpResponse.BodyHandlers.ofString());
275
276
if (response.statusCode() == 200) {
277
// Parse response and send to Siddhi
278
Object[] eventData = parseResponse(response.body());
279
sourceEventListener.onEvent(eventData, null);
280
}
281
} catch (Exception e) {
282
connectionCallback.onError(e);
283
}
284
}, 0, pollInterval, TimeUnit.MILLISECONDS);
285
286
connectionCallback.onConnect();
287
}
288
289
@Override
290
public void disconnect() {
291
if (scheduler != null) {
292
scheduler.shutdown();
293
}
294
}
295
296
@Override
297
public void destroy() {
298
disconnect();
299
}
300
301
@Override
302
public void pause() {
303
// Implementation for pausing
304
}
305
306
@Override
307
public void resume() {
308
// Implementation for resuming
309
}
310
311
@Override
312
public Class[] getSupportedInputEventClasses() {
313
return new Class[]{Map.class, Object[].class};
314
}
315
316
@Override
317
public String[] getSupportedDynamicOptions() {
318
return new String[]{"url"};
319
}
320
321
private Object[] parseResponse(String responseBody) {
322
// Parse HTTP response into event data
323
// Implementation depends on response format
324
return new Object[]{responseBody, System.currentTimeMillis()};
325
}
326
}
327
```
328
329
### Custom Sink Example
330
331
```java
332
// Custom database sink
333
public class DatabaseSink implements Sink {
334
private DataSource dataSource;
335
private String tableName;
336
private String[] columnNames;
337
338
@Override
339
public void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder,
340
ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
341
String dataSourceName = optionHolder.validateAndGetStaticValue("datasource");
342
this.dataSource = siddhiAppContext.getSiddhiDataSource(dataSourceName);
343
this.tableName = optionHolder.validateAndGetStaticValue("table.name");
344
345
// Extract column names from stream definition
346
List<Attribute> attributes = outputStreamDefinition.getAttributeList();
347
this.columnNames = attributes.stream()
348
.map(Attribute::getName)
349
.toArray(String[]::new);
350
}
351
352
@Override
353
public void connect() {
354
// Verify database connection
355
try (Connection conn = dataSource.getConnection()) {
356
// Test connection
357
} catch (SQLException e) {
358
throw new ConnectionUnavailableException("Database connection failed", e);
359
}
360
}
361
362
@Override
363
public void publish(Object payload, DynamicOptions dynamicOptions, State state) {
364
Object[] eventData = (Object[]) payload;
365
366
StringBuilder sql = new StringBuilder("INSERT INTO ");
367
sql.append(tableName).append(" (");
368
sql.append(String.join(", ", columnNames));
369
sql.append(") VALUES (");
370
sql.append(String.join(", ", Collections.nCopies(columnNames.length, "?")));
371
sql.append(")");
372
373
try (Connection conn = dataSource.getConnection();
374
PreparedStatement stmt = conn.prepareStatement(sql.toString())) {
375
376
for (int i = 0; i < eventData.length; i++) {
377
stmt.setObject(i + 1, eventData[i]);
378
}
379
380
stmt.executeUpdate();
381
} catch (SQLException e) {
382
throw new ConnectionUnavailableException("Database insert failed", e);
383
}
384
}
385
386
@Override
387
public void disconnect() {
388
// Cleanup resources
389
}
390
391
@Override
392
public void destroy() {
393
disconnect();
394
}
395
396
@Override
397
public Class[] getSupportedInputEventClasses() {
398
return new Class[]{Object[].class, Map.class};
399
}
400
401
@Override
402
public String[] getSupportedDynamicOptions() {
403
return new String[]{"table.name"};
404
}
405
}
406
```
407
408
## Extension Mappers
409
410
### SourceMapper
411
412
Interface for custom source data mapping to convert external data formats to Siddhi events.
413
414
```java { .api }
415
public interface SourceMapper {
416
void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
417
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
418
ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
419
420
void mapAndSend(Object[] transportProperties, List<AttributeMapping> transportMapping,
421
Object eventObject, SourceEventListener sourceEventListener);
422
423
Class[] getSupportedInputEventClasses();
424
}
425
```
426
427
### SinkMapper
428
429
Interface for custom sink data mapping to convert Siddhi events to external data formats.
430
431
```java { .api }
432
public interface SinkMapper {
433
void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
434
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
435
ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
436
437
void mapAndSend(Event[] events, OptionHolder optionHolder,
438
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
439
SinkListener sinkListener);
440
441
Class[] getSupportedInputEventClasses();
442
}
443
```
444
445
## Extension Holders
446
447
Extension holders manage different types of extensions and provide common functionality.
448
449
```java { .api }
450
public abstract class AbstractExtensionHolder {
451
// Common functionality for extension management
452
protected Map<String, Class<?>> extensions;
453
454
public void addExtension(String name, Class<?> extensionClass);
455
public Class<?> getExtension(String name);
456
public void removeExtension(String name);
457
}
458
459
public class FunctionExecutorExtensionHolder extends AbstractExtensionHolder {
460
// Manages function executor extensions
461
}
462
463
public class SourceExtensionHolder extends AbstractExtensionHolder {
464
// Manages source extensions
465
}
466
467
public class SinkExtensionHolder extends AbstractExtensionHolder {
468
// Manages sink extensions
469
}
470
```
471
472
## Types
473
474
```java { .api }
475
public interface SourceEventListener {
476
void onEvent(Object eventObject, Object[] transportProperties);
477
void onEvent(Object eventObject, Object[] transportProperties, String[] transportSyncProperties);
478
}
479
480
public interface ConnectionCallback {
481
void onConnect();
482
void onError(Exception e);
483
}
484
485
public interface SinkListener {
486
void publish(Object payload);
487
}
488
489
public interface OptionHolder {
490
String validateAndGetStaticValue(String key);
491
String validateAndGetStaticValue(String key, String defaultValue);
492
String getOrCreateOption(String key, String defaultValue);
493
}
494
495
public interface ConfigReader {
496
String readConfig(String key, String defaultValue);
497
Map<String, String> getAllConfigs();
498
}
499
500
public interface DynamicOptions {
501
String get(String key);
502
}
503
504
public interface State {
505
boolean canDestroy();
506
Map<String, Object> getState();
507
void restoreState(Map<String, Object> state);
508
}
509
510
public interface AttributeMapping {
511
String getName();
512
String getMapping();
513
}
514
515
public interface TemplateBuilder {
516
String build(Event event);
517
}
518
519
public interface ExpressionExecutor {
520
void initExecutor(ExpressionExecutor[] attributeExpressionExecutors,
521
SiddhiAppContext siddhiAppContext, String queryName,
522
ConfigReader configReader);
523
Object execute(ComplexEvent event);
524
ExpressionExecutor cloneExecutor(String key);
525
String getElementId();
526
void clean();
527
Attribute.Type getReturnType();
528
}
529
530
public interface Snapshotable {
531
Map<String, Object> currentState();
532
void restoreState(Map<String, Object> state);
533
}
534
535
public interface SinkHandler {
536
// Handler for sink operations
537
}
538
539
public interface SinkMapper {
540
void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
541
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
542
ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
543
void mapAndSend(Event[] events, OptionHolder optionHolder,
544
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
545
SinkListener sinkListener);
546
Class[] getSupportedInputEventClasses();
547
}
548
549
public interface SourceMapper {
550
void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
551
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
552
ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
553
void mapAndSend(Object[] transportProperties, List<AttributeMapping> transportMapping,
554
Object eventObject, SourceEventListener sourceEventListener);
555
Class[] getSupportedInputEventClasses();
556
}
557
```