AbstractDataset API Reference

Base class for implementing custom dataset types.

Module Import

from kedro.io import AbstractDataset

AbstractDataset Class

class AbstractDataset:
    """Base class for all dataset implementations."""

    _EPHEMERAL: bool = False       # True for non-persistent datasets
    _SINGLE_PROCESS: bool = False  # True if cannot be used with ParallelRunner

    @classmethod
    def from_config(
        cls,
        name: str,
        config: dict[str, Any],
        load_version: str | None = None,
        save_version: str | None = None
    ) -> AbstractDataset:
        """
        Create a dataset instance from configuration.

        Parameters:
        - name: Dataset name
        - config: Dataset configuration dictionary (must contain 'type' key)
        - load_version: Version string for load operation (versioned datasets only)
        - save_version: Version string for save operation (versioned datasets only)

        Returns:
        Instance of AbstractDataset subclass

        Raises:
        - DatasetError: When dataset creation fails
        """

    def load(self) -> Any:
        """
        Load data from the dataset.

        Returns:
        Loaded data
        """

    def save(self, data: Any) -> None:
        """
        Save data to the dataset.

        Parameters:
        - data: Data to save
        """

    def _load(self) -> Any:
        """
        Load data from the dataset.
        Must be implemented by subclasses.
        """

    def _save(self, data: Any) -> None:
        """
        Save data to the dataset.
        Must be implemented by subclasses.
        """

    def _describe(self) -> dict[str, Any]:
        """
        Describe the dataset.
        Must be implemented by subclasses (abstract method).

        Returns:
        Dictionary containing dataset description

        Raises:
        - NotImplementedError: If not implemented by subclass
        """

    def _exists(self) -> bool:
        """
        Check if dataset exists.
        Should be implemented by subclasses.

        Returns:
        True if dataset exists, False otherwise
        """

    def exists(self) -> bool:
        """
        Check if dataset's output already exists.

        Returns:
        True if dataset exists, False otherwise

        Raises:
        - DatasetError: When exists check fails
        """

    def release(self) -> None:
        """
        Release any cached data.

        Raises:
        - DatasetError: When release operation fails
        """

    def _release(self) -> None:
        """
        Release any cached data (internal implementation).
        Can be overridden by subclasses to implement custom release logic.
        """

    def _copy(self, **overwrite_params: Any) -> AbstractDataset:
        """
        Create a copy of this dataset with optional parameter overrides.

        Parameters:
        - **overwrite_params: Parameters to override in the copy

        Returns:
        Deep copy of the dataset with overridden parameters
        """

Class Attributes

class AbstractDataset:
    _EPHEMERAL: bool = False
    """
    Mark dataset as ephemeral (non-persistent).

    Ephemeral datasets like MemoryDataset don't persist data to disk.
    Set to True for in-memory or temporary datasets.
    Default: False
    """

    _SINGLE_PROCESS: bool = False
    """
    Mark dataset as single-process only.

    Datasets that cannot be used with ParallelRunner should set this to True.
    Examples include datasets using non-serializable resources or local connections.
    Default: False
    """

Dataset Lifecycle

Datasets follow a standard lifecycle; each stage pairs a public method with an internal implementation hook (a usage sketch follows this list):

1. Initialization

  • __init__() - Constructor receives configuration parameters
  • Arguments are captured automatically for _init_config()

2. Loading

  • load() - Public method with logging and error handling
  • _load() - Internal implementation (must be implemented by subclasses)
  • Automatically wrapped with error handling via _load_wrapper()

3. Saving

  • save(data) - Public method with validation and error handling
  • _save(data) - Internal implementation (must be implemented by subclasses)
  • Automatically wrapped with error handling via _save_wrapper()
  • Validates that data is not None

4. Existence Check

  • exists() - Public method with error handling
  • _exists() - Internal implementation (should be implemented by subclasses)
  • Returns False by default with warning

5. Release

  • release() - Public method to free cached data
  • _release() - Internal implementation (optional override)
  • Called to clean up resources

6. Description

  • _describe() - Returns dictionary describing dataset configuration
  • Used for string representation via __repr__()
  • Should include relevant initialization parameters
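
The same sequence can be exercised directly through the public methods. The sketch below uses the built-in MemoryDataset purely as an illustration (stage numbers refer to the list above; the save comes before the load because an in-memory dataset starts empty):

from kedro.io import MemoryDataset

dataset = MemoryDataset()    # 1. Initialization: constructor arguments are captured

dataset.save({"rows": 3})    # 3. Saving: save() validates the data, then calls _save()
print(dataset.exists())      # 4. Existence check: exists() delegates to _exists() -> True

data = dataset.load()        # 2. Loading: load() delegates to _load() with error handling
print(data)                  # {'rows': 3}

print(repr(dataset))         # 6. Description: __repr__() is built from _describe()

dataset.release()            # 5. Release: cached data is freed via _release()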

Creating Custom Datasets

from kedro.io import AbstractDataset
import json

class JSONDataset(AbstractDataset):
    """Custom dataset for JSON files."""

    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self) -> dict:
        with open(self._filepath, 'r') as f:
            return json.load(f)

    def _save(self, data: dict) -> None:
        with open(self._filepath, 'w') as f:
            json.dump(data, f, indent=2)

    def _describe(self) -> dict:
        return {
            "filepath": self._filepath,
            "type": "JSONDataset"
        }

    def _exists(self) -> bool:
        from pathlib import Path
        return Path(self._filepath).exists()
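
A quick usage sketch for the class above; the filepath example.json is only illustrative:

dataset = JSONDataset(filepath="example.json")

dataset.save({"name": "kedro", "version": 1})  # writes the file via _save()
print(dataset.exists())                        # True once the file is on disk
print(dataset.load())                          # {'name': 'kedro', 'version': 1}
print(repr(dataset))                           # representation built from _describe()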

Using from_config

from kedro.io import AbstractDataset

# Create dataset from configuration
config = {
    "type": "pandas.CSVDataset",
    "filepath": "data/raw/input.csv",
    "load_args": {"sep": ","}
}

dataset = AbstractDataset.from_config(
    name="input_data",
    config=config,
    load_version="2024-01-15T10.30.45.123Z",  # Optional
    save_version="2024-01-15T11.00.00.000Z"   # Optional
)

# Load data
data = dataset.load()

Copying Datasets

from kedro_datasets.pandas import CSVDataset

# Original dataset
original = CSVDataset(filepath="data/input.csv")

# Create copy with modified parameters
copy_dataset = original._copy(_filepath="data/output.csv")

# Both datasets work independently
data = original.load()
copy_dataset.save(data)

Marking Datasets as Ephemeral

from kedro.io import AbstractDataset

class TempDataset(AbstractDataset):
    """Dataset that doesn't persist data."""

    _EPHEMERAL = True  # Mark as non-persistent

    def __init__(self):
        self._data = None

    def _load(self):
        return self._data

    def _save(self, data):
        self._data = data

    def _describe(self):
        return {"type": "TempDataset"}

Single-Process Datasets

from kedro.io import AbstractDataset

class LocalConnectionDataset(AbstractDataset):
    """Dataset using non-serializable local resources."""

    _SINGLE_PROCESS = True  # Cannot be used with ParallelRunner

    def __init__(self, connection):
        self._connection = connection  # Non-serializable

    def _load(self):
        return self._connection.fetch_data()

    def _save(self, data):
        self._connection.write_data(data)

    def _describe(self):
        return {"type": "LocalConnectionDataset"}

See also:

  • DataCatalog API - registering and using datasets through the catalog
  • Working with Data Guide - patterns for building custom datasets