Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.
This module provides persistent state management for data sources, enabling incremental ingestion and resume capabilities: data sources track their progress and resume from where they left off.
It exposes an abstract interface for managing data source state, with support for both stream-specific and general (non-stream) state tracking.
class IDataSourceState:
    """
    Abstract interface for data source state management.

    Implementations persist per-stream progress (used for incremental
    ingestion) as well as arbitrary non-stream state stored under a key.
    """

    def read_stream(self, stream_name: str) -> Dict[str, Any]:
        """
        Return the persisted state of one stream of this data source.

        :param stream_name: Name of the stream.
        :return: Dictionary containing that stream's state.
        """

    def update_stream(self, stream_name: str, state: Dict[str, Any]):
        """
        Persist a new state dictionary for one stream of this data source.

        :param stream_name: Name of the stream.
        :param state: Replacement state dictionary for the stream.
        """

    def read_others(self, key: str) -> Dict[str, Any]:
        """
        Return data source state that is not related to streams.

        :param key: State key identifier.
        :return: Dictionary containing the state stored under the given key.
        """

    def update_others(self, key: str, state: Dict[str, Any]):
        """
        Persist data source state that is not related to streams.

        :param key: State key identifier.
        :param state: Replacement state dictionary for the key.
        """

# Concrete implementation of state management that uses pluggable storage backends.
class DataSourceState(IDataSourceState):
    """
    Concrete data source state manager backed by a pluggable storage backend.

    The complete state of a data source is kept as one dictionary in the
    storage backend: stream state under the "streams" key, everything else
    under the "others" key.
    NOTE(review): the internal layout is a reconstruction (the original body
    was an empty stub that discarded its constructor arguments) - confirm
    against the format actually persisted by the storage backends.
    """

    # Internal top-level keys of the per-source state dictionary.
    _STREAMS_KEY = "streams"
    _OTHERS_KEY = "others"

    def __init__(self, state_storage: IDataSourceStateStorage, source: str):
        """
        Initialize state manager.

        :param state_storage: Storage backend for persisting state.
        :param source: Data source identifier.
        """
        self._storage = state_storage
        self._source = source

    def read_stream(self, stream_name: str) -> Dict[str, Any]:
        """Return persisted state of *stream_name* (empty dict when absent)."""
        whole = self._storage.read(self._source)
        return whole.get(self._STREAMS_KEY, {}).get(stream_name, {})

    def read_others(self, key: str) -> Dict[str, Any]:
        """Return non-stream state under *key* (empty dict when absent)."""
        whole = self._storage.read(self._source)
        return whole.get(self._OTHERS_KEY, {}).get(key, {})

    def update_stream(self, stream_name: str, state: Dict[str, Any]):
        """Replace the state of *stream_name* and persist the whole state."""
        whole = self._storage.read(self._source)
        whole.setdefault(self._STREAMS_KEY, {})[stream_name] = state
        self._storage.write(self._source, whole)

    def update_others(self, key: str, state: Dict[str, Any]):
        """Replace the non-stream state under *key* and persist it."""
        whole = self._storage.read(self._source)
        whole.setdefault(self._OTHERS_KEY, {})[key] = state
        self._storage.write(self._source, whole)

# Abstract interface for state storage backends that can be implemented for different persistence mechanisms.
class IDataSourceStateStorage:
    """
    Abstract interface for state storage backends.

    A backend persists the complete state dictionary of each data source,
    keyed by the data source name.
    """

    def read(self, data_source: str) -> Dict[str, Any]:
        """
        Read the state from underlying storage.

        :param data_source: The data source name.
        :return: Dictionary containing the complete state for the data source.
        """

    def write(self, data_source: str, state: Dict[str, Any]):
        """
        Write (persist) the current state of the data source to underlying storage.

        :param data_source: The data source name.
        :param state: Complete state dictionary to persist.
        """

# Built-in implementations for different storage backends.
class InMemoryDataSourceStateStorage(IDataSourceStateStorage):
    """
    In-memory state storage useful for testing purposes.

    State lives in a plain dict keyed by data source name and is lost when
    the process exits. (The original stub stored nothing at all.)
    """

    def __init__(self):
        # Maps data source name -> complete state dictionary for that source.
        self._state: Dict[str, Dict[str, Any]] = {}

    def read(self, data_source: str) -> Dict[str, Any]:
        """Return the stored state for *data_source*, or {} when unknown."""
        return self._state.get(data_source, {})

    def write(self, data_source: str, state: Dict[str, Any]):
        """Store *state* as the complete state for *data_source*."""
        self._state[data_source] = state
class PropertiesBasedDataSourceStorage(IDataSourceStateStorage):
    """
    State storage implementation that persists through the VDK properties
    system.
    """

    # Property key under which all data source state is kept.
    KEY = ".vdk.data_sources.state"

    def __init__(self, properties: IProperties):
        """
        Initialize properties-based storage.

        :param properties: VDK properties interface used as the persistence layer.
        """

    def read(self, data_source: str) -> Dict[str, Any]:
        """Read the state of *data_source* from VDK properties."""

    def write(self, data_source: str, state: Dict[str, Any]):
        """Persist the state of *data_source* to VDK properties."""

# Factory class for creating state managers with specific storage backends.
class DataSourceStateFactory:
    """
    Factory for creating data source state managers that all share one
    storage backend.
    """

    def __init__(self, storage: IDataSourceStateStorage):
        """
        Initialize factory with storage backend.

        :param storage: State storage implementation shared by every manager
            this factory produces.
        """
        # Original stub discarded the argument; keep it for later use.
        self._storage = storage

    def get_data_source_state(self, source: str) -> IDataSourceState:
        """
        Create a state manager for a specific data source.

        :param source: Data source identifier.
        :return: State manager reading/writing through this factory's storage.
        """
        # Original stub returned None, breaking callers that invoke
        # update_stream()/read_stream() on the result.
        return DataSourceState(self._storage, source)

from vdk.plugin.data_sources.data_source import IDataSource, IDataSourceStream
from vdk.plugin.data_sources.state import IDataSourceState
class IncrementalDataSourceStream(IDataSourceStream):
    """Example stream that resumes reading from the last persisted record id."""

    def __init__(self, stream_name: str, state: IDataSourceState):
        self._stream_name = stream_name
        self._state = state

    def name(self) -> str:
        return self._stream_name

    def read(self) -> Iterable[DataSourcePayload]:
        # Resume from the id recorded in the persisted stream state.
        previous = self._state.read_stream(self._stream_name)
        start_id = previous.get("last_id", 0)

        # Simulate a batch of 10 records following the last processed id.
        for current_id in range(start_id + 1, start_id + 11):
            payload_data = {"id": current_id, "value": f"data_{current_id}"}
            yield DataSourcePayload(
                data=payload_data,
                metadata={"timestamp": datetime.now()},
                # State attached to a payload is persisted automatically
                # after successful ingestion.
                state={"last_id": current_id},
            )
class IncrementalDataSource(IDataSource):
    """Example data source whose streams share a single state manager."""

    def configure(self, config):
        self._config = config

    def connect(self, state: IDataSourceState):
        self._state = state
        # Hand every stream the shared state manager so each can resume.
        self._streams = [
            IncrementalDataSourceStream(stream_name, state)
            for stream_name in ("stream_1", "stream_2")
        ]

    def disconnect(self):
        self._streams = []

    def streams(self):
        return self._streams

import json
import os
from vdk.plugin.data_sources.state import IDataSourceStateStorage
class FileBasedStateStorage(IDataSourceStateStorage):
    """Custom file-based state storage: one JSON file per data source."""

    def __init__(self, state_directory: str):
        """
        :param state_directory: Directory holding per-source state files;
            created if it does not exist.
        """
        self.state_directory = state_directory
        os.makedirs(state_directory, exist_ok=True)

    def _get_state_file_path(self, data_source: str) -> str:
        """Return the path of the JSON state file for *data_source*."""
        return os.path.join(self.state_directory, f"{data_source}_state.json")

    def read(self, data_source: str) -> Dict[str, Any]:
        """Load the persisted state, or return {} when no file exists."""
        state_file = self._get_state_file_path(data_source)
        # EAFP: open directly instead of exists()+open, which is racy if the
        # file disappears between the check and the read.
        try:
            with open(state_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def write(self, data_source: str, state: Dict[str, Any]):
        """Persist *state* as pretty-printed JSON."""
        state_file = self._get_state_file_path(data_source)
        with open(state_file, 'w') as f:
            # default=str stringifies non-JSON types such as datetimes.
            json.dump(state, f, indent=2, default=str)

import sqlite3
import json
from typing import Any, Dict
class DatabaseStateStorage(IDataSourceStateStorage):
"""Database-backed state storage implementation."""
def __init__(self, db_path: str):
self.db_path = db_path
self._init_database()
def _init_database(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS data_source_state (
source_name TEXT PRIMARY KEY,
state_data TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
def read(self, data_source: str) -> Dict[str, Any]:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"SELECT state_data FROM data_source_state WHERE source_name = ?",
(data_source,)
)
row = cursor.fetchone()
if row:
return json.loads(row[0])
return {}
def write(self, data_source: str, state: Dict[str, Any]):
state_json = json.dumps(state, default=str)
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO data_source_state (source_name, state_data)
VALUES (?, ?)
""", (data_source, state_json))class ComplexDataSource(IDataSource):
def connect(self, state: IDataSourceState):
self._state = state
# Read different types of state
connection_state = state.read_others("connection")
if connection_state:
print(f"Resuming connection from: {connection_state}")
# Read stream-specific state
for stream_name in ["orders", "customers", "products"]:
stream_state = state.read_stream(stream_name)
last_sync = stream_state.get("last_sync_time")
if last_sync:
print(f"Stream {stream_name} last synced at: {last_sync}")
# Update general state
state.update_others("connection", {
"connected_at": datetime.now().isoformat(),
"server_version": "1.2.3"
})from vdk.plugin.data_sources.state import DataSourceStateFactory, InMemoryDataSourceStateStorage
# Example: wire the factory with in-memory storage (handy for tests).
factory = DataSourceStateFactory(InMemoryDataSourceStateStorage())
# Obtain the state manager dedicated to one data source.
source_state = factory.get_data_source_state("my-database")
# Persist stream-specific and general state.
source_state.update_stream("table1", {"last_row_id": 1000})
source_state.update_others("metadata", {"schema_version": "2.1"})
# Read the persisted state back.
table_state = source_state.read_stream("table1")
metadata = source_state.read_others("metadata")
# The state management system is automatically integrated with the data
# ingestion pipeline. When a DataSourcePayload includes state information,
# it's automatically persisted after successful ingestion:
# In your data source stream implementation
def read(self) -> Iterable[DataSourcePayload]:
    """Yield only records newer than the last persisted checkpoint."""
    # Load the checkpoint persisted for this stream.
    current_state = self._state.read_stream(self.name())
    last_processed = current_state.get("last_processed_timestamp", 0)

    # Fetch records created since the checkpoint.
    new_records = self._fetch_records_since(last_processed)

    for record in new_records:
        yield DataSourcePayload(
            data=record,
            metadata={"record_timestamp": record["timestamp"]},
            # This state will be automatically persisted after
            # successful ingestion.
            state={"last_processed_timestamp": record["timestamp"]},
        )

# Install with Tessl CLI
npx tessl i tessl/pypi-vdk-data-sources