
tessl/pypi-vdk-data-sources

Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.


Data Sources

Core interfaces and utilities for creating custom data sources that integrate with the VDK data ingestion framework. Data sources manage connections to external systems and expose data through streams.

Capabilities

Data Source Interface

Abstract interface that all data sources must implement to provide unified data access patterns.

class IDataSource:
    """
    Abstract class for a Data Source, responsible for managing the connection and providing data streams.
    """
    def configure(self, config: IDataSourceConfiguration):
        """
        Configure the data source.
        
        Args:
            config: Data source configuration object
        """
    
    def connect(self, state: Optional[IDataSourceState]):
        """
        Establish a connection using provided configuration and last saved state.
        
        Args:
            state: Data source state object for resuming from previous runs
        """
    
    def disconnect(self):
        """
        Disconnect and clean up resources if needed.
        """
    
    def streams(self) -> List[IDataSourceStream]:
        """
        Get the available streams for this data source.
        
        Returns:
            List of IDataSourceStream objects
        """

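The call order implied by the interface (configure, connect, read each stream, disconnect) can be sketched as follows. This is a minimal illustration, not the plugin's own ingestion loop: `InMemorySource`, `InMemoryStream`, and `drain` are hypothetical names, and plain dictionaries stand in for the configuration, state, and payload types so the snippet runs without VDK installed.

```python
from typing import Any, Dict, Iterable, List, Optional


class InMemoryStream:
    """Hypothetical stream; mirrors the name()/read() shape of IDataSourceStream."""

    def __init__(self, name: str, rows: List[Dict[str, Any]]):
        self._name = name
        self._rows = rows

    def name(self) -> str:
        return self._name

    def read(self) -> Iterable[Dict[str, Any]]:
        # A real stream would yield DataSourcePayload objects.
        yield from self._rows


class InMemorySource:
    """Hypothetical source; mirrors the four IDataSource methods."""

    def __init__(self):
        self._config: Dict[str, Any] = {}
        self._streams: List[InMemoryStream] = []
        self.connected = False

    def configure(self, config: Dict[str, Any]):
        self._config = config

    def connect(self, state: Optional[Dict[str, Any]]):
        # Resume after the last row recorded in the saved state, if any.
        start = (state or {}).get("last_row", -1) + 1
        rows = self._config["rows"][start:]
        self._streams = [InMemoryStream("rows", rows)]
        self.connected = True

    def disconnect(self):
        self._streams = []
        self.connected = False

    def streams(self) -> List[InMemoryStream]:
        return self._streams


def drain(source: InMemorySource, config, state=None) -> Dict[str, list]:
    """Run the full lifecycle and collect everything each stream yields."""
    source.configure(config)
    source.connect(state)
    try:
        return {s.name(): list(s.read()) for s in source.streams()}
    finally:
        # Always release resources, even if a stream fails mid-read.
        source.disconnect()
```

Calling `drain(InMemorySource(), {"rows": [...]}, state={"last_row": 0})` skips the first row and returns the rest keyed by stream name. The `last_row` key is an assumption made for this sketch; the real state object is an `IDataSourceState`, not a dictionary.
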
Data Source Stream Interface

Abstract interface for individual data streams within a data source, representing channels through which data flows.

class IDataSourceStream:
    """
    Abstract class for a Data Source Stream, representing a channel or resource to read data.
    """
    def name(self) -> str:
        """
        Returns:
            Unique name of the data source stream within the data source
        """
    
    def read(self) -> Iterable[DataSourcePayload]:
        """
        Generator method or Iterator for reading data from the stream.
        
        Returns:
            An iterable of DataSourcePayload objects
        """

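Because `read()` may be a generator, a stream can produce payloads on demand instead of loading everything up front. The sketch below illustrates that laziness under stated assumptions: `CounterStream` and `Payload` are hypothetical stand-ins (the latter a reduced copy of `DataSourcePayload`), defined locally so the snippet runs on its own.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, Optional


@dataclass(frozen=True)
class Payload:
    """Stand-in for DataSourcePayload, reduced to two fields."""
    data: Optional[Dict[str, Any]]
    state: Optional[Dict[str, Any]] = field(default_factory=dict)


class CounterStream:
    """Hypothetical stream that yields `limit` numbered payloads lazily."""

    def __init__(self, limit: int):
        self._limit = limit
        self.produced = 0  # how many payloads have been generated so far

    def name(self) -> str:
        return f"counter_{self._limit}"

    def read(self) -> Iterator[Payload]:
        for i in range(self._limit):
            self.produced += 1
            yield Payload(data={"value": i}, state={"last_value": i})


stream = CounterStream(1000)
# Pulling one item runs the generator body only up to its first yield.
first = next(iter(stream.read()))
```

After this runs, `stream.produced` is 1 rather than 1000: the remaining payloads are created only if the consumer keeps iterating.
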
Data Source Registration

Decorator and factory system for registering data sources with the VDK plugin system.

def data_source(name: str, config_class: Type[IDataSourceConfiguration]):
    """
    Decorator to mark a class as a data source implementation.
    
    Args:
        name: The name identifier for the data source
        config_class: The configuration class that implements IDataSourceConfiguration
    
    Returns:
        Decorated data source class
    """

class SingletonDataSourceFactory(IDataSourceFactory):
    """
    Singleton factory class to create and manage Data Source instances.
    """
    def register_data_source_class(self, data_source_class: Type[IDataSource]):
        """
        Register a data source and its associated configuration class.
        
        Args:
            data_source_class: The data source class decorated with @data_source
        """
    
    def create_data_source(self, name: str) -> IDataSource:
        """
        Create an instance of a registered data source.
        
        Args:
            name: The name identifier for the data source
            
        Returns:
            An instance of the data source
        """
    
    def create_configuration(self, name: str, config_data: Dict[str, Any]):
        """
        Create a configuration instance for a data source.
        
        Args:
            name: The name identifier for the data source
            config_data: Dictionary of configuration values
            
        Returns:
            Configuration instance
        """

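One way a registration decorator and a factory can cooperate is sketched below. It is not the plugin's actual implementation: `demo_data_source`, `DemoFactory`, and the `source_name` / `source_config_class` attributes are hypothetical, chosen only to show how a name can be mapped to both a data source class and its configuration class.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


def demo_data_source(name: str, config_class: Type):
    """Hypothetical decorator: tags the class with its name and config class."""
    def decorator(cls):
        cls.source_name = name
        cls.source_config_class = config_class
        return cls
    return decorator


class DemoFactory:
    """Hypothetical registry keyed by data source name."""

    def __init__(self):
        self._classes: Dict[str, Type] = {}

    def register_data_source_class(self, data_source_class: Type):
        self._classes[data_source_class.source_name] = data_source_class

    def create_data_source(self, name: str):
        return self._classes[name]()

    def create_configuration(self, name: str, config_data: Dict[str, Any]):
        # Build the config object from a plain dictionary of values.
        return self._classes[name].source_config_class(**config_data)


@dataclass
class EchoConfig:
    message: str = "hello"


@demo_data_source(name="echo", config_class=EchoConfig)
class EchoSource:
    def configure(self, config: EchoConfig):
        self.config = config


factory = DemoFactory()
factory.register_data_source_class(EchoSource)
source = factory.create_data_source("echo")
source.configure(factory.create_configuration("echo", {"message": "hi"}))
```

Keeping the configuration class attached to the data source class means a caller only needs the registered name to obtain both a source instance and a matching configuration object.
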
Data Source Payloads

Data structure that encapsulates data, metadata, and state information for each piece of data ingested.

@dataclass(frozen=True)
class DataSourcePayload:
    """
    Encapsulates a single payload to be ingested coming from a data source.
    """
    data: Optional[Dict[str, Any]]  # The data to be ingested
    metadata: Optional[Dict[str, Union[int, str, bool, float, datetime]]]  # Additional metadata
    state: Optional[Dict[str, Any]] = field(default_factory=dict)  # State for incremental ingestion
    destination_table: Optional[str] = None  # Optional destination table override
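
Since the dataclass is frozen, a payload's fields cannot be reassigned after construction. The sketch below copies the definition locally so it runs without VDK installed, then shows that assignment fails and that `dataclasses.replace` produces a modified copy instead. The sample values are invented for illustration.

```python
from dataclasses import FrozenInstanceError, dataclass, field, replace
from datetime import datetime
from typing import Any, Dict, Optional, Union


@dataclass(frozen=True)
class DataSourcePayload:
    """Local copy of the definition above so the snippet is self-contained."""
    data: Optional[Dict[str, Any]]
    metadata: Optional[Dict[str, Union[int, str, bool, float, datetime]]]
    state: Optional[Dict[str, Any]] = field(default_factory=dict)
    destination_table: Optional[str] = None


payload = DataSourcePayload(
    data={"id": 7, "name": "widget"},
    metadata={"ingested_at": datetime(2024, 1, 1)},
    state={"last_id": 7},
)

# Frozen dataclasses reject attribute assignment.
try:
    payload.destination_table = "widgets"
    mutated = True
except FrozenInstanceError:
    mutated = False

# replace() builds a modified copy and leaves the original untouched.
routed = replace(payload, destination_table="widgets")
```

Note that `frozen=True` only prevents rebinding the fields; the dictionaries held in `data`, `metadata`, and `state` are still ordinary mutable objects.
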

Error Handling

Exception types and callback interfaces for handling data source errors during ingestion.

class StopDataSourceStream(Exception):
    """Signal the end of a stream: there is no more data"""

class RetryDataSourceStream(Exception):
    """Signal the stream ingestion should be retried"""

class DataSourcesAggregatedException(Exception):
    """
    Exception to aggregate multiple Data Sources failures.
    """
    def __init__(self, data_streams_exceptions: Dict[str, Exception]): ...

@dataclass
class DataSourceError:
    """
    Data class to encapsulate information about a Data Source ingestion error.
    """
    data_stream: IDataSourceStream
    exception: BaseException

class IDataSourceErrorCallback:
    """
    Callback interface to be implemented for handling data source ingestion errors.
    """
    def __call__(self, error: DataSourceError):
        """
        Invoked when an error occurs during data ingestion.
        
        Args:
            error: Object containing details of the occurred error
            
        Raises:
            StopDataSourceStream: Stops the current data stream without errors
            RetryDataSourceStream: Retries ingesting the current data stream later
        """

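An error callback decides the fate of a stream by raising one of the two signal exceptions. The sketch below is a hypothetical policy, not behavior prescribed by the plugin: it retries on timeouts up to a limit, stops the stream on `ValueError`, and re-raises anything else. The exception and error classes are redefined locally as stand-ins so the snippet runs without VDK, and `RetryTransientErrors` is an invented name.

```python
from dataclasses import dataclass


class StopDataSourceStream(Exception):
    """Local stand-in for the exception of the same name above."""


class RetryDataSourceStream(Exception):
    """Local stand-in for the exception of the same name above."""


@dataclass
class DataSourceError:
    """Local stand-in; data_stream is typed loosely for this sketch."""
    data_stream: object
    exception: BaseException


class RetryTransientErrors:
    """Hypothetical callback: retry on timeouts, stop the stream on bad input."""

    def __init__(self, max_retries: int = 3):
        self._max_retries = max_retries
        self._attempts = 0  # shared across all streams, for simplicity

    def __call__(self, error: DataSourceError):
        if isinstance(error.exception, TimeoutError):
            self._attempts += 1
            if self._attempts <= self._max_retries:
                raise RetryDataSourceStream() from error.exception
        if isinstance(error.exception, ValueError):
            raise StopDataSourceStream() from error.exception
        # Anything else (or an exhausted retry budget) is surfaced unchanged.
        raise error.exception
```

The section above does not say how the framework treats a callback that returns normally, so this sketch always raises. A production callback would likely track retry counts per stream rather than globally.
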
Data Metrics Analysis

Interfaces for implementing metrics analysis on data being ingested from data sources.

class IDataMetricsBatchAnalyzer:
    """
    Implement metrics analyzer for given data source.
    This will be called for each payload.
    The implementation must provide constructor without arguments.
    """
    def analyze_batch(self, payload: DataSourcePayload):
        """
        Analyze the payload as it's being collected.
        
        Args:
            payload: DataSourcePayload being processed
        """

class IDataMetricsFullAnalyzer:
    """
    Implement metrics analyzer for given data source.
    This will be called after the data source is exhausted.
    The implementation must provide constructor without arguments.
    """
    def get_data_store_target(self) -> str:
        """
        Returns the ingestion target where the data would be stored for further analysis.
        
        Returns:
            Target identifier for data storage
        """
    
    def get_data_store_method(self) -> str:
        """
        Returns the ingestion method which will be used to store the data for further analysis.
        
        Returns:
            Method identifier for data storage
        """
    
    def analyze_at_the_end(self):
        """
        Does the actual analysis on the stored data.
        To use this you must also specify data store method and data store target.
        """

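A batch analyzer receives one payload at a time, so it suits running tallies. The sketch below assumes only the `analyze_batch(payload)` shape shown above: `NullCountAnalyzer` is a hypothetical class that does not subclass the real interface, and `SimpleNamespace` objects stand in for `DataSourcePayload` so the snippet runs without VDK.

```python
from collections import Counter
from types import SimpleNamespace


class NullCountAnalyzer:
    """Hypothetical batch analyzer: counts payloads and missing values per column."""

    def __init__(self):  # no-argument constructor, as the interface requires
        self.payloads_seen = 0
        self.null_counts = Counter()

    def analyze_batch(self, payload):
        self.payloads_seen += 1
        # payload.data may be None, so fall back to an empty mapping.
        for column, value in (payload.data or {}).items():
            if value is None:
                self.null_counts[column] += 1


analyzer = NullCountAnalyzer()
rows = [
    {"id": 1, "email": None},
    {"id": 2, "email": "a@b.c"},
    {"id": None, "email": None},
]
for row in rows:
    analyzer.analyze_batch(SimpleNamespace(data=row))
```

After the loop, the analyzer has seen three payloads and recorded two missing `email` values and one missing `id`. Metrics that need the whole data set at once are the case the full analyzer interface is described for.
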
CLI Utilities

Utility functions for listing and exploring registered data sources through the command-line interface.

def list_data_sources() -> List[Dict]:
    """
    List all registered data sources.
    
    Returns:
        List of dictionaries containing data source information
    """

def list_config_options(data_source_name: str) -> List[Dict]:
    """
    List configuration options for a specific data source.
    
    Args:
        data_source_name: Name of the data source
        
    Returns:
        List of dictionaries containing configuration options
    """

@click.command(name="data-sources")
def data_sources(ctx: click.Context, list: bool, config: str, output: str):
    """
    CLI command for exploring VDK data sources.
    """

Hook Registration

Hook specification for registering data sources with the VDK plugin system.

class DataSourcesHookSpec:
    """Hook specifications for data source registration"""
    
    @hookspec(historic=True)
    def vdk_data_sources_register(self, data_source_factory: IDataSourceFactory):
        """
        Register a data source and its associated configuration class.
        
        Args:
            data_source_factory: The factory where the data sources should be registered
        """

Built-in Data Sources

Pre-built data source implementations included with the plugin.

@config_class(name="auto_generated", description="Configuration for Auto generated data source")
class AutoGeneratedDataSourceConfiguration(IDataSourceConfiguration):
    num_records: int = config_field(description="Number of records to return", default=2)
    include_metadata: bool = config_field(description="Include autogenerated metadata", default=False)
    num_streams: int = config_field(description="Number of streams the data source would have", default=1)

@data_source(name="auto-generated-data", config_class=AutoGeneratedDataSourceConfiguration)
class AutoGeneratedDataSource(IDataSource):
    """Data source that generates dummy data for testing purposes."""

Usage Examples

Creating a Custom Data Source

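import csv
import os
from typing import Iterable, List, Optional

from vdk.plugin.data_sources.config import config_class, config_field
# DataSourcePayload is assumed to live alongside the interfaces in data_source
from vdk.plugin.data_sources.data_source import (
    DataSourcePayload,
    IDataSource,
    IDataSourceConfiguration,
    IDataSourceStream,
)
from vdk.plugin.data_sources.factory import data_source
# The module path for IDataSourceState is an assumption; adjust if it differs
from vdk.plugin.data_sources.state import IDataSourceState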

# Define configuration
@config_class(name="csv-file", description="Configuration for CSV file data source")
class CsvDataSourceConfiguration(IDataSourceConfiguration):
    file_path: str = config_field(description="Path to CSV file")
    delimiter: str = config_field(description="CSV delimiter", default=",")
    has_header: bool = config_field(description="CSV has header row", default=True)

# Implement data source stream
class CsvDataSourceStream(IDataSourceStream):
    def __init__(self, file_path: str, config: CsvDataSourceConfiguration):
        self._file_path = file_path
        self._config = config
    
    def name(self) -> str:
        return f"csv_file_{os.path.basename(self._file_path)}"
    
    def read(self) -> Iterable[DataSourcePayload]:
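        # newline="" lets the csv module handle line endings itself
        with open(self._file_path, "r", newline="") as file:
            # DictReader takes column names from the first row, so this
            # example only covers the has_header=True case
            reader = csv.DictReader(file, delimiter=self._config.delimiter)
            for row_num, row in enumerate(reader):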
                yield DataSourcePayload(
                    data=dict(row),
                    metadata={"row_number": row_num, "file_path": self._file_path},
                    state={"last_row": row_num}
                )

# Implement data source
@data_source(name="csv-file", config_class=CsvDataSourceConfiguration)
class CsvDataSource(IDataSource):
    def __init__(self):
        self._config = None
        self._streams = []
    
    def configure(self, config: CsvDataSourceConfiguration):
        self._config = config
    
    def connect(self, state: Optional[IDataSourceState]):
        self._streams = [CsvDataSourceStream(self._config.file_path, self._config)]
    
    def disconnect(self):
        self._streams = []
    
    def streams(self) -> List[IDataSourceStream]:
        return self._streams

Registering a Data Source

from vdk.api.plugin.hook_markers import hookimpl
from vdk.plugin.data_sources.factory import IDataSourceFactory

class MyDataSourcePlugin:
    @hookimpl
    def vdk_data_sources_register(self, data_source_factory: IDataSourceFactory):
        data_source_factory.register_data_source_class(CsvDataSource)

Install with Tessl CLI

npx tessl i tessl/pypi-vdk-data-sources
