Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.
```
npx @tessl/cli install tessl/pypi-vdk-data-sources@0.1.0
```

This plugin simplifies data pipeline construction by offering consistent APIs for connecting to databases, REST APIs, message brokers, and other data sources.
```
pip install vdk-data-sources
```

For orchestrating data flows:

```python
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping.definitions import (
    SourceDefinition,
    DestinationDefinition,
    DataFlowMappingDefinition,
)
```

For implementing custom data sources:
```python
from vdk.plugin.data_sources.data_source import (
    IDataSource,
    IDataSourceStream,
    IDataSourceConfiguration,
    DataSourcePayload,
)
from vdk.plugin.data_sources.factory import data_source, SingletonDataSourceFactory
from vdk.plugin.data_sources.config import config_class, config_field
from vdk.plugin.data_sources.state import IDataSourceState
```

For TOML configuration parsing:

```python
from vdk.plugin.data_sources.mapping import toml_parser
```

For CLI utilities:
```python
from vdk.plugin.data_sources.sources_command import list_data_sources, list_config_options
```

Running a single data flow from a source to a destination inside a data job:

```python
from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping.definitions import (
    SourceDefinition,
    DestinationDefinition,
    DataFlowMappingDefinition,
)


def run(job_input: IJobInput):
    # Define source and destination
    source = SourceDefinition(
        id="my-source",
        name="auto-generated-data",
        config={"num_records": 100},
    )
    destination = DestinationDefinition(
        id="my-dest",
        method="memory",
    )
    # Execute data flow
    with DataFlowInput(job_input) as flow_input:
        flow_input.start(DataFlowMappingDefinition(source, destination))
```

Driving flows from a TOML configuration file instead:

```python
from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping import toml_parser
def run(job_input: IJobInput):
    # Load configuration from TOML file
    definitions = toml_parser.load_config("config.toml")
    # Execute all configured flows
    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flows(definitions)
```

Example `config.toml`:

```toml
[sources.auto]
name = "auto-generated-data"
config = { num_records = 50 }

[destinations.auto-dest]
method = "memory"

[[flows]]
from = "auto"
to = "auto-dest"
```

The vdk-data-sources plugin follows a layered architecture with a clear separation of concerns:
Core interfaces and utilities for creating custom data sources that integrate with the VDK data ingestion framework.
```python
def data_source(name: str, config_class: Type[IDataSourceConfiguration]):
    """Class decorator factory: registers the decorated class as a data source."""
    def decorator(data_source_class: Type[IDataSource]): ...
    return decorator


class IDataSource:
    def configure(self, config: IDataSourceConfiguration): ...
    def connect(self, state: Optional[IDataSourceState]): ...
    def disconnect(self): ...
    def streams(self) -> List[IDataSourceStream]: ...


class IDataSourceStream:
    def name(self) -> str: ...
    def read(self) -> Iterable[DataSourcePayload]: ...
```
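Putting these interfaces together, a minimal custom data source might look roughly like the sketch below. This is an illustration based on the signatures above, not code from the plugin: the `counter` name, the `num_records` field, and the record shape are all invented.

```python
from typing import Iterable, List, Optional

from vdk.plugin.data_sources.config import config_class, config_field
from vdk.plugin.data_sources.data_source import (
    DataSourcePayload,
    IDataSource,
    IDataSourceConfiguration,
    IDataSourceStream,
)
from vdk.plugin.data_sources.factory import data_source
from vdk.plugin.data_sources.state import IDataSourceState


@config_class(name="counter", description="Emits a fixed number of counter records")
class CounterSourceConfiguration(IDataSourceConfiguration):
    num_records: int = config_field(description="How many records to emit", default=10)


class CounterStream(IDataSourceStream):
    def __init__(self, num_records: int):
        self._num_records = num_records

    def name(self) -> str:
        return "counter"

    def read(self) -> Iterable[DataSourcePayload]:
        for i in range(self._num_records):
            # Each payload carries one batch of data plus metadata about it.
            yield DataSourcePayload(data={"id": i}, metadata={"stream": "counter"})


@data_source(name="counter", config_class=CounterSourceConfiguration)
class CounterDataSource(IDataSource):
    def configure(self, config: IDataSourceConfiguration):
        self._config = config

    def connect(self, state: Optional[IDataSourceState]):
        self._streams = [CounterStream(self._config.num_records)]

    def disconnect(self):
        self._streams = []

    def streams(self) -> List[IDataSourceStream]:
        return self._streams
```

Once registered via `@data_source`, such a source should be addressable from a `SourceDefinition` by its registry name (here `name="counter"`), mirroring the `auto-generated-data` example earlier.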
High-level orchestration system for managing data flows from sources to destinations, with transformation support.

```python
class DataFlowInput:
    def __init__(self, job_input: IJobInput): ...
    def start(self, flow_definition: DataFlowMappingDefinition): ...
    def start_flows(self, definitions: Definitions): ...
    def close(self): ...


class DataFlowMappingDefinition:
    from_source: SourceDefinition
    to_destination: DestinationDefinition
    map_function: Optional[Callable[[DataSourcePayload], Optional[DataSourcePayload]]]
```
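Because `map_function` is part of `DataFlowMappingDefinition`, a flow can filter or reshape payloads in transit. A minimal sketch, assuming `map_function` can be passed as a keyword argument and that returning `None` excludes a payload (suggested, but not guaranteed, by the `Optional` return type):

```python
from typing import Optional

from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.data_source import DataSourcePayload
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping.definitions import (
    DataFlowMappingDefinition,
    DestinationDefinition,
    SourceDefinition,
)


def keep_non_empty(payload: DataSourcePayload) -> Optional[DataSourcePayload]:
    # Assumption: returning None drops the payload from ingestion.
    return payload if payload.data else None


def run(job_input: IJobInput):
    source = SourceDefinition(id="src", name="auto-generated-data", config={})
    destination = DestinationDefinition(id="dest", method="memory")
    with DataFlowInput(job_input) as flow_input:
        flow_input.start(
            DataFlowMappingDefinition(source, destination, map_function=keep_non_empty)
        )
```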
Typed configuration management with decorators for defining data source configuration schemas.

```python
def config_class(name: str, description: str, **kwargs):
    """Class decorator factory: declares a configuration schema for a data source."""
    def decorator(cls): ...
    return decorator


def config_field(
    description: str,
    is_sensitive: bool = False,
    default=MISSING,
    **kwargs,
): ...
```
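For illustration, a configuration schema for a hypothetical HTTP source could be declared as follows; the `http` name and all of its fields are invented for this sketch:

```python
from vdk.plugin.data_sources.config import config_class, config_field
from vdk.plugin.data_sources.data_source import IDataSourceConfiguration


@config_class(name="http", description="Hypothetical HTTP polling source")
class HttpSourceConfiguration(IDataSourceConfiguration):
    url: str = config_field(description="Base URL to poll")  # no default: required
    api_token: str = config_field(
        description="Token used to authenticate",
        is_sensitive=True,  # sensitive fields should be masked when displayed
        default="",
    )
    page_size: int = config_field(description="Records fetched per request", default=100)
```

Descriptions declared this way are presumably what `list_config_options` (see the CLI utilities below) reports for the source.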
Persistent state management for data sources, enabling incremental ingestion and resume capabilities.

```python
class IDataSourceState:
    def read_stream(self, stream_name: str) -> Dict[str, Any]: ...
    def update_stream(self, stream_name: str, state: Dict[str, Any]): ...
    def read_others(self, key: str) -> Dict[str, Any]: ...
    def update_others(self, key: str, state: Dict[str, Any]): ...
```
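Inside a stream, this interface can support incremental reads by checkpointing a cursor. A sketch under the import paths shown earlier; the `last_id` key and the `fetch_rows_after` helper are hypothetical:

```python
from typing import Any, Dict, Iterable, List

from vdk.plugin.data_sources.data_source import DataSourcePayload, IDataSourceStream
from vdk.plugin.data_sources.state import IDataSourceState


def fetch_rows_after(last_id: int) -> List[Dict[str, Any]]:
    # Hypothetical stand-in for a real query, e.g. SELECT ... WHERE id > :last_id
    return [{"id": last_id + 1, "value": "example"}]


class IncrementalStream(IDataSourceStream):
    def __init__(self, state: IDataSourceState):
        self._state = state

    def name(self) -> str:
        return "orders"

    def read(self) -> Iterable[DataSourcePayload]:
        # Resume from the last checkpoint recorded for this stream.
        last_id = self._state.read_stream(self.name()).get("last_id", 0)
        for row in fetch_rows_after(last_id):
            yield DataSourcePayload(data=row, metadata={"stream": self.name()})
            # Record progress so a restarted job continues where it left off.
            self._state.update_stream(self.name(), {"last_id": row["id"]})
```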
Interfaces for implementing metrics collection and analysis on data ingested from data sources.

```python
class IDataMetricsBatchAnalyzer:
    def analyze_batch(self, payload: DataSourcePayload): ...


class IDataMetricsFullAnalyzer:
    def get_data_store_target(self) -> str: ...
    def get_data_store_method(self) -> str: ...
    def analyze_at_the_end(self): ...
```
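As an illustration, a batch analyzer that simply counts payloads could look like the following. The module path of the analyzer interfaces is not shown above, so this class just matches the `analyze_batch` shape rather than importing a base class:

```python
from vdk.plugin.data_sources.data_source import DataSourcePayload


class RecordCountAnalyzer:
    """Counts payloads as they are ingested (matches the
    IDataMetricsBatchAnalyzer shape shown above)."""

    def __init__(self):
        self.payloads_seen = 0

    def analyze_batch(self, payload: DataSourcePayload):
        self.payloads_seen += 1
```

How analyzers are registered with the ingestion pipeline is not covered here.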
Functions for programmatically accessing the data source registry and exploring available data sources.

```python
def list_data_sources() -> List[Dict]:
    """List all registered data sources.

    Returns:
        A list of dictionaries, one per registered data source.
    """


def list_config_options(data_source_name: str) -> List[Dict]:
    """List the configuration options of a specific data source.

    Args:
        data_source_name: Name of the data source.

    Returns:
        A list of dictionaries, one per configuration option.
    """
```
Core data types and exceptions used across the plugin:

```python
@dataclass(frozen=True)
class DataSourcePayload:
    data: Optional[Dict[str, Any]]
    metadata: Optional[Dict[str, Union[int, str, bool, float, datetime]]]
    state: Optional[Dict[str, Any]] = field(default_factory=dict)
    destination_table: Optional[str] = None


@dataclass
class SourceDefinition:
    id: str
    name: str
    config: Dict[str, Any] = field(default_factory=dict)


@dataclass
class DestinationDefinition:
    id: str
    method: str
    target: Optional[str] = None


@dataclass
class Definitions:
    sources: Dict[str, SourceDefinition]
    destinations: Dict[str, DestinationDefinition]
    flows: List[DataFlowMappingDefinition]


class StopDataSourceStream(Exception):
    """Signals the end of a stream: there is no more data."""


class RetryDataSourceStream(Exception):
    """Signals that ingestion of the stream should be retried."""


class DataSourcesAggregatedException(Exception):
    """Aggregates failures from multiple data source streams."""

    def __init__(self, data_streams_exceptions: Dict[str, Exception]): ...


class DataSourceNotFoundException(Exception):
    """Raised when a requested data source is not found in the registry."""


@dataclass
class DataSourceError:
    """Encapsulates information about a data source ingestion error."""

    data_stream: IDataSourceStream
    exception: BaseException
```
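A stream implementation can use `StopDataSourceStream` and `RetryDataSourceStream` to control ingestion. A sketch, assuming both exceptions are importable from `vdk.plugin.data_sources.data_source` (their module is not stated above) and using a hypothetical `fetch_next_page` helper:

```python
from typing import Iterable, Optional

from vdk.plugin.data_sources.data_source import (
    DataSourcePayload,
    IDataSourceStream,
    RetryDataSourceStream,
    StopDataSourceStream,
)


def fetch_next_page() -> Optional[dict]:
    # Hypothetical stand-in for a real paginated fetch.
    return None


class PagedStream(IDataSourceStream):
    def name(self) -> str:
        return "paged"

    def read(self) -> Iterable[DataSourcePayload]:
        while True:
            try:
                page = fetch_next_page()
            except ConnectionError:
                # Transient failure: ask the framework to retry this stream.
                raise RetryDataSourceStream()
            if page is None:
                # Clean end of data for this stream.
                raise StopDataSourceStream()
            yield DataSourcePayload(data=page, metadata={"stream": "paged"})
```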