Extensible data source implementations for Sentinel flow control and circuit breaker library.
—
Abstract base classes providing common functionality for data source implementations, including automatic refresh capabilities and configuration parsing patterns.
The AbstractDataSource class provides basic functionality for loading and parsing configurations, implementing the common patterns shared by most readable data sources.
/**
* The abstract readable data source provides basic functionality for loading and parsing config.
* @param <S> source data type
* @param <T> target data type
*/
public abstract class AbstractDataSource<S, T> implements ReadableDataSource<S, T> {
protected final Converter<S, T> parser;
protected final SentinelProperty<T> property;
/**
* Constructor with converter/parser.
* @param parser the converter to transform source data to target type
* @throws IllegalArgumentException if parser is null
*/
public AbstractDataSource(Converter<S, T> parser);
/**
* Load config from readSource().
* @return the target data
* @throws Exception IO or other error occurs
*/
public T loadConfig() throws Exception;
/**
* Load config from provided source data.
* @param conf the source data
* @return the target data converted from source
* @throws Exception conversion error occurs
*/
public T loadConfig(S conf) throws Exception;
/**
* Get SentinelProperty of the data source.
* @return the property for dynamic updates
*/
public SentinelProperty<T> getProperty();
}

Usage Examples:
// Custom implementation extending AbstractDataSource
/**
 * Readable data source that fetches its raw configuration text over HTTP.
 *
 * <p>Every {@link #readSource()} call opens a new connection to the configured URL, sends
 * the headers registered through {@link #addHeader(String, String)}, and returns the
 * response body as a string for the converter supplied to the constructor.
 *
 * @param <T> target data type produced by the converter
 */
public class HttpReadableDataSource<T> extends AbstractDataSource<String, T> {
    private final String url;
    private final Map<String, String> headers;

    /**
     * Creates a data source that reads from the given URL.
     *
     * @param url    address the configuration is fetched from
     * @param parser converter turning the response body into the target type
     */
    public HttpReadableDataSource(String url, Converter<String, T> parser) {
        super(parser);
        this.url = url;
        this.headers = new HashMap<>();
    }

    /**
     * Fetches the response body from the configured URL.
     *
     * @return the response body, with line terminators preserved
     * @throws Exception if the URL is malformed or the request/read fails
     */
    @Override
    public String readSource() throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        try {
            // Add headers
            for (Map.Entry<String, String> header : headers.entrySet()) {
                conn.setRequestProperty(header.getKey(), header.getValue());
            }
            // Read response. The charset is explicit so decoding does not depend on the
            // platform default; UTF-8 is the standard encoding for JSON payloads.
            try (InputStreamReader reader = new InputStreamReader(
                    conn.getInputStream(), java.nio.charset.StandardCharsets.UTF_8)) {
                StringBuilder response = new StringBuilder();
                // Copy raw characters instead of readLine() so that line breaks in the
                // body are kept; line-oriented formats would otherwise be corrupted.
                char[] buffer = new char[4096];
                int count;
                while ((count = reader.read(buffer)) != -1) {
                    response.append(buffer, 0, count);
                }
                return response.toString();
            }
        } finally {
            // Release the connection even when the request or the read fails.
            conn.disconnect();
        }
    }

    /**
     * No persistent resources are held: each read disconnects its own connection.
     */
    @Override
    public void close() throws Exception {
        // Nothing to clean up; connections are released in readSource().
    }

    /**
     * Registers a request header sent with every subsequent {@link #readSource()} call.
     *
     * @param name  header name
     * @param value header value
     */
    public void addHeader(String name, String value) {
        headers.put(name, value);
    }
}
// Usage
// Converter that parses the fetched JSON text into a list of FlowRule objects.
Converter<String, List<FlowRule>> converter = source ->
JSON.parseArray(source, FlowRule.class);
// Data source that reads its rule definitions from the given HTTP endpoint.
HttpReadableDataSource<List<FlowRule>> httpDs =
new HttpReadableDataSource<>("http://config-server/flow-rules", converter);
// Header applied to the request made by readSource(); the token is an example value.
httpDs.addHeader("Authorization", "Bearer token123");
List<FlowRule> rules = httpDs.loadConfig();

The AutoRefreshDataSource class extends AbstractDataSource to provide automatic refresh capabilities with configurable intervals.
/**
* A ReadableDataSource automatically fetches the backend data.
* @param <S> source data type
* @param <T> target data type
*/
public abstract class AutoRefreshDataSource<S, T> extends AbstractDataSource<S, T> {
protected long recommendRefreshMs;
/**
* Constructor with default refresh interval (3000ms).
* @param configParser the converter to transform source data
*/
public AutoRefreshDataSource(Converter<S, T> configParser);
/**
* Constructor with custom refresh interval.
* @param configParser the converter to transform source data
* @param recommendRefreshMs refresh interval in milliseconds (must be > 0)
* @throws IllegalArgumentException if recommendRefreshMs <= 0
*/
public AutoRefreshDataSource(Converter<S, T> configParser, long recommendRefreshMs);
/**
* Close the data source and shutdown the refresh scheduler.
* @throws Exception if cleanup fails
*/
public void close() throws Exception;
/**
* Check if the source has been modified since last read.
* Override this method to implement custom modification detection.
* @return true if source is modified, false otherwise
*/
protected boolean isModified();
}

Usage Examples:
// Custom auto-refresh implementation
/**
 * Auto-refreshing data source that reads its configuration rows from a JDBC database.
 *
 * <p>{@link #readSource()} runs the configured query and hands the rows to the converter;
 * {@link #isModified()} compares a checksum of {@code configuration_table} with the value
 * seen on the previous check. All JDBC resources are opened per call and closed before
 * the call returns, so nothing accumulates across refresh cycles.
 *
 * @param <T> target data type produced by the converter
 */
public class DatabaseAutoRefreshDataSource<T> extends AutoRefreshDataSource<ResultSet, T> {
    private final String connectionUrl;
    private final String query;
    // Checksum observed by the most recent isModified() call; 0 until the first check.
    private long lastChecksum = 0;

    /**
     * Creates a data source that polls the database with a 5000 ms refresh interval.
     *
     * @param connectionUrl JDBC URL passed to {@link DriverManager#getConnection(String)}
     * @param query         SQL query whose rows are handed to the converter
     * @param converter     converter turning the result rows into the target type
     */
    public DatabaseAutoRefreshDataSource(
            String connectionUrl,
            String query,
            Converter<ResultSet, T> converter) {
        super(converter, 5000); // Refresh every 5 seconds
        this.connectionUrl = connectionUrl;
        this.query = query;
    }

    /**
     * Runs the configured query and returns its rows as a disconnected snapshot.
     *
     * <p>The rows are copied into a {@code CachedRowSet} (which is a {@link ResultSet}) so
     * the connection, statement and live result set can be closed before returning;
     * returning the live result set would leave all three open on every refresh.
     *
     * @return an in-memory result set positioned before its first row
     * @throws Exception if connecting, querying or copying the rows fails
     */
    @Override
    public ResultSet readSource() throws Exception {
        try (Connection conn = DriverManager.getConnection(connectionUrl);
             PreparedStatement stmt = conn.prepareStatement(query);
             ResultSet rs = stmt.executeQuery()) {
            javax.sql.rowset.CachedRowSet snapshot =
                    javax.sql.rowset.RowSetProvider.newFactory().createCachedRowSet();
            snapshot.populate(rs);
            return snapshot;
        }
    }

    /**
     * Reports whether the checksum of {@code configuration_table} differs from the one
     * recorded by the previous call, recording the new value when it does.
     *
     * @return {@code true} if the checksum changed or the check itself failed,
     *         {@code false} otherwise
     */
    @Override
    protected boolean isModified() {
        // Check if database has updates using a checksum query.
        // NOTE(review): CHECKSUM(*) is dialect-specific; confirm the target database supports it.
        try (Connection conn = DriverManager.getConnection(connectionUrl);
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT CHECKSUM(*) FROM configuration_table");
             ResultSet rs = stmt.executeQuery()) {
            if (rs.next()) {
                long currentChecksum = rs.getLong(1);
                if (currentChecksum != lastChecksum) {
                    lastChecksum = currentChecksum;
                    return true;
                }
            }
            return false;
        } catch (Exception ignored) {
            // On error, assume modified to trigger refresh
            return true;
        }
    }

    /**
     * Closes the data source by delegating to the superclass.
     *
     * @throws Exception if the superclass cleanup fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        // No database connections to close: each call releases its own JDBC resources.
    }
}
// Usage with property listener
// Converter that walks the result set and builds one FlowRule per row, taking the
// resource name from the "resource" column and the count from the "qps" column.
// NOTE(review): rs.next(), getString() and getDouble() declare SQLException; confirm that
// Converter's method permits checked exceptions, otherwise this lambda needs a try/catch.
Converter<ResultSet, List<FlowRule>> dbConverter = rs -> {
List<FlowRule> rules = new ArrayList<>();
while (rs.next()) {
FlowRule rule = new FlowRule();
rule.setResource(rs.getString("resource"));
rule.setCount(rs.getDouble("qps"));
rules.add(rule);
}
return rules;
};
// Data source that runs the query below against the given JDBC URL and converts
// the rows with dbConverter.
DatabaseAutoRefreshDataSource<List<FlowRule>> dbDs =
new DatabaseAutoRefreshDataSource<>(
"jdbc:mysql://localhost/sentinel",
"SELECT * FROM flow_rules WHERE enabled = 1",
dbConverter
);
// Auto-refresh will update rules every 5 seconds
dbDs.getProperty().addListener(rules -> {
FlowRuleManager.loadRules(rules);
System.out.println("Rules refreshed from database: " + rules.size());
});

The AutoRefreshDataSource uses a scheduled executor service to periodically check for updates:
- Uses `NamedThreadFactory` for proper thread naming
- Checks `isModified()` before each refresh

Both base classes integrate with Sentinel's property system for proper lifecycle management:
// Proper setup and cleanup
// NOTE(review): `converter` is not defined in this snippet; presumably a
// Converter<String, List<FlowRule>> like the one in the HTTP usage example — confirm.
ReadableDataSource<String, List<FlowRule>> ds =
new FileRefreshableDataSource<>("/config/rules.json", converter);
// Register for updates
// Registers FlowRuleManager.loadRules as a listener on the data source's property.
ds.getProperty().addListener(FlowRuleManager::loadRules);
// Always close when done
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
ds.close();
} catch (Exception e) {
System.err.println("Error closing data source: " + e.getMessage());
}
}));

- `readSource()` and `isModified()` methods

Install with Tessl CLI
npx tessl i tessl/maven-com-alibaba-csp--sentinel-datasource-extension