tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

docs/api/datasets/cached.md

CachedDataset API Reference

Dataset wrapper that caches data in memory to avoid repeated I/O operations.

Module Import

from kedro.io import CachedDataset

CachedDataset Class

class CachedDataset(AbstractDataset):
    """
    Wrapper dataset that caches data in memory to avoid slow I/O.
    Useful for expensive datasets that are loaded multiple times.

    Note: CachedDataset is marked as single-process only (_SINGLE_PROCESS = True)
    and cannot be used with ParallelRunner. Attempting to use CachedDataset with
    ParallelRunner will raise an AttributeError during catalog validation because
    the dataset cannot be pickled for multiprocessing.

    Solutions for parallel execution:
    - Use ThreadRunner instead (shares memory, no pickling required)
    - Remove caching for datasets used in parallel pipelines
    - Use alternative caching strategies (external cache, persistent storage)
    """

    _SINGLE_PROCESS: bool = True
    _EPHEMERAL: bool = True

    def __init__(
        self,
        dataset: AbstractDataset | dict,
        version: Version | None = None,
        copy_mode: str | None = None,
        metadata: dict[str, Any] | None = None
    ):
        """
        Initialize CachedDataset.

        Parameters:
        - dataset: Dataset to wrap with caching
                  Can be an AbstractDataset instance or a dict configuration
        - version: Version specification for versioned datasets
                  Should be specified here, not in the wrapped dataset config
        - copy_mode: How to copy cached data ('deepcopy', 'copy', 'assign', or None)
                    - None: Auto-detect based on data type (default)
                    - 'copy': Shallow copy (faster, for pandas/numpy)
                    - 'deepcopy': Deep copy (slowest but fully independent)
                    - 'assign': No copy (for Spark DataFrames)
        - metadata: Arbitrary metadata dictionary

        Raises:
        - ValueError: If dataset is neither a dict nor an AbstractDataset instance
        - ValueError: If 'versioned' is specified in wrapped dataset config
        """

    def _load(self) -> Any:
        """
        Load data with caching.

        First load reads from wrapped dataset and caches result.
        Subsequent loads return cached data.

        Returns:
        Cached data (possibly copied based on copy_mode)
        """

    def _save(self, data: Any) -> None:
        """
        Save data and update cache.

        Parameters:
        - data: Data to save

        Note:
        Updates cache with saved data for consistency
        """

    def _release(self) -> None:
        """
        Release cached data.

        Clears the cache, forcing next load to read from wrapped dataset.
        """

    def _describe(self) -> dict[str, Any]:
        """
        Describe the cached dataset.

        Returns:
        Dictionary with wrapped dataset info and cache status
        """

    def _exists(self) -> bool:
        """
        Check if dataset exists (cache or wrapped dataset).

        Returns:
        True if either cache has data or wrapped dataset exists
        """

Cache Behavior

First Load:

  1. Check if cache has data
  2. If not, load from wrapped dataset
  3. Store result in cache
  4. Return data

Subsequent Loads:

  1. Check if cache has data
  2. If yes, return cached data (no I/O)
  3. Data is copied according to copy_mode

On Save:

  1. Save to wrapped dataset
  2. Update cache with saved data
  3. Both operations occur

On Release:

  1. Clear cache
  2. Release wrapped dataset
  3. Next load will re-read from wrapped dataset
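
A short sketch of these load, save, and release paths, using a pre-populated MemoryDataset as a stand-in for an expensive wrapped dataset:

from kedro.io import CachedDataset, MemoryDataset

cached = CachedDataset(MemoryDataset(data=[1, 2, 3]))

first = cached.load()   # first load: reads the wrapped dataset and fills the cache
second = cached.load()  # subsequent load: returned from the cache, no wrapped-dataset I/O

cached.save([4, 5, 6])  # save: writes to the wrapped dataset, then updates the cache
cached.release()        # release: clears the cache and releases the wrapped dataset;
                        # a file-backed dataset would be re-read from storage on the next load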

Parallel Execution Limitations

CachedDataset cannot be used with ParallelRunner because:

  • Cache is not shared across processes
  • Each process would have its own cache
  • Marked with _SINGLE_PROCESS = True

Alternative: Use ThreadRunner

from kedro.runner import ThreadRunner

# ThreadRunner shares memory across threads, so CachedDataset works with it.
# `pipeline` and `catalog` are assumed to be defined elsewhere.
runner = ThreadRunner()
runner.run(pipeline, catalog)

Usage Examples

Basic Caching

from kedro.io import CachedDataset
from kedro_datasets.pandas import CSVDataset

# Wrap expensive dataset with cache
expensive_dataset = CSVDataset(filepath="large_file.csv")
cached = CachedDataset(expensive_dataset)

# First load: reads from file (slow)
data1 = cached.load()

# Subsequent loads: returns cached data (fast)
data2 = cached.load()  # Instant, no file I/O
data3 = cached.load()  # Instant, no file I/O

With Copy Mode

from kedro.io import CachedDataset

# `dataset` is any AbstractDataset instance

# Auto-detect copy mode from the data type (default)
cached = CachedDataset(dataset, copy_mode=None)

# No copying (fastest; the cached object itself is returned)
cached = CachedDataset(dataset, copy_mode="assign")
data = cached.load()
data.append(1)  # Modifies cached data!

# Deep copy (safest, prevents mutations)
cached = CachedDataset(dataset, copy_mode="deepcopy")
data = cached.load()
data.append(1)  # Doesn't affect cache

# Shallow copy (balanced, suited to pandas/numpy)
cached = CachedDataset(dataset, copy_mode="copy")

Creating from Dict Configuration

from kedro.io import CachedDataset

# Create CachedDataset from dict config
cached = CachedDataset(
    dataset={
        "type": "pandas.CSVDataset",
        "filepath": "data/large_file.csv"
    },
    copy_mode="copy"
)

# Dataset is lazily created from config
data = cached.load()

With Versioning

from kedro.io import CachedDataset, Version

# Versioning must be specified in CachedDataset, not wrapped dataset
cached = CachedDataset(
    dataset={
        "type": "pandas.CSVDataset",
        "filepath": "data/model.csv"
    },
    version=Version(load=None, save="2024-01-15T10.30.45.123Z"),
    copy_mode="copy"
)

# WRONG: Don't specify versioned in wrapped dataset
# This will raise ValueError
cached = CachedDataset(
    dataset={
        "type": "pandas.CSVDataset",
        "filepath": "data/model.csv",
        "versioned": True  # ERROR!
    }
)

In Data Catalog

# conf/base/catalog.yml
expensive_model:  # plain entry: re-read from disk on every load
  type: pickle.PickleDataset
  filepath: data/models/large_model.pkl

cached_model:  # same file wrapped with CachedDataset
  type: kedro.io.CachedDataset
  dataset:
    type: pickle.PickleDataset
    filepath: data/models/large_model.pkl
  copy_mode: deepcopy

# With versioning
versioned_cached:
  type: kedro.io.CachedDataset
  versioned: true  # Specify here, not in dataset
  dataset:
    type: pandas.CSVDataset
    filepath: data/results.csv
  copy_mode: copy
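
The same configuration can be exercised programmatically. A sketch assuming kedro_datasets is installed (for pickle.PickleDataset) and a pickled object exists at the given path:

from kedro.io import DataCatalog

catalog = DataCatalog.from_config({
    "cached_model": {
        "type": "kedro.io.CachedDataset",
        "dataset": {
            "type": "pickle.PickleDataset",
            "filepath": "data/models/large_model.pkl",
        },
        "copy_mode": "deepcopy",
    }
})

model = catalog.load("cached_model")  # first load reads from disk
model = catalog.load("cached_model")  # second load is served from the in-memory cache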

Cache Management

from kedro.io import CachedDataset

cached = CachedDataset(expensive_dataset)

# Load and cache
data = cached.load()

# Release cache when done
cached.release()

# Next load will re-read from wrapped dataset
data = cached.load()

Use Cases

Good for:

  • Expensive I/O operations (large files, remote data)
  • Datasets loaded multiple times in a pipeline
  • Read-heavy workloads with infrequent updates

Avoid when:

  • Dataset is only loaded once
  • Memory is constrained
  • Data is too large to fit in memory
  • Data changes frequently

Performance Comparison

import time
from kedro.io import CachedDataset

# Without cache (ExpensiveDataset is a placeholder for any slow-to-load dataset)
dataset = ExpensiveDataset("data.parquet")

start = time.time()
for _ in range(10):
    data = dataset.load()  # 10x I/O operations
uncached_time = time.time() - start

# With cache
cached = CachedDataset(dataset)

start = time.time()
for _ in range(10):
    data = cached.load()  # 1 I/O + 9 memory reads
cached_time = time.time() - start

print(f"Speedup: {uncached_time / cached_time:.1f}x")

See also:

  • AbstractDataset API - Base dataset class
  • MemoryDataset API - In-memory storage
  • Working with Data Guide - Caching patterns