tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.
Agent Success
Agent success rate when using this tile
98%
Improvement
Agent success rate improvement when using this tile compared to baseline
1.32x
Baseline
Agent success rate without this tile
74%
Base class for datasets with version support.
from kedro.io import AbstractVersionedDataset
from pathlib import PurePosixPath
from typing import Callable


class AbstractVersionedDataset(AbstractDataset):
    """Base class for versioned dataset implementations."""

    def __init__(
        self,
        filepath: PurePosixPath,
        version: Version | None,
        exists_function: Callable[[str], bool] | None = None,
        glob_function: Callable[[str], list[str]] | None = None
    ):
        """
        Initialize versioned dataset.

        Parameters:
        - filepath: Filepath in POSIX format to the dataset file.
        - version: Version specification (Version namedtuple with load/save
          attributes).
            - If None, versioning is disabled.
            - If version.load is None, the latest version will be loaded.
            - If version.save is None, the save version will be autogenerated.
        - exists_function: Custom function to check if a path exists in the
          filesystem. Required for custom filesystems (S3, Azure, GCS, etc.).
            - Signature: (path: str) -> bool
            - Defaults to os.path.exists for the local filesystem.
            - Example for S3: lambda path: s3fs.S3FileSystem().exists(path)
        - glob_function: Custom function to find paths matching a pattern in
          the filesystem. Required for custom filesystems (S3, Azure, GCS, etc.).
            - Signature: (pattern: str) -> list[str]
            - Defaults to glob.iglob for the local filesystem.
            - Returns the list of paths matching the version timestamp pattern.
            - Example for S3: lambda pattern: s3fs.S3FileSystem().glob(pattern)
        """

    def resolve_load_version(self) -> str | None:
        """
        Compute the version the dataset should be loaded with.

        Returns:
        - Version string to load (e.g., "2024-01-15T10.30.45.123Z")
        - None if versioning is disabled
        """

    def resolve_save_version(self) -> str | None:
        """
        Compute the version the dataset should be saved with.

        Returns:
        - Version string to save (e.g., "2024-01-15T11.00.00.000Z")
        - None if versioning is disabled
        """

    def _get_load_path(self) -> PurePosixPath:
        """
        Get the path to load data from.

        Returns:
        - Versioned path if versioning is enabled, following
          {filepath}/{version}/{filename}
          (e.g., "data/file.csv/2024-01-15T10.30.45.123Z/file.csv")
        - Original filepath if versioning is disabled
        """

    def _get_save_path(self) -> PurePosixPath:
        """
        Get the path to save data to.

        Returns:
        - Versioned path if versioning is enabled, following
          {filepath}/{version}/{filename}
          (e.g., "data/file.csv/2024-01-15T11.00.00.000Z/file.csv")
        - Original filepath if versioning is disabled

        Raises:
        - DatasetError: If versioned save path already exists
        """

    @property
    def _version(self) -> Version | None:
        """
        Get the version specification for this dataset.

        Returns:
        Version namedtuple or None if versioning is disabled
        """


class Version:
    """Named tuple for dataset versioning."""

    load: str | None  # Version to load (or None for the latest available version)
    save: str | None  # Version to save (or None for an autogenerated timestamp)


Versioned datasets use a timestamp-based format:
VERSION_FORMAT = "%Y-%m-%dT%H.%M.%S.%fZ"

Format explanation:
- %Y-%m-%d - Date (e.g., 2024-01-15)
- T - Separator between date and time
- %H.%M.%S - Time with dots instead of colons (e.g., 10.30.45)
- .%f - Microseconds (e.g., .123000)
- Z - UTC timezone indicator

Example version strings:
- 2024-01-15T10.30.45.123456Z
- 2024-12-31T23.59.59.999999Z

Note: Colons are replaced with dots to ensure filesystem compatibility across operating systems. Timestamps that Kedro autogenerates are truncated to millisecond precision, which is why the other examples on this page end in three fractional digits (e.g., 2024-01-15T10.30.45.123Z).
When versioning is enabled, datasets are stored in a versioned directory structure:
data/
└── model.pkl/
    ├── 2024-01-15T10.30.45.123Z/
    │   └── model.pkl
    ├── 2024-01-15T11.00.00.000Z/
    │   └── model.pkl
    └── 2024-01-15T12.15.30.456Z/
        └── model.pkl

Structure:
- data/model.pkl/ (from filepath; the directory takes the full file name, extension included)

Path pattern:
{filepath}/{version}/{filename}

For example, if filepath = "data/model.pkl":
- Unversioned path: data/model.pkl
- Versioned path: data/model.pkl/2024-01-15T10.30.45.123Z/model.pkl

from pathlib import PurePosixPath
from pathlib import Path

import pandas as pd
from kedro.io import AbstractVersionedDataset, DatasetError, Version
class VersionedCSVDataset(AbstractVersionedDataset):
    """Custom versioned CSV dataset for the local filesystem.

    Load and save locations come from the base class, which returns
    ``PurePosixPath`` objects. Pure paths cannot perform filesystem
    operations, so every method that touches the disk converts the resolved
    path to a concrete ``pathlib.Path`` first.
    """

    def __init__(self, filepath: str, version: Version | None = None):
        """Create the dataset.

        Args:
            filepath: Location of the CSV file, in POSIX format.
            version: Version specification; ``None`` disables versioning.
        """
        super().__init__(
            filepath=PurePosixPath(filepath),
            version=version
        )

    def _load(self) -> pd.DataFrame:
        """Read the CSV at the resolved load path into a DataFrame."""
        load_path = self._get_load_path()
        return pd.read_csv(load_path)

    def _save(self, data: pd.DataFrame) -> None:
        """Write ``data`` as CSV (without the index) to the resolved save path."""
        # PurePosixPath has no mkdir(); a concrete Path is needed to create
        # the version directory before writing.
        save_path = Path(self._get_save_path())
        save_path.parent.mkdir(parents=True, exist_ok=True)
        data.to_csv(str(save_path), index=False)

    def _exists(self) -> bool:
        """Return True if a file is present at the resolved load path."""
        try:
            load_path = self._get_load_path()
        except DatasetError:
            # Kedro signals "no saved version could be resolved" with a
            # DatasetError subclass; treat that as "does not exist" and let
            # any other failure surface instead of being hidden.
            return False
        # PurePosixPath has no exists(); check through a concrete Path.
        return Path(load_path).exists()

    def _describe(self) -> dict:
        """Return the attributes that identify this dataset instance."""
        return {
            "filepath": str(self._filepath),
            "version": self._version
        }


from kedro.io import DataCatalog, Version
# Load specific version
# NOTE: `{...}` stands in for the real dataset definitions of the catalog.
catalog = DataCatalog(
    datasets={...},
    load_versions={"model": "2024-01-15T10.30.45.123Z"}
)

# Load data (uses specified version)
model = catalog.load("model")

# Save with specific version
catalog = DataCatalog(
    datasets={...},
    save_version="2024-01-15T11.00.00.000Z"
)

# Save data (uses specified version)
# NOTE: `trained_model` is not defined in this snippet; it is assumed to be
# the object produced by earlier training code.
catalog.save("model", trained_model)


from kedro.io import Version
# Load latest version (version.load = None)
# The save version is pinned explicitly; only the load version is left open.
dataset = VersionedCSVDataset(
    filepath="data/input.csv",
    version=Version(load=None, save="2024-01-15T11.00.00.000Z")
)

# This loads the most recent version from disk
data = dataset.load()


from kedro.io import Version
# Autogenerate save version (version.save = None)
dataset = VersionedCSVDataset(
    filepath="data/output.csv",
    version=Version(load=None, save=None)
)

# Save version is generated from current timestamp
# NOTE: `data` is not defined in this snippet; it is assumed to be a
# DataFrame obtained earlier (e.g., from a previous load).
dataset.save(data)

# Check what version was saved
save_version = dataset.resolve_save_version()
print(f"Saved as version: {save_version}")


# conf/base/catalog.yml
my_dataset:
  type: kedro_datasets.pandas.CSVDataset
  filepath: data/01_raw/input.csv
  versioned: true  # Enable versioning

# Load specific version
# NOTE(review): Kedro appears to treat `version` as a reserved key in catalog
# entries and drop it with a warning - confirm that this block takes effect.
# Versions can be pinned through the DataCatalog `load_versions` /
# `save_version` arguments instead.
my_dataset_with_version:
  type: kedro_datasets.pandas.CSVDataset
  filepath: data/01_raw/input.csv
  versioned: true
  version:
    load: "2024-01-15T10.30.45.123Z"
    save: "2024-01-15T11.00.00.000Z"

See also: