```
tessl install tessl/pypi-kedro@1.1.0
```

Kedro helps you build production-ready data and analytics pipelines.
| Metric | Value | Description |
| --- | --- | --- |
| Agent Success | 98% | Agent success rate when using this tile |
| Improvement | 1.32x | Agent success rate improvement when using this tile compared to baseline |
| Baseline | 74% | Agent success rate without this tile |
Base class for implementing custom dataset types.
```python
from typing import Any

from kedro.io import AbstractDataset


class AbstractDataset:
    """Base class for all dataset implementations."""

    _EPHEMERAL: bool = False       # True for non-persistent datasets
    _SINGLE_PROCESS: bool = False  # True if cannot be used with ParallelRunner

    @classmethod
    def from_config(
        cls,
        name: str,
        config: dict[str, Any],
        load_version: str | None = None,
        save_version: str | None = None,
    ) -> AbstractDataset:
        """
        Create a dataset instance from configuration.

        Parameters:
        - name: Dataset name
        - config: Dataset configuration dictionary (must contain 'type' key)
        - load_version: Version string for load operation (versioned datasets only)
        - save_version: Version string for save operation (versioned datasets only)

        Returns:
        Instance of an AbstractDataset subclass

        Raises:
        - DatasetError: When dataset creation fails
        """

    def load(self) -> Any:
        """
        Load data from the dataset.

        Returns:
        Loaded data
        """

    def save(self, data: Any) -> None:
        """
        Save data to the dataset.

        Parameters:
        - data: Data to save
        """

    def _load(self) -> Any:
        """
        Load data from the dataset.

        Must be implemented by subclasses.
        """

    def _save(self, data: Any) -> None:
        """
        Save data to the dataset.

        Must be implemented by subclasses.
        """

    def _describe(self) -> dict[str, Any]:
        """
        Describe the dataset.

        **Must** be implemented by subclasses (abstract method).

        Returns:
        Dictionary containing the dataset description

        Raises:
        - NotImplementedError: If not implemented by a subclass
        """

    def _exists(self) -> bool:
        """
        Check if the dataset exists.

        Should be implemented by subclasses.

        Returns:
        True if the dataset exists, False otherwise
        """

    def exists(self) -> bool:
        """
        Check if the dataset's output already exists.

        Returns:
        True if the dataset exists, False otherwise

        Raises:
        - DatasetError: When the existence check fails
        """

    def release(self) -> None:
        """
        Release any cached data.

        Raises:
        - DatasetError: When the release operation fails
        """

    def _release(self) -> None:
        """
        Release any cached data (internal implementation).

        Can be overridden by subclasses to implement custom release logic.
        """

    def _copy(self, **overwrite_params: Any) -> AbstractDataset:
        """
        Create a copy of this dataset with optional parameter overrides.

        Parameters:
        - **overwrite_params: Parameters to override in the copy

        Returns:
        Deep copy of the dataset with overridden parameters
        """
```

Two class-level flags control persistence and parallel execution:

```python
class AbstractDataset:
    _EPHEMERAL: bool = False
    """
    Mark dataset as ephemeral (non-persistent).

    Ephemeral datasets like MemoryDataset don't persist data to disk.
    Set to True for in-memory or temporary datasets.

    Default: False
    """

    _SINGLE_PROCESS: bool = False
    """
    Mark dataset as single-process only.

    Datasets that cannot be used with ParallelRunner should set this to True.
    Examples include datasets using non-serializable resources or local connections.

    Default: False
    """
```
Datasets follow a standard lifecycle with hooks at each stage (a minimal sketch of the public/internal wrapper pattern follows this list):

- `__init__()` - constructor receives configuration parameters; `_init_config()`
- `load()` - public method with logging and error handling; `_load()` - internal implementation (must be implemented by subclasses); `_load_wrapper()`
- `save(data)` - public method with validation and error handling; `_save(data)` - internal implementation (must be implemented by subclasses); `_save_wrapper()`
- `exists()` - public method with error handling; `_exists()` - internal implementation (should be implemented by subclasses)
- `release()` - public method to free cached data; `_release()` - internal implementation (optional override)
- `_describe()` - returns a dictionary describing the dataset configuration; used by `__repr__()`
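The sketch below assumes only the behavior the list describes: the public method wraps the subclass-supplied internal hook with uniform error handling. It is illustrative, not Kedro's actual source:

```python
from typing import Any

from kedro.io import DatasetError


class SketchDataset:
    """Illustrative only: how a public load() might wrap _load()."""

    def load(self) -> Any:
        # Public entry point: adds uniform error handling around the hook
        try:
            return self._load()
        except Exception as exc:
            raise DatasetError(f"Failed while loading {self!r}") from exc

    def _load(self) -> Any:
        # Internal hook supplied by each subclass
        raise NotImplementedError
```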
Example: implementing a custom JSON dataset by overriding the internal hooks:

```python
import json
from pathlib import Path

from kedro.io import AbstractDataset


class JSONDataset(AbstractDataset):
    """Custom dataset for JSON files."""

    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self) -> dict:
        with open(self._filepath, "r") as f:
            return json.load(f)

    def _save(self, data: dict) -> None:
        with open(self._filepath, "w") as f:
            json.dump(data, f, indent=2)

    def _describe(self) -> dict:
        return {
            "filepath": self._filepath,
            "type": "JSONDataset",
        }

    def _exists(self) -> bool:
        return Path(self._filepath).exists()
```
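Continuing from that definition, a quick pass through the public API (the file path here is hypothetical):

```python
dataset = JSONDataset(filepath="data/example.json")  # hypothetical path

dataset.save({"records": [1, 2, 3]})  # public save() delegates to _save()
assert dataset.exists()               # exists() delegates to _exists()
data = dataset.load()                 # load() delegates to _load()
print(data)                           # {'records': [1, 2, 3]}
```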
Example: creating a dataset from configuration:

```python
from kedro.io import AbstractDataset

# Create dataset from configuration
config = {
    "type": "pandas.CSVDataset",
    "filepath": "data/raw/input.csv",
    "load_args": {"sep": ","},
}

dataset = AbstractDataset.from_config(
    name="input_data",
    config=config,
    load_version="2024-01-15T10.30.45.123Z",  # Optional
    save_version="2024-01-15T11.00.00.000Z",  # Optional
)

# Load data
data = dataset.load()
```
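The same instance exposes the rest of the public lifecycle; for example (illustrative, continuing from the dataset created above):

```python
if dataset.exists():   # wraps _exists(); raises DatasetError on failure
    data = dataset.load()

dataset.release()      # frees any cached data via _release()
```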
Example: copying a dataset with `_copy()`:

```python
from kedro_datasets.pandas import CSVDataset

# Original dataset
original = CSVDataset(filepath="data/input.csv")

# Create a copy with modified parameters
copy_dataset = original._copy(_filepath="data/output.csv")

# Both datasets work independently
data = original.load()
copy_dataset.save(data)
```
Example: marking a dataset as ephemeral with `_EPHEMERAL`:

```python
from kedro.io import AbstractDataset


class TempDataset(AbstractDataset):
    """Dataset that doesn't persist data."""

    _EPHEMERAL = True  # Mark as non-persistent

    def __init__(self):
        self._data = None

    def _load(self):
        return self._data

    def _save(self, data):
        self._data = data

    def _describe(self):
        return {"type": "TempDataset"}
```
Example: restricting a dataset to a single process with `_SINGLE_PROCESS`:

```python
from kedro.io import AbstractDataset


class LocalConnectionDataset(AbstractDataset):
    """Dataset using non-serializable local resources."""

    _SINGLE_PROCESS = True  # Cannot be used with ParallelRunner

    def __init__(self, connection):
        self._connection = connection  # Non-serializable

    def _load(self):
        return self._connection.fetch_data()

    def _save(self, data):
        self._connection.write_data(data)

    def _describe(self):
        return {"type": "LocalConnectionDataset"}
```