or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/kedro@1.1.x

docs

api

datasets

abstract.mdcached.mdmemory.mdshared-memory.mdversioned.md
configuration.mddata-catalog-advanced.mddata-catalog.mdhooks.mdpipeline.mdrunners-advanced.mdrunners.md
index.md
tile.json

tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

Agent Success

Agent success rate when using this tile

98%

Improvement

Agent success rate improvement when using this tile compared to baseline

1.32x

Baseline

Agent success rate without this tile

74%

versioned.mddocs/api/datasets/

Versioned Dataset API Reference

Base class for datasets with version support.

Module Import

from kedro.io import AbstractVersionedDataset
from pathlib import PurePosixPath
from typing import Callable

AbstractVersionedDataset

class AbstractVersionedDataset(AbstractDataset):
    """Base class for versioned dataset implementations."""

    def __init__(
        self,
        filepath: PurePosixPath,
        version: Version | None,
        exists_function: Callable[[str], bool] | None = None,
        glob_function: Callable[[str], list[str]] | None = None
    ):
        """
        Initialize versioned dataset.

        Parameters:
        - filepath: Filepath in POSIX format to the dataset file
        - version: Version specification (Version namedtuple with load/save attributes)
                  If None, versioning is disabled
                  If version.load is None, latest version will be loaded
                  If version.save is None, save version will be autogenerated
        - exists_function: Custom function to check if path exists in filesystem.
                          Required for custom filesystems (S3, Azure, GCS, etc.).
                          Signature: (path: str) -> bool
                          Defaults to os.path.exists for local filesystem
                          Example for S3: lambda path: s3fs.S3FileSystem().exists(path)
        - glob_function: Custom function to find paths matching pattern in filesystem.
                        Required for custom filesystems (S3, Azure, GCS, etc.).
                        Signature: (pattern: str) -> list[str]
                        Defaults to glob.iglob for local filesystem
                        Returns list of paths matching the version timestamp pattern
                        Example for S3: lambda pattern: s3fs.S3FileSystem().glob(pattern)
        """

    def resolve_load_version(self) -> str | None:
        """
        Compute the version the dataset should be loaded with.

        Returns:
        - Version string to load (e.g., "2024-01-15T10.30.45.123Z")
        - None if versioning is disabled
        """

    def resolve_save_version(self) -> str | None:
        """
        Compute the version the dataset should be saved with.

        Returns:
        - Version string to save (e.g., "2024-01-15T11.00.00.000Z")
        - None if versioning is disabled
        """

    def _get_load_path(self) -> PurePosixPath:
        """
        Get the path to load data from.

        Returns:
        - Versioned path if versioning is enabled (e.g., "data/file/2024-01-15T10.30.45.123Z/file.csv")
        - Original filepath if versioning is disabled
        """

    def _get_save_path(self) -> PurePosixPath:
        """
        Get the path to save data to.

        Returns:
        - Versioned path if versioning is enabled (e.g., "data/file/2024-01-15T11.00.00.000Z/file.csv")
        - Original filepath if versioning is disabled

        Raises:
        - DatasetError: If versioned save path already exists
        """

    @property
    def _version(self) -> Version | None:
        """
        Get the version specification for this dataset.

        Returns:
        Version namedtuple or None if versioning is disabled
        """

Version Class

class Version:
    """Named tuple for dataset versioning."""
    load: str | None  # Version to load (or None for latest)
    save: str | None  # Version to save (or None for timestamp)

Version Format

Versioned datasets use a timestamp-based format:

VERSION_FORMAT = "%Y-%m-%dT%H.%M.%S.%fZ"

Format explanation:

  • %Y-%m-%d - Date (e.g., 2024-01-15)
  • T - Separator between date and time
  • %H.%M.%S - Time with dots instead of colons (e.g., 10.30.45)
  • .%f - Microseconds (e.g., .123000)
  • Z - UTC timezone indicator

Example version strings:

  • 2024-01-15T10.30.45.123456Z
  • 2024-12-31T23.59.59.999999Z

Note: Colons are replaced with dots to ensure filesystem compatibility across operating systems.

Versioned Directory Structure

When versioning is enabled, datasets are stored in a versioned directory structure:

data/
└── model/
    ├── 2024-01-15T10.30.45.123Z/
    │   └── model.pkl
    ├── 2024-01-15T11.00.00.000Z/
    │   └── model.pkl
    └── 2024-01-15T12.15.30.456Z/
        └── model.pkl

Structure:

  • Base path: data/model/ (from filepath)
  • Version directories: Each version gets its own directory
  • File: Original filename repeated in each version directory

Path pattern:

{filepath}/{version}/{filename}

For example, if filepath = "data/model.pkl":

  • Without versioning: data/model.pkl
  • With versioning: data/model/2024-01-15T10.30.45.123Z/model.pkl

Usage Examples

Creating a Versioned Dataset

from pathlib import PurePosixPath
from kedro.io import AbstractVersionedDataset, Version
import pandas as pd

class VersionedCSVDataset(AbstractVersionedDataset):
    """Custom versioned CSV dataset."""

    def __init__(self, filepath: str, version: Version | None = None):
        super().__init__(
            filepath=PurePosixPath(filepath),
            version=version
        )

    def _load(self) -> pd.DataFrame:
        load_path = self._get_load_path()
        return pd.read_csv(load_path)

    def _save(self, data: pd.DataFrame) -> None:
        save_path = self._get_save_path()
        save_path.parent.mkdir(parents=True, exist_ok=True)
        data.to_csv(str(save_path), index=False)

    def _exists(self) -> bool:
        try:
            path = self._get_load_path()
            return path.exists()
        except Exception:
            return False

    def _describe(self) -> dict:
        return {
            "filepath": str(self._filepath),
            "version": self._version
        }

Using Versions in DataCatalog

from kedro.io import DataCatalog, Version

# Load specific version
catalog = DataCatalog(
    datasets={...},
    load_versions={"model": "2024-01-15T10.30.45.123Z"}
)

# Load data (uses specified version)
model = catalog.load("model")

# Save with specific version
catalog = DataCatalog(
    datasets={...},
    save_version="2024-01-15T11.00.00.000Z"
)

# Save data (uses specified version)
catalog.save("model", trained_model)

Loading Latest Version

from kedro.io import Version

# Load latest version (version.load = None)
dataset = VersionedCSVDataset(
    filepath="data/input.csv",
    version=Version(load=None, save="2024-01-15T11.00.00.000Z")
)

# This loads the most recent version from disk
data = dataset.load()

Automatic Version Generation

from kedro.io import Version

# Autogenerate save version (version.save = None)
dataset = VersionedCSVDataset(
    filepath="data/output.csv",
    version=Version(load=None, save=None)
)

# Save version is generated from current timestamp
dataset.save(data)

# Check what version was saved
save_version = dataset.resolve_save_version()
print(f"Saved as version: {save_version}")

Versioning in catalog.yml

# conf/base/catalog.yml
my_dataset:
  type: kedro_datasets.pandas.CSVDataset
  filepath: data/01_raw/input.csv
  versioned: true  # Enable versioning

# Load specific version
my_dataset_with_version:
  type: kedro_datasets.pandas.CSVDataset
  filepath: data/01_raw/input.csv
  versioned: true
  version:
    load: "2024-01-15T10.30.45.123Z"
    save: "2024-01-15T11.00.00.000Z"

See also:

  • AbstractDataset API - Base dataset class
  • DataCatalog API - Version management