Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.
Extension points provide a pluggable architecture for creating custom sources, sinks, functions, and processors to extend Siddhi capabilities. The extension system enables developers to add domain-specific functionality while maintaining integration with the core Siddhi processing engine.
Abstract class for creating custom input sources that can feed data into Siddhi streams from external systems.
public abstract class Source {
// Abstract methods that must be implemented
public abstract void init(SourceEventListener sourceEventListener, OptionHolder optionHolder,
String[] requestedTransportPropertyNames, ConfigReader configReader,
SiddhiAppContext siddhiAppContext);
public abstract Class[] getOutputEventClasses();
public abstract void connect(ConnectionCallback connectionCallback);
public abstract void disconnect();
public abstract void destroy();
public abstract void pause();
public abstract void resume();
// Concrete methods available
public void connectWithRetry();
public SourceMapper getMapper();
public void shutdown();
public String getType();
public StreamDefinition getStreamDefinition();
}Abstract class for creating custom output sinks that can send processed data to external systems.
public abstract class Sink {
// Abstract methods that must be implemented
public abstract Class[] getSupportedInputEventClasses();
public abstract String[] getSupportedDynamicOptions();
public abstract void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder,
ConfigReader sinkConfigReader, SiddhiAppContext siddhiAppContext);
public abstract void publish(Object payload, DynamicOptions transportOptions);
public abstract void connect();
public abstract void disconnect();
public abstract void destroy();
// Concrete methods available
public void publish(Object payload);
public void connectWithRetry();
public void shutdown();
public String getType();
public SinkMapper getMapper();
public SinkHandler getHandler();
public StreamDefinition getStreamDefinition();
public boolean isConnected();
}Abstract class for custom functions that can be used in Siddhi queries for data transformation and computation.
public abstract class FunctionExecutor implements ExpressionExecutor {
// Abstract methods that must be implemented
public abstract void init(ExpressionExecutor[] attributeExpressionExecutors,
ConfigReader configReader, SiddhiAppContext siddhiAppContext);
public abstract Object execute(Object[] data);
public abstract Object execute(Object data);
// Concrete methods available from ExpressionExecutor
public void initExecutor(ExpressionExecutor[] attributeExpressionExecutors,
SiddhiAppContext siddhiAppContext, String queryName,
ConfigReader configReader);
public Object execute(ComplexEvent event);
public ExpressionExecutor cloneExecutor(String key);
public String getElementId();
public void clean();
}Interface for custom stream processing that can transform or filter events in the processing pipeline.
public abstract class StreamProcessor {
// Initialization
public abstract void init(MetaStreamEvent metaStreamEvent,
AbstractDefinition inputDefinition,
ExpressionExecutor[] attributeExpressionExecutors,
ConfigReader configReader,
StreamEventClonerHolder streamEventClonerHolder,
boolean outputExpectsExpiredEvents,
boolean findToBeExecuted,
SiddhiAppContext siddhiAppContext);
// Processing
public abstract void process(ComplexEventChunk<StreamEvent> streamEventChunk,
Processor nextProcessor,
StreamEventCloner streamEventCloner,
ComplexEventPopulater complexEventPopulater);
// Lifecycle
public abstract void start();
public abstract void stop();
// Configuration
public abstract List<Attribute> getReturnAttributes();
}public class SiddhiManager {
// Extension Management
public void setExtension(String name, Class clazz);
public Map<String, Class> getExtensions();
public void removeExtension(String name);
}// Register custom extensions
SiddhiManager siddhiManager = new SiddhiManager();
// Register custom function
siddhiManager.setExtension("math:factorial", FactorialFunctionExecutor.class);
// Register custom source
siddhiManager.setExtension("kafka", KafkaSource.class);
// Register custom sink
siddhiManager.setExtension("elasticsearch", ElasticsearchSink.class);
// Register custom stream processor
siddhiManager.setExtension("ml:predict", MLPredictionProcessor.class);
// Use extensions in Siddhi app
String siddhiApp =
"@source(type='kafka', topic='stock-data', bootstrap.servers='localhost:9092', " +
" @map(type='json')) " +
"define stream StockStream (symbol string, price double, volume long); " +
"@sink(type='elasticsearch', hostname='localhost', port='9200', " +
" index.name='stock-analysis', @map(type='json')) " +
"define stream ProcessedStream (symbol string, processedPrice double, prediction string); " +
"from StockStream " +
"select symbol, " +
" math:factorial(volume % 10) as processedPrice, " +
" ml:predict(price, volume) as prediction " +
"insert into ProcessedStream;";Built-in in-memory event source for testing and development.
public class InMemorySource implements Source {
// Built-in source for in-memory event generation
// Useful for testing and development scenarios
}Built-in in-memory event sink for collecting results during testing.
public class InMemorySink implements Sink {
// Built-in sink for in-memory event collection
// Useful for testing and result collection
}Built-in logging sink for debugging and monitoring.
public class LogSink implements Sink {
// Built-in sink for logging events
// Useful for debugging and monitoring
}// Custom mathematical function
public class FactorialFunctionExecutor extends FunctionExecutor {
@Override
public void init(AttributeExpressionExecutor[] attributeExpressionExecutors,
ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
if (attributeExpressionExecutors.length != 1) {
throw new SiddhiAppValidationException("Factorial function requires exactly one parameter");
}
if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.INT &&
attributeExpressionExecutors[0].getReturnType() != Attribute.Type.LONG) {
throw new SiddhiAppValidationException("Factorial function requires integer input");
}
}
@Override
public Object execute(Object[] data) {
if (data[0] == null) {
return null;
}
int n = ((Number) data[0]).intValue();
if (n < 0) {
throw new SiddhiAppRuntimeException("Factorial not defined for negative numbers");
}
long result = 1;
for (int i = 2; i <= n; i++) {
result *= i;
}
return result;
}
@Override
public Attribute.Type getReturnType() {
return Attribute.Type.LONG;
}
}// Custom HTTP source
public class HttpSource implements Source {
private SourceEventListener sourceEventListener;
private String url;
private int pollInterval;
private HttpClient httpClient;
private ScheduledExecutorService scheduler;
@Override
public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder,
String[] requestedTransportPropertyNames, ConfigReader configReader,
SiddhiAppContext siddhiAppContext) {
this.sourceEventListener = sourceEventListener;
this.url = optionHolder.validateAndGetStaticValue("url");
this.pollInterval = Integer.parseInt(optionHolder.validateAndGetStaticValue("poll.interval", "5000"));
this.httpClient = HttpClient.newHttpClient();
this.scheduler = Executors.newScheduledThreadPool(1);
}
@Override
public void connect(ConnectionCallback connectionCallback, State state) {
scheduler.scheduleAtFixedRate(() -> {
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.build();
HttpResponse<String> response = httpClient.send(request,
HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
// Parse response and send to Siddhi
Object[] eventData = parseResponse(response.body());
sourceEventListener.onEvent(eventData, null);
}
} catch (Exception e) {
connectionCallback.onError(e);
}
}, 0, pollInterval, TimeUnit.MILLISECONDS);
connectionCallback.onConnect();
}
@Override
public void disconnect() {
if (scheduler != null) {
scheduler.shutdown();
}
}
@Override
public void destroy() {
disconnect();
}
@Override
public void pause() {
// Implementation for pausing
}
@Override
public void resume() {
// Implementation for resuming
}
@Override
public Class[] getSupportedInputEventClasses() {
return new Class[]{Map.class, Object[].class};
}
@Override
public String[] getSupportedDynamicOptions() {
return new String[]{"url"};
}
private Object[] parseResponse(String responseBody) {
// Parse HTTP response into event data
// Implementation depends on response format
return new Object[]{responseBody, System.currentTimeMillis()};
}
}// Custom database sink
public class DatabaseSink implements Sink {
private DataSource dataSource;
private String tableName;
private String[] columnNames;
@Override
public void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder,
ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
String dataSourceName = optionHolder.validateAndGetStaticValue("datasource");
this.dataSource = siddhiAppContext.getSiddhiDataSource(dataSourceName);
this.tableName = optionHolder.validateAndGetStaticValue("table.name");
// Extract column names from stream definition
List<Attribute> attributes = outputStreamDefinition.getAttributeList();
this.columnNames = attributes.stream()
.map(Attribute::getName)
.toArray(String[]::new);
}
@Override
public void connect() {
// Verify database connection
try (Connection conn = dataSource.getConnection()) {
// Test connection
} catch (SQLException e) {
throw new ConnectionUnavailableException("Database connection failed", e);
}
}
@Override
public void publish(Object payload, DynamicOptions dynamicOptions, State state) {
Object[] eventData = (Object[]) payload;
StringBuilder sql = new StringBuilder("INSERT INTO ");
sql.append(tableName).append(" (");
sql.append(String.join(", ", columnNames));
sql.append(") VALUES (");
sql.append(String.join(", ", Collections.nCopies(columnNames.length, "?")));
sql.append(")");
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql.toString())) {
for (int i = 0; i < eventData.length; i++) {
stmt.setObject(i + 1, eventData[i]);
}
stmt.executeUpdate();
} catch (SQLException e) {
throw new ConnectionUnavailableException("Database insert failed", e);
}
}
@Override
public void disconnect() {
// Cleanup resources
}
@Override
public void destroy() {
disconnect();
}
@Override
public Class[] getSupportedInputEventClasses() {
return new Class[]{Object[].class, Map.class};
}
@Override
public String[] getSupportedDynamicOptions() {
return new String[]{"table.name"};
}
}Interface for custom source data mapping to convert external data formats to Siddhi events.
public interface SourceMapper {
void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
void mapAndSend(Object[] transportProperties, List<AttributeMapping> transportMapping,
Object eventObject, SourceEventListener sourceEventListener);
Class[] getSupportedInputEventClasses();
}Interface for custom sink data mapping to convert Siddhi events to external data formats.
public interface SinkMapper {
void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
void mapAndSend(Event[] events, OptionHolder optionHolder,
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
SinkListener sinkListener);
Class[] getSupportedInputEventClasses();
}Extension holders manage different types of extensions and provide common functionality.
public abstract class AbstractExtensionHolder {
// Common functionality for extension management
protected Map<String, Class<?>> extensions;
public void addExtension(String name, Class<?> extensionClass);
public Class<?> getExtension(String name);
public void removeExtension(String name);
}
public class FunctionExecutorExtensionHolder extends AbstractExtensionHolder {
// Manages function executor extensions
}
public class SourceExtensionHolder extends AbstractExtensionHolder {
// Manages source extensions
}
public class SinkExtensionHolder extends AbstractExtensionHolder {
// Manages sink extensions
}public interface SourceEventListener {
void onEvent(Object eventObject, Object[] transportProperties);
void onEvent(Object eventObject, Object[] transportProperties, String[] transportSyncProperties);
}
public interface ConnectionCallback {
void onConnect();
void onError(Exception e);
}
public interface SinkListener {
void publish(Object payload);
}
public interface OptionHolder {
String validateAndGetStaticValue(String key);
String validateAndGetStaticValue(String key, String defaultValue);
String getOrCreateOption(String key, String defaultValue);
}
public interface ConfigReader {
String readConfig(String key, String defaultValue);
Map<String, String> getAllConfigs();
}
public interface DynamicOptions {
String get(String key);
}
public interface State {
boolean canDestroy();
Map<String, Object> getState();
void restoreState(Map<String, Object> state);
}
public interface AttributeMapping {
String getName();
String getMapping();
}
public interface TemplateBuilder {
String build(Event event);
}
public interface ExpressionExecutor {
void initExecutor(ExpressionExecutor[] attributeExpressionExecutors,
SiddhiAppContext siddhiAppContext, String queryName,
ConfigReader configReader);
Object execute(ComplexEvent event);
ExpressionExecutor cloneExecutor(String key);
String getElementId();
void clean();
Attribute.Type getReturnType();
}
public interface Snapshotable {
Map<String, Object> currentState();
void restoreState(Map<String, Object> state);
}
public interface SinkHandler {
// Handler for sink operations
}
public interface SinkMapper {
void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
void mapAndSend(Event[] events, OptionHolder optionHolder,
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
SinkListener sinkListener);
Class[] getSupportedInputEventClasses();
}
public interface SourceMapper {
void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
Map<String, TemplateBuilder> payloadTemplateBuilderMap,
ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
void mapAndSend(Object[] transportProperties, List<AttributeMapping> transportMapping,
Object eventObject, SourceEventListener sourceEventListener);
Class[] getSupportedInputEventClasses();
}Install with Tessl CLI
npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core