A modern, enterprise-ready business intelligence web application
Connector framework supporting SQL databases through SQLAlchemy and Druid through its native APIs. Provides a unified interface for datasource registration, metadata discovery, query execution, and data exploration across diverse data sources.
Central registry system for managing and accessing different datasource types through a unified interface.
class ConnectorRegistry:
"""
Central registry for datasource types and instances.
Manages registration and discovery of available connector implementations.
"""
@classmethod
def register_sources(cls, datasource_config):
"""
Register datasource classes in the registry.
Parameters:
- datasource_config: dict, mapping of datasource types to implementation classes
Usage:
Typically called during application initialization to register
SQLAlchemy tables, Druid datasources, and custom connectors.
"""
@classmethod
def get_datasource(cls, datasource_type, datasource_id, session):
"""
Get datasource instance by type and identifier.
Parameters:
- datasource_type: str, type identifier ('table', 'druid', etc.)
- datasource_id: int, datasource unique identifier
- session: SQLAlchemy session for database operations
Returns:
Datasource instance (SqlaTable, DruidDatasource, or custom type)
Raises:
DatasourceNotFound if datasource doesn't exist
"""
@classmethod
def get_all_datasources(cls, session):
"""
Get all available datasource instances across all types.
Parameters:
- session: SQLAlchemy session for database operations
Returns:
List of all registered datasource instances
"""
@classmethod
def get_datasource_by_name(cls, session, datasource_type, datasource_name, schema, database_name):
"""
Find datasource by name and context.
Parameters:
- session: SQLAlchemy session
- datasource_type: str, datasource type identifier
- datasource_name: str, datasource name
- schema: str, schema context (for SQL databases)
- database_name: str, database context
Returns:
Matching datasource instance or None if not found
"""
@classmethod
def query_datasources_by_permissions(cls, session, database, permissions):
"""
Filter datasources by user permissions.
Parameters:
- session: SQLAlchemy session
- database: Database instance for context
- permissions: set, user permission strings
Returns:
List of accessible datasource instances
"""
@classmethod
def get_eager_datasource(cls, session, datasource_type, datasource_id):
"""
Get datasource with eagerly loaded relationships.
Parameters:
- session: SQLAlchemy session
- datasource_type: str, datasource type
- datasource_id: int, datasource identifier
Returns:
Datasource instance with loaded columns, metrics, and relationships
"""
@classmethod
def query_datasources_by_name(cls, session, database, datasource_name, schema):
"""
Query datasources by name pattern.
Parameters:
- session: SQLAlchemy session
- database: Database instance
- datasource_name: str, name pattern for matching
- schema: str, schema context
Returns:
Query object for further filtering and execution
"""SQL database connector supporting traditional relational databases through SQLAlchemy ORM.
class SqlaTable:
"""
SQL table/view datasource with comprehensive metadata management.
Key Fields:
- table_name: str, name of database table or view
- main_dttm_col: str, primary datetime column for time-series operations
- default_endpoint: str, default API endpoint for data access
- database_id: int, foreign key to Database connection
- fetch_values_predicate: str, SQL WHERE clause for value fetching
- is_sqllab_view: bool, indicates if created from SQL Lab
- template_params: str, JSON-encoded Jinja template parameters
Relationships:
- columns: TableColumn[], table column definitions (one-to-many)
- metrics: SqlMetric[], calculated metric definitions (one-to-many)
- database: Database, database connection instance (many-to-one)
"""
def query(self, query_obj):
"""
Execute queries against this datasource.
Core method for data retrieval with filtering, grouping, and aggregation.
Parameters:
- query_obj: dict, query specification (metrics, groupby, filters, time range)
Returns:
Query result object with data, metadata, and performance information
"""
def get_sqla_table(self):
"""
Get SQLAlchemy Table object.
Returns:
SQLAlchemy Table instance with column definitions and constraints
"""
def fetch_metadata(self):
"""
Update column metadata from database schema.
Discovers column names, types, and constraints from database catalog.
Side Effects:
Creates or updates TableColumn instances for all table columns
"""
def values_for_column(self, column_name, limit=10000):
"""
Get distinct column values for filter dropdowns.
Parameters:
- column_name: str, name of the column to fetch values from
- limit: int, maximum number of distinct values to return
Returns:
List of distinct values from the specified column,
limited and filtered according to datasource configuration
"""
class TableColumn:
"""
Individual table column definition and metadata.
Key Fields:
- column_name: str, database column name
- type: str, SQLAlchemy data type string
- groupby: bool, available for grouping operations
- filterable: bool, available for filtering operations
- description: str, human-readable column description
- is_dttm: bool, indicates datetime/timestamp column
- python_date_format: str, Python strftime format for datetime parsing
- database_expression: str, custom SQL expression for computed columns
"""
class SqlMetric:
"""
Calculated metric definition using SQL expressions.
Key Fields:
- metric_name: str, display name for metric
- metric_type: str, aggregation type identifier
- expression: str, SQL expression for metric calculation
- description: str, metric description and documentation
- d3format: str, D3.js format string for number display
"""Native Druid connector for real-time analytics and OLAP operations.
class DruidDatasource:
"""
Druid datasource with native query interface.
Key Fields:
- datasource_name: str, name of Druid datasource
- cluster_name: str, Druid cluster identifier
- description: str, datasource description
- default_endpoint: str, default API endpoint
- fetch_values_from: str, method for fetching filter values
Relationships:
- columns: DruidColumn[], dimension definitions (one-to-many)
- metrics: DruidMetric[], metric aggregation definitions (one-to-many)
- cluster: DruidCluster, cluster connection configuration (many-to-one)
"""
class DruidCluster:
"""
Druid cluster connection configuration and management.
Key Fields:
- cluster_name: str, unique cluster identifier
- coordinator_host: str, Druid coordinator hostname
- coordinator_port: int, coordinator HTTP port
- coordinator_endpoint: str, coordinator API endpoint path
- broker_host: str, Druid broker hostname
- broker_port: int, broker HTTP port
- broker_endpoint: str, broker query endpoint path
- cache_timeout: int, default cache duration for queries
- verbose_name: str, human-readable cluster name
"""
class DruidColumn:
"""
Druid dimension column definition.
Key Fields:
- column_name: str, dimension name in Druid schema
- type: str, Druid dimension type (string, long, float, etc.)
- groupby: bool, available for grouping in queries
- filterable: bool, available for filtering operations
- description: str, dimension description
"""
class DruidMetric:
"""
Druid aggregation metric definition.
Key Fields:
- metric_name: str, metric display name
- metric_type: str, Druid aggregation type
- json: str, complete Druid aggregation JSON specification
- description: str, metric description and usage notes
- d3format: str, number formatting specification
"""Engine-specific configurations for different database systems.
class BaseEngineSpec:
"""
Abstract base class for database engine specifications.
Key Properties:
- engine: str, SQLAlchemy engine identifier
- time_grain_functions: dict, time grouping function mappings
- time_groupby_inline: bool, inline time grouping support
- limit_method: enum, result limiting strategy
- time_secondary_columns: bool, secondary time column support
- inner_joins: bool, inner join capability flag
- allows_subquery: bool, subquery support indicator
- force_column_alias_quotes: bool, quoted alias requirement
- arraysize: int, default database cursor array size
"""
# Supported Database Engines
class PostgresEngineSpec(BaseEngineSpec):
"""PostgreSQL database engine specification."""
class MySQLEngineSpec(BaseEngineSpec):
"""MySQL/MariaDB database engine specification."""
class RedshiftEngineSpec(BaseEngineSpec):
"""Amazon Redshift data warehouse specification."""
class SnowflakeEngineSpec(BaseEngineSpec):
"""Snowflake cloud data warehouse specification."""
class BigQueryEngineSpec(BaseEngineSpec):
"""Google BigQuery specification."""
class PrestoEngineSpec(BaseEngineSpec):
"""Presto distributed SQL query engine specification."""
class HiveEngineSpec(BaseEngineSpec):
"""Apache Hive data warehouse specification."""
class DruidEngineSpec(BaseEngineSpec):
"""Apache Druid OLAP database specification."""
class ClickHouseEngineSpec(BaseEngineSpec):
"""ClickHouse columnar database specification."""
class OracleEngineSpec(BaseEngineSpec):
"""Oracle Database specification."""
class MssqlEngineSpec(BaseEngineSpec):
"""Microsoft SQL Server specification."""Standardized time grouping capabilities across database engines.
# Built-in Time Grains
TIME_GRAINS = {
'PT1S': 'Second',
'PT1M': 'Minute',
'PT5M': '5 Minutes',
'PT10M': '10 Minutes',
'PT15M': '15 Minutes',
'PT0.5H': '30 Minutes',
'PT1H': 'Hour',
'P1D': 'Day',
'P1W': 'Week',
'P1M': 'Month',
'P0.25Y': 'Quarter',
'P1Y': 'Year'
}
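For example, intersecting this table with an engine spec's time_grain_functions mapping yields the grains a given engine can actually serve (a sketch; engine_spec is assumed to come from a Database instance):
supported_grains = {
    duration: label
    for duration, label in TIME_GRAINS.items()
    if duration in engine_spec.time_grain_functions
}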
# Week Variations
WEEK_GRAINS = {
'1969-12-28T00:00:00Z/P1W': 'Week (Sunday Start)',
'1969-12-29T00:00:00Z/P1W': 'Week (Monday Start)',
'P1W/1970-01-03T00:00:00Z': 'Week (Saturday End)',
'P1W/1970-01-04T00:00:00Z': 'Week (Sunday End)'
}
Different strategies for limiting query results based on database capabilities.
class LimitMethod:
"""Query result limiting strategies."""
FETCH_MANY = 'fetch_many'
"""Use cursor.fetchmany() for result limiting."""
WRAP_SQL = 'wrap_sql'
"""Wrap query in LIMIT clause or equivalent."""
FORCE_LIMIT = 'force_limit'
"""Always apply limit regardless of query structure."""from superset.connectors.connector_registry import ConnectorRegistry
# Register custom datasource type
ConnectorRegistry.register_sources({
'custom_type': CustomDatasourceClass
})
from superset.connectors.connector_registry import ConnectorRegistry
# Get specific datasource
datasource = ConnectorRegistry.get_datasource(
datasource_type='table',
datasource_id=123,
session=db.session
)
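# Look up by name and context (a hedged sketch; all names here are hypothetical)
by_name = ConnectorRegistry.get_datasource_by_name(
    session=db.session,
    datasource_type='table',
    datasource_name='sales',
    schema='public',
    database_name='analytics',
)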
# Get all accessible datasources
all_sources = ConnectorRegistry.get_all_datasources(db.session)
# Get engine specification (db_engine_spec is a property on the Database model)
engine_spec = database.db_engine_spec
# Get available time grains
time_grains = engine_spec.time_grain_functions
# Check capabilities
supports_subqueries = engine_spec.allows_subquery
supports_joins = engine_spec.inner_joins
The connector framework provides a flexible and extensible architecture for integrating diverse data sources while maintaining a consistent interface for data exploration and visualization.
Install with Tessl CLI
npx tessl i tessl/pypi-superset