tessl/pypi-vdk-data-sources

Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.

docs/state-management.md
State Management

Persistent state management for data sources, enabling incremental ingestion and resumable runs. The state management system allows data sources to track their progress and pick up from where they left off.
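In outline, a data source reads its saved state before fetching, checkpoints progress as it goes, and a restarted run continues from the last checkpoint. A minimal self-contained sketch of that loop, using a plain dict in place of the VDK state API (all names here are illustrative, not part of VDK):

```python
# Sketch of the incremental-ingestion loop; a plain dict stands in
# for the VDK state API.
saved_state = {"last_id": 0}  # what a previous run left behind


def fetch_since(last_id, batch=5):
    # Pretend upstream source: records with increasing ids.
    return [{"id": i} for i in range(last_id + 1, last_id + 1 + batch)]


def run_once(state):
    for record in fetch_since(state.get("last_id", 0)):
        # ... ingest record ...
        state["last_id"] = record["id"]  # checkpoint progress
    return state


saved_state = run_once(saved_state)  # first run processes ids 1..5
saved_state = run_once(saved_state)  # resumed run continues with 6..10
```

Because the checkpoint survives between runs, a crash between the two calls would lose at most the uncheckpointed records, not the whole run.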

Capabilities

Data Source State Interface

Abstract interface for managing data source state with support for stream-specific and general state tracking.

class IDataSourceState:
    """
    Abstract interface for data source state management.
    """
    def read_stream(self, stream_name: str) -> Dict[str, Any]:
        """
        Read the state of a stream within a data source.
        
        Args:
            stream_name: Name of the stream
            
        Returns:
            Dictionary containing the stream's state
        """
    
    def update_stream(self, stream_name: str, state: Dict[str, Any]):
        """
        Update the state of a stream within the data source.
        
        Args:
            stream_name: Name of the stream
            state: New state dictionary for the stream
        """
    
    def read_others(self, key: str) -> Dict[str, Any]:
        """
        Read state of a data source that is not related to streams.
        
        Args:
            key: State key identifier
            
        Returns:
            Dictionary containing the state for the given key
        """
    
    def update_others(self, key: str, state: Dict[str, Any]):
        """
        Update state of a data source not related to streams.
        
        Args:
            key: State key identifier
            state: New state dictionary for the key
        """

Data Source State Implementation

Concrete implementation of state management that uses pluggable storage backends.

class DataSourceState(IDataSourceState):
    """
    Concrete implementation of data source state management.
    """
    def __init__(self, state_storage: IDataSourceStateStorage, source: str):
        """
        Initialize state manager.
        
        Args:
            state_storage: Storage backend for persisting state
            source: Data source identifier
        """
    
    def read_stream(self, stream_name: str) -> Dict[str, Any]:
        """Read stream state from storage"""
    
    def read_others(self, key: str) -> Dict[str, Any]:
        """Read general state from storage"""
    
    def update_stream(self, stream_name: str, state: Dict[str, Any]):
        """Update stream state in storage"""
    
    def update_others(self, key: str, state: Dict[str, Any]):
        """Update general state in storage"""

State Storage Interface

Abstract interface for state storage backends that can be implemented for different persistence mechanisms.

class IDataSourceStateStorage:
    """
    Abstract interface for state storage backends.
    """
    def read(self, data_source: str) -> Dict[str, Any]:
        """
        Read the state from underlying storage.
        
        Args:
            data_source: The data source name
            
        Returns:
            Dictionary containing the complete state for the data source
        """
    
    def write(self, data_source: str, state: Dict[str, Any]):
        """
        Write (persist) the current state of the data source to underlying storage.
        
        Args:
            data_source: The data source name
            state: Complete state dictionary to persist
        """

State Storage Implementations

Built-in implementations for different storage backends.

class InMemoryDataSourceStateStorage(IDataSourceStateStorage):
    """
    In-memory state storage useful for testing purposes.
    """
    def __init__(self): ...
    
    def read(self, data_source: str) -> Dict[str, Any]:
        """Read state from memory"""
    
    def write(self, data_source: str, state: Dict[str, Any]):
        """Write state to memory"""

class PropertiesBasedDataSourceStorage(IDataSourceStateStorage):
    """
    State storage implementation using VDK properties system.
    """
    KEY = ".vdk.data_sources.state"
    
    def __init__(self, properties: IProperties):
        """
        Initialize properties-based storage.
        
        Args:
            properties: VDK properties interface
        """
    
    def read(self, data_source: str) -> Dict[str, Any]:
        """Read state from VDK properties"""
    
    def write(self, data_source: str, state: Dict[str, Any]):
        """Write state to VDK properties"""

State Factory

Factory class for creating state managers with specific storage backends.

class DataSourceStateFactory:
    """
    Factory for creating data source state managers.
    """
    def __init__(self, storage: IDataSourceStateStorage):
        """
        Initialize factory with storage backend.
        
        Args:
            storage: State storage implementation
        """
    
    def get_data_source_state(self, source: str) -> IDataSourceState:
        """
        Create a state manager for a specific data source.
        
        Args:
            source: Data source identifier
            
        Returns:
            Data source state manager instance
        """

Usage Examples

Basic State Management in Data Sources

from datetime import datetime
from typing import Iterable

from vdk.plugin.data_sources.data_source import IDataSource, IDataSourceStream, DataSourcePayload
from vdk.plugin.data_sources.state import IDataSourceState

class IncrementalDataSourceStream(IDataSourceStream):
    def __init__(self, stream_name: str, state: IDataSourceState):
        self._stream_name = stream_name
        self._state = state
    
    def name(self) -> str:
        return self._stream_name
    
    def read(self) -> Iterable[DataSourcePayload]:
        # Read last processed state
        last_state = self._state.read_stream(self._stream_name)
        last_id = last_state.get("last_id", 0)
        
        # Simulate reading records starting from last processed ID
        for record_id in range(last_id + 1, last_id + 11):  # Process 10 records
            data = {"id": record_id, "value": f"data_{record_id}"}
            
            # Yield payload with state information
            yield DataSourcePayload(
                data=data,
                metadata={"timestamp": datetime.now()},
                state={"last_id": record_id}  # State will be automatically persisted
            )

class IncrementalDataSource(IDataSource):
    def configure(self, config):
        self._config = config
    
    def connect(self, state: IDataSourceState):
        self._state = state
        # Create streams that can access state
        self._streams = [
            IncrementalDataSourceStream("stream_1", state),
            IncrementalDataSourceStream("stream_2", state)
        ]
    
    def disconnect(self):
        self._streams = []
    
    def streams(self):
        return self._streams

Custom State Storage Backend

import json
import os
from typing import Any, Dict

from vdk.plugin.data_sources.state import IDataSourceStateStorage

class FileBasedStateStorage(IDataSourceStateStorage):
    """Custom file-based state storage implementation."""
    
    def __init__(self, state_directory: str):
        self.state_directory = state_directory
        os.makedirs(state_directory, exist_ok=True)
    
    def _get_state_file_path(self, data_source: str) -> str:
        return os.path.join(self.state_directory, f"{data_source}_state.json")
    
    def read(self, data_source: str) -> Dict[str, Any]:
        state_file = self._get_state_file_path(data_source)
        if os.path.exists(state_file):
            with open(state_file, 'r') as f:
                return json.load(f)
        return {}
    
    def write(self, data_source: str, state: Dict[str, Any]):
        state_file = self._get_state_file_path(data_source)
        with open(state_file, 'w') as f:
            json.dump(state, f, indent=2, default=str)

Database-Backed State Storage

import json
import sqlite3
from typing import Any, Dict

from vdk.plugin.data_sources.state import IDataSourceStateStorage

class DatabaseStateStorage(IDataSourceStateStorage):
    """Database-backed state storage implementation."""
    
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_database()
    
    def _init_database(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS data_source_state (
                    source_name TEXT PRIMARY KEY,
                    state_data TEXT,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
    
    def read(self, data_source: str) -> Dict[str, Any]:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT state_data FROM data_source_state WHERE source_name = ?",
                (data_source,)
            )
            row = cursor.fetchone()
            if row:
                return json.loads(row[0])
            return {}
    
    def write(self, data_source: str, state: Dict[str, Any]):
        state_json = json.dumps(state, default=str)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT OR REPLACE INTO data_source_state (source_name, state_data)
                VALUES (?, ?)
            """, (data_source, state_json))

State Management with Different State Types

from datetime import datetime

from vdk.plugin.data_sources.data_source import IDataSource
from vdk.plugin.data_sources.state import IDataSourceState

class ComplexDataSource(IDataSource):
    def connect(self, state: IDataSourceState):
        self._state = state
        
        # Read different types of state
        connection_state = state.read_others("connection")
        if connection_state:
            print(f"Resuming connection from: {connection_state}")
        
        # Read stream-specific state
        for stream_name in ["orders", "customers", "products"]:
            stream_state = state.read_stream(stream_name)
            last_sync = stream_state.get("last_sync_time")
            if last_sync:
                print(f"Stream {stream_name} last synced at: {last_sync}")
        
        # Update general state
        state.update_others("connection", {
            "connected_at": datetime.now().isoformat(),
            "server_version": "1.2.3"
        })

State Factory Usage

from vdk.plugin.data_sources.state import DataSourceStateFactory, InMemoryDataSourceStateStorage

# Create factory with in-memory storage for testing
factory = DataSourceStateFactory(InMemoryDataSourceStateStorage())

# Get state manager for specific data source
source_state = factory.get_data_source_state("my-database")

# Use state manager
source_state.update_stream("table1", {"last_row_id": 1000})
source_state.update_others("metadata", {"schema_version": "2.1"})

# Read state back
table_state = source_state.read_stream("table1")
metadata = source_state.read_others("metadata")

Integration with Data Ingestion System

The state management system is integrated with the data ingestion pipeline. When a DataSourcePayload includes state information, that state is automatically persisted after the payload has been successfully ingested:

# In your data source stream implementation
def read(self) -> Iterable[DataSourcePayload]:
    # Read current state
    current_state = self._state.read_stream(self.name())
    last_processed = current_state.get("last_processed_timestamp", 0)
    
    # Query new data since last processed timestamp
    new_records = self._fetch_records_since(last_processed)
    
    for record in new_records:
        # Yield payload with updated state
        yield DataSourcePayload(
            data=record,
            metadata={"record_timestamp": record["timestamp"]},
            state={"last_processed_timestamp": record["timestamp"]}
            # This state will be automatically persisted after successful ingestion
        )

Install with Tessl CLI

npx tessl i tessl/pypi-vdk-data-sources
