Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.
High-level orchestration system for managing data flows from sources to destinations with transformation support. The data flow system provides a declarative way to define and execute data ingestion pipelines.
Context manager for orchestrating data flows from sources to destinations with automatic resource management.
class DataFlowInput:
    """
    Context manager that manages the data flow from source to destination
    using the provided job input.
    """

    def __init__(self, job_input: IJobInput):
        """
        Initialize the data flow manager.

        Args:
            job_input: Instance of IJobInput, providing context for the current job
        """

    def __enter__(self):
        """Context manager entry; returns self so flows can be started inside a `with` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with automatic cleanup."""

    def close(self):
        """
        Terminates and waits for data source ingestion to finish.
        """

    def start(self, flow_definition: DataFlowMappingDefinition):
        """
        Start data flow from a specific source to a specific destination.

        Args:
            flow_definition: The definition of the source and destination flow
        """

    def start_flows(self, definitions: Definitions):
        """
        Start data flows based on a list of defined mappings.

        Args:
            definitions: Definitions containing the data flow mappings
"""Data structures for defining sources, destinations, and mappings between them.
@dataclass
class SourceDefinition:
    """
    Metadata and configuration for a data source instance.
    """

    id: str  # User provided identifier for the source instance
    name: str  # The name of the source (must match a registered data source name)
    config: Dict[str, Any] = field(default_factory=dict)  # Configuration for the source
@dataclass
class DestinationDefinition:
    """
    Attributes describing a data destination.
    """

    id: str  # User provided identifier for the destination instance
    method: str  # The ingestion method (refer to IIngester)
    target: Optional[str] = None  # The ingestion target (refer to IIngester)
@dataclass
class DataFlowMappingDefinition:
    """
    Defines how data flows from a source to a destination and the optional
    transformation in between.
    """

    from_source: SourceDefinition
    to_destination: DestinationDefinition
    # map_function accepts a payload and returns a payload or None (None means skip ingestion)
    map_function: Optional[Callable[[DataSourcePayload], Optional[DataSourcePayload]]] = None
@dataclass
class Definitions:
    """
    Aggregates all the source, destination, and flow definitions for an
    ingestion pipeline.
    """

    sources: Dict[str, SourceDefinition]  # keyed by the user-provided source id
    destinations: Dict[str, DestinationDefinition]  # keyed by the user-provided destination id
    flows: List[DataFlowMappingDefinition]


Utilities for loading data flow configurations from TOML files for declarative pipeline definitions.
def load_config(file_path: str) -> Definitions:
    """
    Load configuration from a TOML file.

    Args:
        file_path: Path to the TOML configuration file

    Returns:
        Definitions object containing parsed sources, destinations, and flows
    """
def definitions_from_dict(data: Dict) -> Definitions:
    """
    Create definitions from dictionary data.

    Args:
        data: Dictionary containing sources, destinations, and flows configuration

    Returns:
        Definitions object
"""Low-level ingestion system that handles multi-threaded data processing from sources to destinations.
class DataSourceIngester:
    """
    Multi-threaded ingester for processing data from sources to destinations.
    """

    def __init__(self, job_input: IJobInput): ...

    def start_ingestion(
        self,
        data_source_id: str,
        data_source: IDataSource,
        # NOTE(review): annotated Optional to match the None default
        destinations: Optional[List[IngestDestination]] = None,
        error_callback: Optional[IDataSourceErrorCallback] = None,
    ):
        """
        Start ingestion from a data source to specified destinations.

        Args:
            data_source_id: Unique identifier for the data source
            data_source: The data source instance
            destinations: List of ingestion destinations
            error_callback: Optional callback for handling errors
        """

    def ingest_data_source(
        self,
        data_source_id: str,
        data_source: IDataSource,
        destination_table: Optional[str] = None,
        method: Optional[str] = None,
        target: Optional[str] = None,
        collection_id: Optional[str] = None,
        error_callback: Optional[IDataSourceErrorCallback] = None,
    ):
        """
        Ingest a data source with a single destination configuration.

        Args:
            data_source_id: Unique identifier for the data source
            data_source: The data source instance
            destination_table: Optional destination table name
            method: Ingestion method
            target: Ingestion target
            collection_id: Optional collection identifier
            error_callback: Optional callback for handling errors
        """

    def terminate_and_wait_to_finish(self):
        """
        Terminate ingestion and wait for all threads to finish.
        """

    def raise_on_error(self):
        """
        Raise aggregated exceptions if any errors occurred during ingestion.

        Raises:
            DataSourcesAggregatedException: If any ingestion errors occurred
        """
@dataclass
class IngestDestination:
    """Configuration for a data ingestion destination."""

    destination_table: Optional[str] = None  # Optional destination table name
    method: Optional[str] = None  # Ingestion method (refer to IIngester)
    target: Optional[str] = None  # Ingestion target (refer to IIngester)
    collection_id: Optional[str] = None  # Optional collection identifier
    map_function: Optional[Callable[[DataSourcePayload], Optional[DataSourcePayload]]] = None


from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping.definitions import (
SourceDefinition,
DestinationDefinition,
DataFlowMappingDefinition
)
def run(job_input: IJobInput):
    """Example job: stream auto-generated data into an in-memory destination."""
    # Define the source: built-in "auto-generated-data" producing 100 records over 2 streams
    source = SourceDefinition(
        id="my-source",
        name="auto-generated-data",
        config={"num_records": 100, "num_streams": 2}
    )
    # Define the destination: ingest via the "memory" method
    destination = DestinationDefinition(
        id="my-dest",
        method="memory"
    )
    # Create the flow mapping (no transformation between source and destination)
    flow_mapping = DataFlowMappingDefinition(source, destination)
    # Execute the flow; DataFlowInput cleans up automatically on exit
    with DataFlowInput(job_input) as flow_input:
        flow_input.start(flow_mapping)


def transform_payload(payload: DataSourcePayload) -> Optional[DataSourcePayload]:
"""Transform payload by adding a computed field."""
if payload.data:
# Add computed field
payload.data["computed_field"] = payload.data.get("id", 0) * 2
# Filter out records with id > 50
if payload.data.get("id", 0) > 50:
return None # Skip this payload
return payload
def run(job_input: IJobInput):
    """Example job: flow data to a destination with a transformation applied."""
    source = SourceDefinition(id="src", name="auto-generated-data", config={})
    destination = DestinationDefinition(id="dest", method="memory")
    # Apply the transformation between source and destination
    flow_mapping = DataFlowMappingDefinition(
        source,
        destination,
        map_function=transform_payload
    )
    with DataFlowInput(job_input) as flow_input:
        flow_input.start(flow_mapping)


Create a config.toml file:
# Sources: a PostgreSQL database and a REST API
[sources.database]
name = "postgresql"
config = { host = "localhost", port = 5432, database = "mydb" }

[sources.api]
name = "rest-api"
config = { base_url = "https://api.example.com", api_key = "secret" }

# Destinations: a database warehouse and a file-based data lake
[destinations.warehouse]
method = "database"
target = "warehouse_db"

[destinations.lake]
method = "file"
target = "/data/lake"

# Flows reference sources/destinations by their table keys above
[[flows]]
from = "database"
to = "warehouse"

[[flows]]
from = "api"
to = "lake"Load and execute the configuration:
from vdk.plugin.data_sources.mapping import toml_parser
def run(job_input: IJobInput):
    """Example job: run every flow declared in an external TOML configuration."""
    # Load all definitions (sources, destinations, flows) from the TOML file
    definitions = toml_parser.load_config("config.toml")
    # Execute all configured flows
    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flows(definitions)


def run(job_input: IJobInput):
    # Define multiple sources
    source1 = SourceDefinition(id="src1", name="csv-files", config={"path": "/data/csv"})
    source2 = SourceDefinition(id="src2", name="api-data", config={"endpoint": "/users"})
    # Define destinations
    dest1 = DestinationDefinition(id="db", method="database", target="main_db")
    dest2 = DestinationDefinition(id="file", method="file", target="/output")
    # Create the aggregate definitions; dict keys are the user-provided ids
    definitions = Definitions(
        sources={"src1": source1, "src2": source2},
        destinations={"db": dest1, "file": dest2},
        flows=[
            DataFlowMappingDefinition(source1, dest1),
            DataFlowMappingDefinition(source2, dest2)
        ]
    )
    # Execute all flows in one go
    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flows(definitions)


Install with Tessl CLI:
npx tessl i tessl/pypi-vdk-data-sources