CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-vdk-data-sources

Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.

Overview
Eval results
Files

data-flow.mddocs/

Data Flow

High-level orchestration system for managing data flows from sources to destinations with transformation support. The data flow system provides a declarative way to define and execute data ingestion pipelines.

Capabilities

Data Flow Input Manager

Context manager for orchestrating data flows from sources to destinations with automatic resource management.

class DataFlowInput:
    """
    Manages the data flow from source to destination using the provided job input.
    """
    def __init__(self, job_input: IJobInput):
        """
        Initialize data flow manager.
        
        Args:
            job_input: Instance of IJobInput, providing context for the current job
        """
    
    def __enter__(self):
        """Context manager entry"""
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with automatic cleanup"""
    
    def close(self):
        """
        Terminates and waits for data source ingestion to finish.
        """
    
    def start(self, flow_definition: DataFlowMappingDefinition):
        """
        Start data flow from a specific source to a specific destination.
        
        Args:
            flow_definition: The definition of the source and destination flow
        """
    
    def start_flows(self, definitions: Definitions):
        """
        Start data flows based on a list of defined mappings.
        
        Args:
            definitions: Definitions containing the data flow mappings
        """

Flow Definitions

Data structures for defining sources, destinations, and mappings between them.

@dataclass
class SourceDefinition:
    """
    SourceDefinition class that defines the metadata and configuration for a data source.
    """
    id: str  # User provided identifier for the source instance
    name: str  # The name of the source (must match registered data source name)
    config: Dict[str, Any] = field(default_factory=dict)  # Configuration for the source

@dataclass
class DestinationDefinition:
    """
    Describes the attributes and methods for a data destination.
    """
    id: str  # User provided identifier for the destination instance
    method: str  # The ingestion method (refer to IIngester)
    target: Optional[str] = None  # The ingestion target (refer to IIngester)

@dataclass
class DataFlowMappingDefinition:
    """
    Defines how data flows from a source to a destination and the optional transformation in between.
    """
    from_source: SourceDefinition
    to_destination: DestinationDefinition
    map_function: Optional[Callable[[DataSourcePayload], Optional[DataSourcePayload]]] = None
    # map_function accepts a payload and returns a payload or None (None means skip ingestion)

@dataclass
class Definitions:
    """
    Aggregates all the source, destination, and flow definitions for an ingestion pipeline.
    """
    sources: Dict[str, SourceDefinition]
    destinations: Dict[str, DestinationDefinition]
    flows: List[DataFlowMappingDefinition]

TOML Configuration Parser

Utilities for loading data flow configurations from TOML files for declarative pipeline definitions.

def load_config(file_path: str) -> Definitions:
    """
    Load configuration from TOML file.
    
    Args:
        file_path: Path to TOML configuration file
        
    Returns:
        Definitions object containing parsed sources, destinations, and flows
    """

def definitions_from_dict(data: Dict) -> Definitions:
    """
    Create definitions from dictionary data.
    
    Args:
        data: Dictionary containing sources, destinations, and flows configuration
        
    Returns:
        Definitions object
    """

Data Ingestion Engine

Low-level ingestion system that handles multi-threaded data processing from sources to destinations.

class DataSourceIngester:
    """
    Multi-threaded ingester for processing data from sources to destinations.
    """
    def __init__(self, job_input: IJobInput): ...
    
    def start_ingestion(
        self,
        data_source_id: str,
        data_source: IDataSource,
        destinations: List[IngestDestination] = None,
        error_callback: Optional[IDataSourceErrorCallback] = None,
    ):
        """
        Start ingestion from a data source to specified destinations.
        
        Args:
            data_source_id: Unique identifier for the data source
            data_source: The data source instance
            destinations: List of ingestion destinations
            error_callback: Optional callback for handling errors
        """
    
    def ingest_data_source(
        self,
        data_source_id: str,
        data_source: IDataSource,
        destination_table: Optional[str] = None,
        method: Optional[str] = None,
        target: Optional[str] = None,
        collection_id: Optional[str] = None,
        error_callback: Optional[IDataSourceErrorCallback] = None,
    ):
        """
        Ingest data source with single destination configuration.
        
        Args:
            data_source_id: Unique identifier for the data source
            data_source: The data source instance
            destination_table: Optional destination table name
            method: Ingestion method
            target: Ingestion target
            collection_id: Optional collection identifier
            error_callback: Optional callback for handling errors
        """
    
    def terminate_and_wait_to_finish(self):
        """
        Terminate ingestion and wait for all threads to finish.
        """
    
    def raise_on_error(self):
        """
        Raise aggregated exceptions if any errors occurred during ingestion.
        
        Raises:
            DataSourcesAggregatedException: If any ingestion errors occurred
        """

@dataclass
class IngestDestination:
    """Configuration for data ingestion destination."""
    destination_table: Optional[str] = None
    method: Optional[str] = None
    target: Optional[str] = None
    collection_id: Optional[str] = None
    map_function: Optional[Callable[[DataSourcePayload], Optional[DataSourcePayload]]] = None

Usage Examples

Simple Data Flow

from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping.definitions import (
    SourceDefinition, 
    DestinationDefinition, 
    DataFlowMappingDefinition
)

def run(job_input: IJobInput):
    # Define source
    source = SourceDefinition(
        id="my-source",
        name="auto-generated-data",
        config={"num_records": 100, "num_streams": 2}
    )
    
    # Define destination
    destination = DestinationDefinition(
        id="my-dest",
        method="memory"
    )
    
    # Create flow mapping
    flow_mapping = DataFlowMappingDefinition(source, destination)
    
    # Execute flow
    with DataFlowInput(job_input) as flow_input:
        flow_input.start(flow_mapping)

Data Flow with Transformation

def transform_payload(payload: DataSourcePayload) -> Optional[DataSourcePayload]:
    """Transform payload by adding a computed field."""
    if payload.data:
        # Add computed field
        payload.data["computed_field"] = payload.data.get("id", 0) * 2
        
        # Filter out records with id > 50
        if payload.data.get("id", 0) > 50:
            return None  # Skip this payload
    
    return payload

def run(job_input: IJobInput):
    source = SourceDefinition(id="src", name="auto-generated-data", config={})
    destination = DestinationDefinition(id="dest", method="memory")
    
    # Apply transformation
    flow_mapping = DataFlowMappingDefinition(
        source, 
        destination,
        map_function=transform_payload
    )
    
    with DataFlowInput(job_input) as flow_input:
        flow_input.start(flow_mapping)

TOML Configuration-Based Flow

Create a config.toml file:

[sources.database]
name = "postgresql"
config = { host = "localhost", port = 5432, database = "mydb" }

[sources.api]
name = "rest-api"
config = { base_url = "https://api.example.com", api_key = "secret" }

[destinations.warehouse]
method = "database"
target = "warehouse_db"

[destinations.lake]
method = "file"
target = "/data/lake"

[[flows]]
from = "database"
to = "warehouse"

[[flows]]
from = "api"
to = "lake"

Load and execute the configuration:

from vdk.plugin.data_sources.mapping import toml_parser

def run(job_input: IJobInput):
    # Load all definitions from TOML
    definitions = toml_parser.load_config("config.toml")
    
    # Execute all configured flows
    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flows(definitions)

Multiple Flows

def run(job_input: IJobInput):
    # Define multiple sources
    source1 = SourceDefinition(id="src1", name="csv-files", config={"path": "/data/csv"})
    source2 = SourceDefinition(id="src2", name="api-data", config={"endpoint": "/users"})
    
    # Define destinations
    dest1 = DestinationDefinition(id="db", method="database", target="main_db")
    dest2 = DestinationDefinition(id="file", method="file", target="/output")
    
    # Create definitions
    definitions = Definitions(
        sources={"src1": source1, "src2": source2},
        destinations={"db": dest1, "file": dest2},
        flows=[
            DataFlowMappingDefinition(source1, dest1),
            DataFlowMappingDefinition(source2, dest2)
        ]
    )
    
    # Execute all flows
    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flows(definitions)

Install with Tessl CLI

npx tessl i tessl/pypi-vdk-data-sources

docs

configuration.md

data-flow.md

data-sources.md

index.md

state-management.md

tile.json