Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.
Typed configuration management system with decorators for defining data source configuration schemas. The configuration system ensures type safety and provides metadata for documentation and validation.
Decorator for marking classes as configuration classes with metadata and validation.
def config_class(name: str, description: str, **kwargs):
    """
    A decorator to mark a class as a configuration class.

    Args:
        name: The name of the configuration group represented by the class
        description: The description of the configuration group
        **kwargs: Additional arguments passed to dataclass decorator

    Returns:
        Decorated configuration class
    """

Function for defining configuration fields with metadata and validation.
def config_field(
    *,
    description: str,
    is_sensitive: bool = False,
    default=MISSING,
    init=True,
    repr=True,
    hash=None,
    compare=True,
    metadata=None,
    **kwargs,
):
    """
    Define a field within a configuration class.

    Every argument is keyword-only.

    Args:
        description: Description of the configuration field
        is_sensitive: Indicates if the configuration field contains sensitive information
        default: Default value for the field
        init: Include field in __init__ method
        repr: Include field in __repr__ method
        hash: Include field in __hash__ method
        compare: Include field in comparison methods
        metadata: Additional metadata dictionary
        **kwargs: Additional arguments passed to dataclass field

    Returns:
        Field definition for use in configuration classes
    """

Base interface that all configuration classes must implement.
class IDataSourceConfiguration:
    """
    Interface representing the configuration for a data source.

    Configuration classes must be decorated with @config_class and
    implement this interface.
    """

    def __new__(cls, *args, **kwargs):
        """Validate that the class is decorated with @config_class."""

Classes for accessing configuration metadata at runtime.
class ConfigFieldMetadata:
    """
    Metadata class for configuration fields.

    Each accessor below is a regular method, so it is called with
    parentheses (for example ``field_meta.name()``).
    """

    # NOTE(review): presumably field_class is the configuration class that
    # declares the field named field_name - confirm against the implementation.
    def __init__(self, field_class, field_name: str): ...

    def name(self) -> str:
        """Returns the field name"""

    def description(self) -> str:
        """Returns the field description"""

    def is_sensitive(self) -> bool:
        """Returns whether the field contains sensitive information"""

    def default(self) -> Optional[Any]:
        """Returns the default value for the field"""
class ConfigClassMetadata:
    """
    Metadata class for configuration classes.

    Constructed from a configuration class; exposes the group name,
    the group description and per-field metadata.
    """

    def __init__(self, cls: Type): ...

    def get_group_name(self) -> str:
        """Returns the configuration group name"""

    def get_description(self) -> str:
        """Returns the configuration group description"""

    def get_config_fields(self) -> List[ConfigFieldMetadata]:
        """Returns metadata for all configuration fields"""

Utility functions for working with configuration classes.
def is_config_class(cls: Type) -> bool:
    """
    Check if a class is decorated with @config_class.

    Args:
        cls: Class to check

    Returns:
        True if class is a configuration class
    """
def create_config_from_dict(config_class: Type[T], config_data: Dict[str, Any]) -> T:
    """
    Create a configuration instance from a dictionary.

    Args:
        config_class: Configuration class type
        config_data: Dictionary of configuration values

    Returns:
        Configuration instance (an instance of ``config_class``)
    """

from vdk.plugin.data_sources.data_source import IDataSourceConfiguration
from vdk.plugin.data_sources.config import config_class, config_field
from typing import List, Optional
@config_class(name="database", description="Database connection configuration")
class DatabaseConfiguration(IDataSourceConfiguration):
    """Connection settings for a database data source."""

    # Required fields (no default) are declared first. A dataclass raises
    # TypeError at class-creation time when a field without a default
    # follows a field that has one, so the defaulted fields must come last.
    host: str = config_field(description="Database host address")
    database: str = config_field(description="Database name")
    username: str = config_field(description="Database username")
    password: str = config_field(description="Database password", is_sensitive=True)

    # Optional fields with defaults.
    port: int = config_field(description="Database port", default=5432)
    ssl_enabled: bool = config_field(description="Enable SSL connection", default=False)
    connection_timeout: int = config_field(description="Connection timeout in seconds", default=30)


from dataclasses import field
from typing import Dict, List
@config_class(name="api", description="REST API configuration")
class ApiConfiguration(IDataSourceConfiguration):
base_url: str = config_field(description="Base URL for the API")
api_key: str = config_field(description="API authentication key", is_sensitive=True)
headers: Dict[str, str] = config_field(
description="Additional HTTP headers",
default_factory=dict
)
endpoints: List[str] = config_field(
description="List of API endpoints to fetch",
default_factory=lambda: ["/users", "/orders"]
)
rate_limit: int = config_field(description="Requests per second limit", default=10)
retry_attempts: int = config_field(description="Number of retry attempts", default=3)
timeout: float = config_field(description="Request timeout in seconds", default=30.0)@config_class(name="auth", description="Authentication configuration")
class AuthConfiguration(IDataSourceConfiguration):
    """Authentication method and credentials."""

    # Required credentials are declared before the defaulted field. A
    # dataclass raises TypeError at class-creation time when a field
    # without a default follows a field that has one.
    username: str = config_field(description="Username")
    password: str = config_field(description="Password", is_sensitive=True)

    # Optional: falls back to "basic" when not supplied.
    method: str = config_field(description="Authentication method", default="basic")
@config_class(name="complex-api", description="Complex API with authentication")
class ComplexApiConfiguration(IDataSourceConfiguration):
    """API settings that embed another configuration class as a field."""

    base_url: str = config_field(
        description="Base URL for the API",
    )
    # The field type is itself a configuration class (AuthConfiguration).
    auth: AuthConfiguration = config_field(
        description="Authentication configuration",
    )
    batch_size: int = config_field(
        description="Batch size for requests",
        default=100,
    )


from vdk.plugin.data_sources.config import ConfigClassMetadata
# Wrap the configuration class to read its group-level metadata
config_meta = ConfigClassMetadata(DatabaseConfiguration)
print(f"Configuration group: {config_meta.get_group_name()}")
print(f"Description: {config_meta.get_description()}")

# Walk every declared field and print its name, description,
# sensitivity flag and default value (all exposed as methods)
for field_meta in config_meta.get_config_fields():
    print(f"Field: {field_meta.name()}")
    print(f" Description: {field_meta.description()}")
    print(f" Sensitive: {field_meta.is_sensitive()}")
    print(f" Default: {field_meta.default()}")


from vdk.plugin.data_sources.config import create_config_from_dict
# Raw configuration values keyed by field name.
# "connection_timeout" is not listed here; presumably the field default
# applies for omitted keys - confirm against create_config_from_dict.
config_data = {
"host": "localhost",
"port": 5432,
"database": "mydb",
"username": "user",
"password": "secret",
"ssl_enabled": True
}

# Build a typed DatabaseConfiguration instance from the dictionary
config = create_config_from_dict(DatabaseConfiguration, config_data)
print(f"Database: {config.database} at {config.host}:{config.port}")


from vdk.plugin.data_sources.data_source import IDataSource
from vdk.plugin.data_sources.factory import data_source
@data_source(name="my-database", config_class=DatabaseConfiguration)
class DatabaseDataSource(IDataSource):
    """Example data source that receives a typed DatabaseConfiguration."""

    def configure(self, config: DatabaseConfiguration):
        """Store the configuration for later use and report the target host."""
        self._config = config
        # Configuration is now strongly typed
        print(f"Connecting to {config.host}:{config.port}")
        # ssl_enabled is a plain boolean flag (it is not marked is_sensitive)
        if config.ssl_enabled:
            print("Using SSL connection")

    def connect(self, state):
        """Placeholder: builds a connection string from the stored configuration."""
        # Use configuration to establish connection.
        # The string contains the username, host, port and database name;
        # the password is not embedded in it.
        connection_string = f"postgresql://{self._config.username}@{self._config.host}:{self._config.port}/{self._config.database}"
        # ... implement connection logic

    def disconnect(self):
        """Placeholder: no resources are released in this example."""
        # ... implement disconnect logic
        pass

    def streams(self):
        """Placeholder: this example returns an empty list of streams."""
        # ... return available streams
        return []


@config_class(name="validated", description="Configuration with validation")
class ValidatedConfiguration(IDataSourceConfiguration):
url: str = config_field(description="URL to connect to")
timeout: int = config_field(description="Timeout in seconds", default=30)
def __post_init__(self):
# Custom validation logic
if not self.url.startswith(('http://', 'https://')):
raise ValueError("URL must start with http:// or https://")
if self.timeout <= 0:
raise ValueError("Timeout must be positive")
if self.timeout > 300:
raise ValueError("Timeout cannot exceed 300 seconds")Install with Tessl CLI
npx tessl i tessl/pypi-vdk-data-sources