tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.
Agent Success: 98% (agent success rate when using this tile)
Improvement: 1.32x (compared to baseline)
Baseline: 74% (agent success rate without this tile)
Dataset wrapper that caches data in memory to avoid repeated I/O operations.
from kedro.io import CachedDataset

class CachedDataset(AbstractDataset):
    """
    Wrapper dataset that caches data in memory to avoid slow I/O.
    Useful for expensive datasets that are loaded multiple times.

    Note: CachedDataset is marked as single-process only (_SINGLE_PROCESS = True)
    and cannot be used with ParallelRunner. Attempting to use CachedDataset with
    ParallelRunner will raise an AttributeError during catalog validation because
    the dataset cannot be pickled for multiprocessing.

    Solutions for parallel execution:
    - Use ThreadRunner instead (shares memory, no pickling required)
    - Remove caching for datasets used in parallel pipelines
    - Use alternative caching strategies (external cache, persistent storage)
    """

    _SINGLE_PROCESS: bool = True
    _EPHEMERAL: bool = True

    def __init__(
        self,
        dataset: AbstractDataset | dict,
        version: Version | None = None,
        copy_mode: str | None = None,
        metadata: dict[str, Any] | None = None
    ):
        """
        Initialize CachedDataset.

        Parameters:
        - dataset: Dataset to wrap with caching
          Can be an AbstractDataset instance or a dict configuration
        - version: Version specification for versioned datasets
          Should be specified here, not in the wrapped dataset config
        - copy_mode: How to copy cached data ('deepcopy', 'copy', 'assign', or None)
          - None: Auto-detect based on data type (default)
          - 'copy': Shallow copy (faster, for pandas/numpy)
          - 'deepcopy': Deep copy (slowest but fully independent)
          - 'assign': No copy (for Spark DataFrames)
        - metadata: Arbitrary metadata dictionary

        Raises:
        - ValueError: If dataset is neither a dict nor an AbstractDataset instance
        - ValueError: If 'versioned' is specified in wrapped dataset config
        """

    def _load(self) -> Any:
        """
        Load data with caching.
        First load reads from wrapped dataset and caches result.
        Subsequent loads return cached data.

        Returns:
            Cached data (possibly copied based on copy_mode)
        """

    def _save(self, data: Any) -> None:
        """
        Save data and update cache.

        Parameters:
        - data: Data to save

        Note:
            Updates cache with saved data for consistency
        """

    def _release(self) -> None:
        """
        Release cached data.
        Clears the cache, forcing next load to read from wrapped dataset.
        """

    def _describe(self) -> dict[str, Any]:
        """
        Describe the cached dataset.

        Returns:
            Dictionary with wrapped dataset info and cache status
        """

    def _exists(self) -> bool:
        """
        Check if dataset exists (cache or wrapped dataset).

        Returns:
            True if either cache has data or wrapped dataset exists
"""First Load:
Subsequent Loads:
On Save:
On Release:
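A minimal sketch of the save path (wrapping a MemoryDataset here purely so the snippet is self-contained): saving through the wrapper writes to the wrapped dataset and keeps the cache in sync, so the next load is served from memory.

from kedro.io import CachedDataset, MemoryDataset

cached = CachedDataset(MemoryDataset())

cached.save([1, 2, 3])  # writes to the wrapped dataset and updates the cache
data = cached.load()    # served from the cache, no extra read of the wrapped dataset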
CachedDataset cannot be used with ParallelRunner because it is marked _SINGLE_PROCESS = True: the in-memory cache cannot be pickled for worker processes.

Alternative: Use ThreadRunner
from kedro.runner import ThreadRunner
# ThreadRunner works with CachedDataset
runner = ThreadRunner()
runner.run(pipeline, catalog)

from kedro.io import CachedDataset
from kedro_datasets.pandas import CSVDataset
# Wrap expensive dataset with cache
expensive_dataset = CSVDataset(filepath="large_file.csv")
cached = CachedDataset(expensive_dataset)
# First load: reads from file (slow)
data1 = cached.load()
# Subsequent loads: returns cached data (fast)
data2 = cached.load() # Instant, no file I/O
data3 = cached.load()  # Instant, no file I/O

from kedro.io import CachedDataset
# No copying (fastest, but the caller can mutate the cached object)
cached = CachedDataset(dataset, copy_mode="assign")
data = cached.load()
data.append(1) # Modifies cached data!
# Deep copy (safest, prevents mutations)
cached = CachedDataset(dataset, copy_mode="deepcopy")
data = cached.load()
data.append(1) # Doesn't affect cache
# Shallow copy (balanced)
cached = CachedDataset(dataset, copy_mode="copy")from kedro.io import CachedDataset
# Create CachedDataset from dict config
cached = CachedDataset(
    dataset={
        "type": "pandas.CSVDataset",
        "filepath": "data/large_file.csv"
    },
    copy_mode="copy"
)
# Dataset is lazily created from config
data = cached.load()

from kedro.io import CachedDataset, Version
# Versioning must be specified in CachedDataset, not wrapped dataset
cached = CachedDataset(
    dataset={
        "type": "pandas.CSVDataset",
        "filepath": "data/model.csv"
    },
    version=Version(load=None, save="2024-01-15T10.30.45.123Z"),
    copy_mode="copy"
)
# WRONG: Don't specify versioned in wrapped dataset
# This will raise ValueError
cached = CachedDataset(
    dataset={
        "type": "pandas.CSVDataset",
        "filepath": "data/model.csv",
        "versioned": True  # ERROR!
    }
)
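Loading a pinned version back goes through the same version argument; a sketch, assuming the timestamped file saved above already exists on disk:

from kedro.io import CachedDataset, Version

cached = CachedDataset(
    dataset={
        "type": "pandas.CSVDataset",
        "filepath": "data/model.csv"
    },
    version=Version(load="2024-01-15T10.30.45.123Z", save=None),
    copy_mode="copy"
)

data = cached.load()  # reads the pinned version, then caches it as usual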
# conf/base/catalog.yml

# Plain (uncached) dataset, shown for comparison
expensive_model:
  type: pickle.PickleDataset
  filepath: data/models/large_model.pkl

# Same dataset wrapped with an in-memory cache
cached_model:
  type: kedro.io.CachedDataset
  dataset:
    type: pickle.PickleDataset
    filepath: data/models/large_model.pkl
  copy_mode: deepcopy

# With versioning
versioned_cached:
  type: kedro.io.CachedDataset
  versioned: true  # Specify here, not in dataset
  dataset:
    type: pandas.CSVDataset
    filepath: data/results.csv
  copy_mode: copy
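In a pipeline, several nodes can read the same cached catalog entry; with CachedDataset the wrapped dataset is loaded once and later reads come from memory. A sketch with hypothetical node functions and output names (evaluate, explain, metrics, explanations):

from kedro.pipeline import node, pipeline

def evaluate(model):
    return {"accuracy": 0.0}  # placeholder logic

def explain(model):
    return "feature importances"  # placeholder logic

# Both nodes read "cached_model": the underlying PickleDataset is read once,
# the second read is served from the in-memory cache.
inference = pipeline([
    node(evaluate, inputs="cached_model", outputs="metrics"),
    node(explain, inputs="cached_model", outputs="explanations"),
])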
from kedro.io import CachedDataset

cached = CachedDataset(expensive_dataset)
# Load and cache
data = cached.load()
# Release cache when done
cached.release()
# Next load will re-read from wrapped dataset
data = cached.load()

Good for:
- Expensive datasets (large files, slow I/O, remote reads) that are loaded multiple times in a single run
- Intermediate data consumed by several downstream nodes

Avoid when:
- The data is too large to hold comfortably in memory
- The pipeline runs with ParallelRunner (CachedDataset is single-process only)
- The dataset is only loaded once, so caching adds nothing
import time
from kedro.io import CachedDataset
# Without cache
dataset = ExpensiveDataset("data.parquet")  # ExpensiveDataset is a placeholder for any slow-to-load dataset
start = time.time()
for _ in range(10):
    data = dataset.load()  # 10x I/O operations
uncached_time = time.time() - start

# With cache
cached = CachedDataset(dataset)
start = time.time()
for _ in range(10):
    data = cached.load()  # 1 I/O + 9 memory reads
cached_time = time.time() - start

print(f"Speedup: {uncached_time / cached_time:.1f}x")