tessl/pypi-vdk-data-sources

Describes pkg:pypi/vdk-data-sources@0.1.x

To install, run

npx @tessl/cli install tessl/pypi-vdk-data-sources@0.1.0


VDK Data Sources

The vdk-data-sources plugin enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management. It simplifies data pipeline construction by offering consistent APIs for connecting to databases, REST APIs, message brokers, and other data sources.

Package Information

  • Package Name: vdk-data-sources
  • Package Type: pypi
  • Language: Python
  • Installation: pip install vdk-data-sources

Core Imports

from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping.definitions import (
    SourceDefinition, 
    DestinationDefinition, 
    DataFlowMappingDefinition
)

For implementing custom data sources:

from vdk.plugin.data_sources.data_source import (
    IDataSource, 
    IDataSourceStream, 
    IDataSourceConfiguration,
    DataSourcePayload
)
from vdk.plugin.data_sources.factory import data_source, SingletonDataSourceFactory
from vdk.plugin.data_sources.config import config_class, config_field
from vdk.plugin.data_sources.state import IDataSourceState

For TOML configuration parsing:

from vdk.plugin.data_sources.mapping import toml_parser

For CLI utilities:

from vdk.plugin.data_sources.sources_command import list_data_sources, list_config_options

Basic Usage

Simple Data Flow

from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping.definitions import (
    SourceDefinition, 
    DestinationDefinition, 
    DataFlowMappingDefinition
)

def run(job_input: IJobInput):
    # Define source and destination
    source = SourceDefinition(
        id="my-source", 
        name="auto-generated-data", 
        config={"num_records": 100}
    )
    destination = DestinationDefinition(
        id="my-dest", 
        method="memory"
    )

    # Execute data flow
    with DataFlowInput(job_input) as flow_input:
        flow_input.start(DataFlowMappingDefinition(source, destination))

TOML Configuration-Based Data Flow

from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping import toml_parser

def run(job_input: IJobInput):
    # Load configuration from TOML file
    definitions = toml_parser.load_config("config.toml")
    
    # Execute all configured flows
    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flows(definitions)

Example config.toml:

[sources.auto]
name = "auto-generated-data"
config = { num_records = 50 }

[destinations.auto-dest]
method = "memory"

[[flows]]
from = "auto"
to = "auto-dest"

Architecture

The vdk-data-sources plugin follows a layered architecture with clear separation of concerns:

  • Data Sources: Manage connections and expose data streams
  • Data Streams: Abstract individual data channels within sources
  • Data Flow Engine: Orchestrates data movement from sources to destinations
  • Configuration System: Provides typed configuration management
  • State Management: Handles incremental ingestion and resume capabilities
  • Plugin Integration: Integrates with VDK's plugin system

Capabilities

Data Source Implementation

Core interfaces and utilities for creating custom data sources that integrate with the VDK data ingestion framework.

@data_source(name: str, config_class: Type[IDataSourceConfiguration])
def data_source_decorator(data_source_class: Type[IDataSource]): ...

class IDataSource:
    def configure(self, config: IDataSourceConfiguration): ...
    def connect(self, state: Optional[IDataSourceState]): ...
    def disconnect(self): ...
    def streams(self) -> List[IDataSourceStream]: ...

class IDataSourceStream:
    def name(self) -> str: ...
    def read(self) -> Iterable[DataSourcePayload]: ...
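
Putting these interfaces together with the configuration decorators described below, a custom data source can be sketched as follows. The source name "counter", the class names, and the num_records field are illustrative; only the interfaces, decorators, and import paths are taken from this spec.

from typing import Iterable, List, Optional

from vdk.plugin.data_sources.config import config_class, config_field
from vdk.plugin.data_sources.data_source import (
    DataSourcePayload,
    IDataSource,
    IDataSourceConfiguration,
    IDataSourceStream,
)
from vdk.plugin.data_sources.factory import data_source
from vdk.plugin.data_sources.state import IDataSourceState


@config_class(name="counter", description="Configuration for the example counter source")
class CounterSourceConfiguration(IDataSourceConfiguration):
    num_records: int = config_field(description="How many records to generate", default=10)


class CounterStream(IDataSourceStream):
    def __init__(self, num_records: int):
        self._num_records = num_records

    def name(self) -> str:
        return "counter"

    def read(self) -> Iterable[DataSourcePayload]:
        # Emit one payload per record; destination_table controls where it is ingested.
        for i in range(self._num_records):
            yield DataSourcePayload(
                data={"value": i},
                metadata={"index": i},
                destination_table="counter_values",
            )


@data_source(name="counter", config_class=CounterSourceConfiguration)
class CounterDataSource(IDataSource):
    def configure(self, config: CounterSourceConfiguration):
        self._config = config

    def connect(self, state: Optional[IDataSourceState]):
        self._streams = [CounterStream(self._config.num_records)]

    def disconnect(self):
        self._streams = []

    def streams(self) -> List[IDataSourceStream]:
        return self._streams

Once registered through the decorator, such a source can be referenced from a SourceDefinition by its name (here "counter"), the same way "auto-generated-data" is used in the Basic Usage section.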

See docs/data-sources.md for details.

Data Flow Management

High-level orchestration system for managing data flows from sources to destinations with transformation support.

class DataFlowInput:
    def __init__(self, job_input: IJobInput): ...
    def start(self, flow_definition: DataFlowMappingDefinition): ...
    def start_flows(self, definitions: Definitions): ...
    def close(self): ...

class DataFlowMappingDefinition:
    from_source: SourceDefinition
    to_destination: DestinationDefinition
    map_function: Optional[Callable[[DataSourcePayload], Optional[DataSourcePayload]]]
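
The optional map_function lets a flow transform or drop payloads in flight. A minimal sketch, assuming map_function is accepted as a keyword argument alongside the source and destination (as the field list above suggests) and that returning None drops a payload (inferred from the Optional return type):

from typing import Optional

from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.data_source import DataSourcePayload
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping.definitions import (
    DataFlowMappingDefinition,
    DestinationDefinition,
    SourceDefinition,
)


def drop_empty(payload: DataSourcePayload) -> Optional[DataSourcePayload]:
    # Returning None drops the payload; returning a payload forwards it unchanged.
    if not payload.data:
        return None
    return payload


def run(job_input: IJobInput):
    source = SourceDefinition(id="src", name="auto-generated-data", config={"num_records": 100})
    destination = DestinationDefinition(id="dst", method="memory")

    with DataFlowInput(job_input) as flow_input:
        flow_input.start(
            DataFlowMappingDefinition(source, destination, map_function=drop_empty)
        )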

See docs/data-flow.md for details.

Configuration System

Typed configuration management system with decorators for defining data source configuration schemas.

@config_class(name: str, description: str, **kwargs)
def config_class_decorator(cls): ...

def config_field(
    description: str,
    is_sensitive: bool = False,
    default=MISSING,
    **kwargs
): ...
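
A brief sketch of a configuration class defined with these decorators; the "http-example" name and its fields are illustrative. Values supplied via config = {...} in a SourceDefinition or TOML file are expected to map onto fields declared this way (as num_records does in the Basic Usage section), though this spec does not spell out the binding.

from vdk.plugin.data_sources.config import config_class, config_field
from vdk.plugin.data_sources.data_source import IDataSourceConfiguration


@config_class(name="http-example", description="Illustrative HTTP source configuration")
class HttpExampleConfiguration(IDataSourceConfiguration):
    base_url: str = config_field(description="Base URL of the API to read from")
    api_token: str = config_field(description="Bearer token", is_sensitive=True, default="")
    page_size: int = config_field(description="Number of records per page", default=100)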

See docs/configuration.md for details.

State Management

Persistent state management for data sources enabling incremental ingestion and resume capabilities.

class IDataSourceState:
    def read_stream(self, stream_name: str) -> Dict[str, Any]: ...
    def update_stream(self, stream_name: str, state: Dict[str, Any]): ...
    def read_others(self, key: str) -> Dict[str, Any]: ...
    def update_others(self, key: str, state: Dict[str, Any]): ...
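
A sketch of how the state interface supports incremental ingestion. It assumes the source's connect() hands the state object to its streams and that the state attached to each DataSourcePayload is persisted for the stream after successful ingestion; the spec lists the interfaces but does not spell out that contract.

from typing import Iterable, Optional

from vdk.plugin.data_sources.data_source import DataSourcePayload, IDataSourceStream
from vdk.plugin.data_sources.state import IDataSourceState


class IncrementalStream(IDataSourceStream):
    """Illustrative stream that resumes from the last recorded offset."""

    def __init__(self, state: Optional[IDataSourceState]):
        self._state = state
        previous = state.read_stream("incremental") if state else {}
        self._start = previous.get("last_offset", 0)

    def name(self) -> str:
        return "incremental"

    def read(self) -> Iterable[DataSourcePayload]:
        for offset in range(self._start, self._start + 100):
            yield DataSourcePayload(
                data={"offset": offset},
                metadata={"offset": offset},
                # Record progress so the next run can resume from this point.
                state={"last_offset": offset + 1},
            )
        # Progress can also be written explicitly through the interface.
        if self._state:
            self._state.update_stream("incremental", {"last_offset": self._start + 100})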

See docs/state-management.md for details.

Data Metrics Analysis

Interfaces for implementing metrics collection and analysis on data being ingested from data sources.

class IDataMetricsBatchAnalyzer:
    def analyze_batch(self, payload: DataSourcePayload): ...

class IDataMetricsFullAnalyzer:
    def get_data_store_target(self) -> str: ...
    def get_data_store_method(self) -> str: ...
    def analyze_at_the_end(self): ...
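
A sketch of a batch analyzer with the shape of IDataMetricsBatchAnalyzer. The spec lists the interface but not the module that exports it or how analyzers are registered with the plugin, so the class below only mirrors the method signature.

from vdk.plugin.data_sources.data_source import DataSourcePayload


class RecordCountingAnalyzer:
    """Counts ingested payloads per destination table; registration is not covered here."""

    def __init__(self):
        self.records_per_table = {}

    def analyze_batch(self, payload: DataSourcePayload):
        # Track how many payloads target each destination table.
        table = payload.destination_table or "<default>"
        self.records_per_table[table] = self.records_per_table.get(table, 0) + 1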

CLI Utilities

Functions for programmatically accessing data source registry information and exploring available data sources.

def list_data_sources() -> List[Dict]:
    """
    List all registered data sources.
    
    Returns:
        List of dictionaries containing data source information
    """

def list_config_options(data_source_name: str) -> List[Dict]:
    """
    List configuration options for a specific data source.
    
    Args:
        data_source_name: Name of the data source
        
    Returns:
        List of dictionaries containing configuration options
    """

Types

@dataclass(frozen=True)
class DataSourcePayload:
    data: Optional[Dict[str, Any]]
    metadata: Optional[Dict[str, Union[int, str, bool, float, datetime]]]
    state: Optional[Dict[str, Any]] = field(default_factory=dict)
    destination_table: Optional[str] = None

@dataclass
class SourceDefinition:
    id: str
    name: str
    config: Dict[str, Any] = field(default_factory=dict)

@dataclass
class DestinationDefinition:
    id: str
    method: str
    target: Optional[str] = None

@dataclass
class Definitions:
    sources: Dict[str, SourceDefinition]
    destinations: Dict[str, DestinationDefinition]
    flows: List[DataFlowMappingDefinition]

class StopDataSourceStream(Exception):
    """Signal the end of a stream and there's no more data"""

class RetryDataSourceStream(Exception):
    """Signal the stream ingestion should be retried"""

class DataSourcesAggregatedException(Exception):
    """Exception to aggregate multiple Data Sources failures"""
    def __init__(self, data_streams_exceptions: Dict[str, Exception]): ...

class DataSourceNotFoundException(Exception):
    """Raised when a requested data source is not found in registry"""

@dataclass
class DataSourceError:
    """Data class to encapsulate information about a Data Source ingestion error"""
    data_stream: IDataSourceStream
    exception: BaseException
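
A sketch of how the stream control-flow exceptions are meant to be raised from IDataSourceStream.read(). The module that exports the exception classes is an assumption (the spec lists the classes but not their import path), and the in-memory queue is purely illustrative.

import queue
from typing import Iterable

# The exception classes come from the Types section above; the exact module
# that exports them is an assumption here.
from vdk.plugin.data_sources.data_source import (
    DataSourcePayload,
    IDataSourceStream,
    RetryDataSourceStream,
    StopDataSourceStream,
)


class QueueStream(IDataSourceStream):
    """Illustrative stream backed by an in-memory queue."""

    def __init__(self, items: queue.Queue):
        self._items = items

    def name(self) -> str:
        return "queue"

    def read(self) -> Iterable[DataSourcePayload]:
        while True:
            try:
                item = self._items.get_nowait()
            except queue.Empty:
                # No more data: signal the end of this stream.
                raise StopDataSourceStream()
            if item.get("transient_error"):
                # Ask the framework to retry ingesting this stream later.
                raise RetryDataSourceStream()
            yield DataSourcePayload(data=item, metadata={"source": "queue"})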