Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.
Siddhi Core offers comprehensive exception handling for the error scenarios that arise in stream processing operations. It provides a hierarchy of exceptions to handle the different types of errors that can occur during application creation, runtime execution, and state management.
Base exception class for all Siddhi-related exceptions providing common exception handling patterns.
public class SiddhiException extends RuntimeException {
public SiddhiException(String message);
public SiddhiException(String message, Throwable cause);
public SiddhiException(Throwable cause);
}
Thrown when a Siddhi app cannot be created because parsing or validating the Siddhi application fails.
public class SiddhiAppCreationException extends SiddhiException {
public SiddhiAppCreationException(String message);
public SiddhiAppCreationException(String message, Throwable cause);
public SiddhiAppCreationException(Throwable cause);
}
Thrown for runtime errors in Siddhi app execution that occur during event processing or query execution.
public class SiddhiAppRuntimeException extends SiddhiException {
public SiddhiAppRuntimeException(String message);
public SiddhiAppRuntimeException(String message, Throwable cause);
public SiddhiAppRuntimeException(Throwable cause);
}// Handle application creation errors
try {
SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(invalidSiddhiApp);
} catch (SiddhiAppCreationException e) {
System.err.println("Failed to create Siddhi app: " + e.getMessage());
// Log detailed error information
if (e.getCause() instanceof ParseException) {
System.err.println("Syntax error in Siddhi query");
} else if (e.getCause() instanceof ValidationException) {
System.err.println("Validation error in Siddhi app definition");
}
// Handle gracefully
fallbackToDefaultApp();
}
// Handle runtime errors
try {
InputHandler handler = runtime.getInputHandler("StockStream");
handler.send(malformedData);
} catch (SiddhiAppRuntimeException e) {
System.err.println("Runtime error during event processing: " + e.getMessage());
// Implement error recovery
handleRuntimeError(e, malformedData);
}Thrown when referenced definition doesn't exist in the Siddhi application.
public class DefinitionNotExistException extends SiddhiException {
public DefinitionNotExistException(String message);
public DefinitionNotExistException(String message, Throwable cause);
}
Thrown when a referenced query doesn't exist in the Siddhi application.
public class QueryNotExistException extends SiddhiException {
public QueryNotExistException(String message);
public QueryNotExistException(String message, Throwable cause);
}// Handle missing definitions
// Declared before the try so the catch block can assign the fallback handler.
InputHandler handler;
try {
    handler = runtime.getInputHandler("NonExistentStream");
} catch (DefinitionNotExistException e) {
    System.err.println("Stream not found: " + e.getMessage());
    // List available streams
    Map<String, StreamDefinition> streams = runtime.getStreamDefinitionMap();
    System.out.println("Available streams: " + streams.keySet());
    // Use fallback stream
    handler = runtime.getInputHandler("DefaultStream");
}
// Handle missing queries
try {
    runtime.addCallback("NonExistentQuery", queryCallback);
} catch (QueryNotExistException e) {
    System.err.println("Query not found: " + e.getMessage());
    // List available queries
    Set<String> queries = runtime.getQueryNames();
    System.out.println("Available queries: " + queries);
    // Register with an existing query; an app without queries has nothing
    // to fall back to, and iterator().next() would throw on an empty set.
    if (!queries.isEmpty()) {
        runtime.addCallback(queries.iterator().next(), queryCallback);
    }
}
Errors during store query creation when parsing or validating store queries.
public class StoreQueryCreationException extends SiddhiException {
public StoreQueryCreationException(String message);
public StoreQueryCreationException(String message, Throwable cause);
}
Thrown for runtime errors in store query execution during on-demand query processing.
public class StoreQueryRuntimeException extends SiddhiException {
public StoreQueryRuntimeException(String message);
public StoreQueryRuntimeException(String message, Throwable cause);
}// Handle store query creation errors
// Declared once, outside the try blocks, so that the catch blocks can assign
// the results of the corrected queries.
Event[] results;
try {
    String invalidQuery = "from NonExistentTable select *";
    results = runtime.query(invalidQuery);
} catch (StoreQueryCreationException e) {
    System.err.println("Invalid store query: " + e.getMessage());
    // Validate query syntax (the message may be null, so check before searching it)
    String message = e.getMessage();
    if (message != null && message.contains("table")) {
        System.err.println("Check table name and definition");
    }
    // Use corrected query
    String correctedQuery = "from StockTable select *";
    results = runtime.query(correctedQuery);
}
// Handle store query runtime errors
try {
    String query = "from StockTable on price > ? select *"; // Missing parameter
    results = runtime.query(query);
} catch (StoreQueryRuntimeException e) {
    System.err.println("Store query execution failed: " + e.getMessage());
    // Use parameterized query properly
    String paramQuery = "from StockTable on price > 100 select *";
    results = runtime.query(paramQuery);
}
State restoration failures when attempting to restore Siddhi application state.
public class CannotRestoreSiddhiAppStateException extends SiddhiException {
public CannotRestoreSiddhiAppStateException(String message);
public CannotRestoreSiddhiAppStateException(String message, Throwable cause);
}
Thrown when an attempt to clear stored state fails.
public class CannotClearSiddhiAppStateException extends SiddhiException {
public CannotClearSiddhiAppStateException(String message);
public CannotClearSiddhiAppStateException(String message, Throwable cause);
}// Handle state restoration errors
try {
runtime.restoreLastRevision();
} catch (CannotRestoreSiddhiAppStateException e) {
System.err.println("Failed to restore state: " + e.getMessage());
// Try alternative restoration methods
try {
byte[] snapshot = loadBackupSnapshot();
runtime.restore(snapshot);
} catch (Exception backupError) {
System.err.println("Backup restoration also failed, starting fresh");
// Start with clean state
}
}
// Handle state clearing errors
try {
runtime.clearAllRevisions();
} catch (CannotClearSiddhiAppStateException e) {
System.err.println("Failed to clear state: " + e.getMessage());
// Manual cleanup
if (e.getCause() instanceof IOException) {
System.err.println("File system issue, check permissions");
} else if (e.getCause() instanceof SQLException) {
System.err.println("Database issue, check connection");
}
}Persistence store related errors during state save/load operations.
public class PersistenceStoreException extends SiddhiException {
public PersistenceStoreException(String message);
public PersistenceStoreException(String message, Throwable cause);
}
Thrown when a persistence operation is attempted but no persistence store is configured.
public class NoPersistenceStoreException extends SiddhiException {
public NoPersistenceStoreException(String message);
}// Handle persistence store errors
try {
    PersistenceReference ref = runtime.persist();
} catch (PersistenceStoreException e) {
    System.err.println("Persistence failed: " + e.getMessage());
    // Implement fallback persistence
    try {
        byte[] snapshot = runtime.snapshot();
        saveSnapshotToAlternateStore(snapshot);
    } catch (Exception fallbackError) {
        // Report why the fallback failed instead of discarding the exception
        System.err.println("Fallback persistence also failed: " + fallbackError.getMessage());
        // Log for manual recovery
        logStateForManualRecovery();
    }
}
// Handle missing persistence store
try {
    siddhiManager.persist();
} catch (NoPersistenceStoreException e) {
    System.err.println("No persistence store configured: " + e.getMessage());
    // Configure default persistence store
    FilePersistenceStore defaultStore = new FilePersistenceStore("./default-state");
    siddhiManager.setPersistenceStore(defaultStore);
    // Retry persistence now that a store is available
    siddhiManager.persist();
}
Data purging operation errors during incremental data cleanup.
public class DataPurgingException extends SiddhiException {
public DataPurgingException(String message);
public DataPurgingException(String message, Throwable cause);
}
Thrown for invalid attribute references, i.e. when a non-existent attribute is accessed.
public class NoSuchAttributeException extends SiddhiException {
public NoSuchAttributeException(String message);
public NoSuchAttributeException(String message, Throwable cause);
}// Handle data purging errors
try {
    runtime.setPurgingEnabled(true);
    // Purging happens automatically
} catch (DataPurgingException e) {
    System.err.println("Data purging failed: " + e.getMessage());
    // Implement manual cleanup
    if (e.getCause() instanceof SQLException) {
        // Database purging failed
        executeDatabaseCleanup();
    } else {
        // File system purging failed
        executeFileSystemCleanup();
    }
}
// Handle attribute access errors
// Declared before the try so the catch block can assign the corrected query's result.
Event[] results;
try {
    String query = "from StockTable select nonExistentColumn";
    results = runtime.query(query);
} catch (NoSuchAttributeException e) {
    System.err.println("Attribute not found: " + e.getMessage());
    // Get available attributes
    Attribute[] attributes = runtime.getStoreQueryOutputAttributes("from StockTable select *");
    System.out.println("Available attributes:");
    for (Attribute attr : attributes) {
        System.out.println("- " + attr.getName() + " (" + attr.getType() + ")");
    }
    // Use corrected query
    String correctedQuery = "from StockTable select symbol, price";
    results = runtime.query(correctedQuery);
}
Connection unavailability errors when external systems are not accessible.
public class ConnectionUnavailableException extends SiddhiException {
public ConnectionUnavailableException(String message);
public ConnectionUnavailableException(String message, Throwable cause);
}
Thrown when an operation is attempted that is not available in the current context.
public class OperationNotSupportedException extends SiddhiException {
public OperationNotSupportedException(String message);
public OperationNotSupportedException(String message, Throwable cause);
}// Handle connection errors
try {
    // Sink attempting to connect to external system
    runtime.start(); // May fail if external connections are down
} catch (ConnectionUnavailableException e) {
    System.err.println("External system unavailable: " + e.getMessage());
    // Implement retry logic: up to 3 further attempts, 5 seconds apart.
    // A plain loop is used because RetryPolicy is an interface and cannot be
    // instantiated with `new`.
    final int maxRetries = 3;
    final Duration delay = Duration.ofSeconds(5);
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
        try {
            Thread.sleep(delay.toMillis());
            runtime.start();
            break; // connected, stop retrying
        } catch (ConnectionUnavailableException retryFailure) {
            if (attempt == maxRetries) {
                // Retries exhausted: surface the last failure to the caller
                throw retryFailure;
            }
        } catch (InterruptedException interrupted) {
            // Stop retrying and preserve the interrupt for the caller
            Thread.currentThread().interrupt();
            break;
        }
    }
}
// Handle unsupported operations
try {
    // Some operations may not be supported in certain contexts
    runtime.someUnsupportedOperation();
} catch (OperationNotSupportedException e) {
    System.err.println("Operation not supported: " + e.getMessage());
    // Use alternative approach
    useAlternativeMethod();
}
Class loading failures when loading extension classes or dependencies.
public class CannotLoadClassException extends SiddhiException {
public CannotLoadClassException(String message);
public CannotLoadClassException(String message, Throwable cause);
}
Thrown when configuration cannot be loaded while reading configuration files or settings.
public class CannotLoadConfigurationException extends SiddhiException {
public CannotLoadConfigurationException(String message);
public CannotLoadConfigurationException(String message, Throwable cause);
}
Thrown for queryable record table errors during table query operations.
public class QueryableRecordTableException extends SiddhiException {
public QueryableRecordTableException(String message);
public QueryableRecordTableException(String message, Throwable cause);
}// Global exception handler for Siddhi applications
/**
 * Central dispatcher that routes an exception to a type-specific handler.
 *
 * <p>The members {@code logger}, {@code alertService}, {@code handlePersistenceError},
 * {@code handleConnectionError}, {@code handleGenericError}, {@code tryFallbackConfiguration},
 * {@code attemptRecovery} and {@code initiateFailover} are used here but are not part of
 * this listing.
 */
public class SiddhiExceptionHandler {
    /** Upper bound on cause-chain traversal, so a cyclic cause chain cannot loop forever. */
    private static final int MAX_CAUSE_DEPTH = 32;

    /**
     * Dispatches {@code e} to the handler matching its runtime type; anything that is not
     * one of the four recognised Siddhi exception types goes to the generic handler.
     *
     * @param e       the exception to handle
     * @param context free-form description of where the failure happened
     */
    public void handleException(Exception e, String context) {
        if (e instanceof SiddhiAppCreationException) {
            handleCreationError((SiddhiAppCreationException) e, context);
        } else if (e instanceof SiddhiAppRuntimeException) {
            handleRuntimeError((SiddhiAppRuntimeException) e, context);
        } else if (e instanceof PersistenceStoreException) {
            handlePersistenceError((PersistenceStoreException) e, context);
        } else if (e instanceof ConnectionUnavailableException) {
            handleConnectionError((ConnectionUnavailableException) e, context);
        } else {
            handleGenericError(e, context);
        }
    }

    /** Logs the creation failure, alerts administrators, then tries a fallback configuration. */
    private void handleCreationError(SiddhiAppCreationException e, String context) {
        // Log creation error with context
        logger.error("Siddhi app creation failed in context: {}", context, e);
        // Notify administrators
        alertService.sendAlert("Siddhi app creation failed", e.getMessage());
        // Attempt fallback configuration
        tryFallbackConfiguration(context);
    }

    /** Logs the runtime failure, then attempts recovery or initiates failover. */
    private void handleRuntimeError(SiddhiAppRuntimeException e, String context) {
        // Log runtime error
        logger.error("Siddhi runtime error in context: {}", context, e);
        // Check if recovery is possible
        if (isRecoverable(e)) {
            attemptRecovery(context);
        } else {
            initiateFailover(context);
        }
    }

    /**
     * Reports whether the error can be recovered from.
     *
     * <p>The whole cause chain is inspected, not just the immediate cause, so that an
     * {@link OutOfMemoryError} or {@link StackOverflowError} wrapped in intermediate
     * exceptions is still treated as unrecoverable.
     *
     * @param e the runtime exception to classify
     * @return {@code false} if any cause in the chain is an {@code OutOfMemoryError} or
     *         {@code StackOverflowError}; {@code true} otherwise
     */
    private boolean isRecoverable(SiddhiAppRuntimeException e) {
        Throwable cause = e.getCause();
        for (int depth = 0; cause != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (cause instanceof OutOfMemoryError || cause instanceof StackOverflowError) {
                return false;
            }
            cause = cause.getCause();
        }
        return true;
    }
}
// Resilient operation execution with retry and circuit breaker
/**
 * Runs operations through a circuit breaker wrapped around a retry policy and routes any
 * resulting failure to a matching handler.
 *
 * <p>{@code handleGenericException}, {@code switchToOfflineMode} and
 * {@code switchToAlternativeStorage} are used here but are not part of this listing.
 */
public class ResilientSiddhiOperations {
    private final CircuitBreaker circuitBreaker;
    private final RetryPolicy retryPolicy;

    /**
     * Creates the executor with the collaborators it delegates to. Without this constructor
     * the two final fields would never be assigned.
     *
     * @param circuitBreaker breaker that guards every execution; must not be null
     * @param retryPolicy    policy that runs the operation; must not be null
     */
    public ResilientSiddhiOperations(CircuitBreaker circuitBreaker, RetryPolicy retryPolicy) {
        this.circuitBreaker = java.util.Objects.requireNonNull(circuitBreaker, "circuitBreaker");
        this.retryPolicy = java.util.Objects.requireNonNull(retryPolicy, "retryPolicy");
    }

    /**
     * Executes {@code operation} via the retry policy inside the circuit breaker.
     * Siddhi exceptions go to {@code handleSiddhiException}; everything else goes to
     * {@code handleGenericException}.
     *
     * @param operation the work to run
     */
    public void executeWithResilience(Runnable operation) {
        try {
            circuitBreaker.executeSupplier(() -> {
                try {
                    retryPolicy.execute(() -> {
                        operation.run();
                        return null;
                    });
                } catch (RuntimeException unchecked) {
                    // Unchecked failures (including SiddhiException) pass through untouched
                    throw unchecked;
                } catch (Exception checked) {
                    // Supplier.get() cannot throw checked exceptions, so wrap and keep the cause
                    throw new IllegalStateException(
                            "Retry policy failed with a checked exception", checked);
                }
                return null;
            });
        } catch (SiddhiException e) {
            // Handle specific Siddhi exceptions
            handleSiddhiException(e);
        } catch (Exception e) {
            // Handle other exceptions
            handleGenericException(e);
        }
    }

    /**
     * Applies the degradation strategy matching the Siddhi exception type. Types without a
     * dedicated strategy are forwarded to the generic handler instead of being dropped.
     */
    private void handleSiddhiException(SiddhiException e) {
        if (e instanceof ConnectionUnavailableException) {
            // Switch to offline mode
            switchToOfflineMode();
        } else if (e instanceof PersistenceStoreException) {
            // Use alternative storage
            switchToAlternativeStorage();
        } else {
            handleGenericException(e);
        }
    }
}

/**
 * Callback that receives an object together with the exception raised for it.
 *
 * @param <T> type of the object passed alongside the exception
 */
@FunctionalInterface
public interface ExceptionHandler<T> {
    /**
     * @param object    the object handed over together with the exception
     * @param exception the exception to handle
     */
    void handle(T object, Exception exception);
}
/**
 * Callback notified with an exception.
 *
 * <p>Declares a single abstract method, so it can be supplied as a lambda or method
 * reference; {@code @FunctionalInterface} makes the compiler enforce that.
 */
@FunctionalInterface
public interface ExceptionListener {
    /**
     * Receives the exception being reported.
     *
     * @param exception the exception passed to the listener
     */
    void exceptionThrown(Exception exception);
}
/**
 * Contract for running a task under a retry configuration.
 * NOTE(review): the retry semantics themselves (when a retry happens, what is thrown once
 * retries are exhausted) are not specified in this listing — confirm against the implementation.
 */
public interface RetryPolicy {
/**
 * Runs the given callable and returns its result.
 *
 * @param callable the task to run
 * @param <T>      result type of the task
 * @return the value produced by the task
 * @throws Exception declared as a checked exception, so callers must catch or propagate it
 */
<T> T execute(Callable<T> callable) throws Exception;
/**
 * @param maxRetries presumably the maximum number of retries — TODO confirm
 * @return a RetryPolicy, which allows calls to be chained
 */
RetryPolicy withMaxRetries(int maxRetries);
/**
 * @param delay presumably the wait between attempts — TODO confirm
 * @return a RetryPolicy, which allows calls to be chained
 */
RetryPolicy withDelay(Duration delay);
}
/**
 * Contract for executing a supplier through a circuit breaker and for controlling and
 * querying the breaker's open state.
 * NOTE(review): the exact open/closed semantics are not specified in this listing — confirm
 * against the implementation.
 */
public interface CircuitBreaker {
/**
 * Runs the given supplier and returns its result.
 *
 * @param supplier the task to run
 * @param <T>      result type of the task
 * @return the value produced by the supplier
 * @throws Exception declared as a checked exception, so callers must catch or propagate it
 */
<T> T executeSupplier(Supplier<T> supplier) throws Exception;
/** Presumably forces the breaker into the open state — TODO confirm. */
void open();
/** Presumably forces the breaker into the closed state — TODO confirm. */
void close();
/** @return whether the breaker is open */
boolean isOpen();
}
Install with Tessl CLI
npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core