```
tessl install tessl/pypi-kedro@1.1.0
```

Kedro helps you build production-ready data and analytics pipelines.
| Metric | Value | Description |
| --- | --- | --- |
| Agent Success | 98% | Agent success rate when using this tile |
| Improvement | 1.32x | Agent success rate improvement when using this tile compared to baseline |
| Baseline | 74% | Agent success rate without this tile |
In-memory dataset for transient data storage.
```python
from typing import Any

from kedro.io import AbstractDataset, MemoryDataset


class MemoryDataset(AbstractDataset):
    """
    Dataset that stores data in memory.

    MemoryDataset is marked as ephemeral (_EPHEMERAL = True), meaning it doesn't
    persist data to disk. It's useful for intermediate results and temporary data
    storage within a pipeline.
    """

    _EPHEMERAL: bool = True

    def __init__(
        self,
        data: Any = None,
        copy_mode: str | None = None,
        metadata: dict[str, Any] | None = None,
    ):
        """
        Initialize MemoryDataset.

        Parameters:
        - data: Initial data to store in memory.
          If not provided, the dataset starts empty.
        - copy_mode: How to copy data on load/save operations.
          - "deepcopy": Deep copy (default for most types, safest but slowest)
          - "copy": Shallow copy (for pandas DataFrames, NumPy arrays)
          - "assign": No copy (for Spark DataFrames, Ibis tables)
          - None: Auto-detect based on data type
        - metadata: Arbitrary metadata dictionary.
          Ignored by Kedro but available to users and plugins.

        Note:
        Copy mode is inferred automatically if not specified:
        - pandas DataFrame, NumPy array → "copy"
        - Spark DataFrame, Ibis Table → "assign"
        - Other types → "deepcopy"
        """

    def load(self) -> Any:
        """
        Load data from memory.

        Returns:
        Stored data (copied according to copy_mode).

        Raises:
        - DatasetError: If no data has been saved yet.
        """

    def save(self, data: Any) -> None:
        """
        Save data to memory.

        Parameters:
        - data: Data to store (copied according to copy_mode).
        """

    def _describe(self) -> dict[str, Any]:
        """
        Describe the dataset.

        Returns:
        Dictionary with the data type if data exists, None otherwise.
        """

    def _exists(self) -> bool:
        """
        Check if data has been saved.

        Returns:
        True if data exists in memory, False otherwise.
        """

    def exists(self) -> bool:
        """
        Check if data has been saved (public method).

        Returns:
        True if data exists in memory, False otherwise.
        """

    def _release(self) -> None:
        """
        Release data from memory.

        Clears the stored data, freeing memory.
        """

    @property
    def metadata(self) -> dict[str, Any] | None:
        """
        Get dataset metadata.

        Returns:
        Metadata dictionary or None.
        """
```
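As the spec notes, calling load() before anything has been saved raises DatasetError. A minimal sketch (DatasetError is importable from kedro.io):

```python
from kedro.io import DatasetError, MemoryDataset

empty = MemoryDataset()
try:
    empty.load()  # nothing has been saved yet
except DatasetError as err:
    print(f"Load failed as expected: {err}")
```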
"""MemoryDataset supports three copy modes to control how data is copied:
**deepcopy** - Creates a completely independent copy of the data, including all nested objects.

Characteristics:
- Loaded objects are fully independent of each other and of the stored data
- Safest mode, but the slowest
- The default for most data types

Use when:
- Data is mutable and downstream code may modify it in place
```python
dataset = MemoryDataset([[1, 2], [3, 4]], copy_mode="deepcopy")
data1 = dataset.load()
data2 = dataset.load()

data1[0].append(99)  # Doesn't affect data2
print(data1)  # [[1, 2, 99], [3, 4]]
print(data2)  # [[1, 2], [3, 4]]
```
**copy** - Creates a shallow copy: copies the container but shares nested objects.

Characteristics:
- Uses the object's .copy() method
- Much cheaper than deepcopy for large containers
- Nested objects may still be shared between copies

Use when:
- Working with pandas DataFrames or NumPy arrays
```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
dataset = MemoryDataset(df, copy_mode="copy")

data1 = dataset.load()
data2 = dataset.load()
data1["a"] = [99, 99]  # Doesn't affect data2
print(data1["a"].tolist())  # [99, 99]
print(data2["a"].tolist())  # [1, 2]
```
**assign** - No copying: returns the same object reference.

Characteristics:
- Every load returns the same object reference
- No copying overhead at all
- Safe only for objects that are immutable or never mutated

Use when:
- Working with Spark DataFrames or Ibis tables, which are immutable
```python
# Spark DataFrames require assign mode
# (they are immutable, so sharing is safe)
# spark_df is assumed to be an existing Spark DataFrame
dataset = MemoryDataset(spark_df, copy_mode="assign")
data1 = dataset.load()
data2 = dataset.load()
# data1 and data2 reference the same Spark DataFrame
```
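Because assign shares references, it is only safe when nothing mutates the data; with a mutable object, every consumer sees every change:

```python
from kedro.io import MemoryDataset

risky = MemoryDataset([1, 2, 3], copy_mode="assign")
a = risky.load()
b = risky.load()

a.append(4)  # a, b, and the stored data are the same list
print(b)  # [1, 2, 3, 4]
```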
If copy_mode is None, it's inferred based on the data type:

```python
import pandas as pd
import numpy as np

# Automatically uses "copy"
df = pd.DataFrame({"a": [1, 2]})
dataset1 = MemoryDataset(df)

# Automatically uses "copy"
arr = np.array([1, 2, 3])
dataset2 = MemoryDataset(arr)

# Automatically uses "deepcopy"
nested_list = [[1, 2], [3, 4]]
dataset3 = MemoryDataset(nested_list)

# Automatically uses "assign"
# (if spark_df is a Spark DataFrame)
dataset4 = MemoryDataset(spark_df)
```
Basic usage:

```python
from kedro.io import MemoryDataset

# Create an empty memory dataset
temp_data = MemoryDataset()

# Check if data exists
print(temp_data.exists())  # False

# Save data
temp_data.save([1, 2, 3, 4, 5])

# Check again
print(temp_data.exists())  # True

# Load data
data = temp_data.load()
print(data)  # [1, 2, 3, 4, 5]

# Release data
temp_data.release()
print(temp_data.exists())  # False
```
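This is also what Kedro does for you during a run: any dataset not registered in the catalog is held in a MemoryDataset automatically. A sketch of that behavior with a standalone runner ("doubled" below is never registered):

```python
from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline
from kedro.runner import SequentialRunner

def double(xs):
    return [x * 2 for x in xs]

def total(xs):
    return sum(xs)

catalog = DataCatalog({
    "numbers": MemoryDataset([1, 2, 3]),
    "sum": MemoryDataset(),
})

# "doubled" is not in the catalog, so the runner stores it
# in a MemoryDataset automatically
pipe = pipeline([
    node(double, "numbers", "doubled"),
    node(total, "doubled", "sum"),
])

SequentialRunner().run(pipe, catalog)
print(catalog.load("sum"))  # 12
```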
Creating a dataset with initial data:

```python
from kedro.io import MemoryDataset

# Create with initial data
config = MemoryDataset({"model": "xgboost", "params": {"max_depth": 5}})

# Data is immediately available
data = config.load()
print(data)  # {'model': 'xgboost', 'params': {'max_depth': 5}}

# Update data
config.save({"model": "lightgbm", "params": {"num_leaves": 31}})
```
Attaching metadata:

```python
from kedro.io import MemoryDataset

# Create with metadata
dataset = MemoryDataset(
    data=[1, 2, 3],
    metadata={"source": "test", "created": "2024-01-15"}
)

# Access metadata
print(dataset.metadata)  # {'source': 'test', 'created': '2024-01-15'}
```
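Kedro itself ignores metadata, so it is free for your own tooling; for example, a hypothetical helper that selects datasets by a metadata key:

```python
from kedro.io import MemoryDataset

datasets = {
    "raw_numbers": MemoryDataset([1, 2, 3], metadata={"layer": "raw"}),
    "model_input": MemoryDataset([4, 5, 6], metadata={"layer": "model"}),
}

# Hypothetical convention: a "layer" key in each dataset's metadata
raw_only = [
    name for name, ds in datasets.items()
    if (ds.metadata or {}).get("layer") == "raw"
]
print(raw_only)  # ['raw_numbers']
```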
Using MemoryDataset with a DataCatalog:

```python
from kedro.io import DataCatalog, MemoryDataset

# Create catalog with memory datasets
catalog = DataCatalog({
    "input": MemoryDataset([1, 2, 3, 4, 5]),
    "output": MemoryDataset()
})

# Load input
data = catalog.load("input")
print(data)  # [1, 2, 3, 4, 5]

# Process and save
processed = [x * 2 for x in data]
catalog.save("output", processed)

# Load result
result = catalog.load("output")
print(result)  # [2, 4, 6, 8, 10]
```
Declaring memory datasets in catalog.yml:

```yaml
# conf/base/catalog.yml
temp_data:
  type: kedro.io.MemoryDataset

params_data:
  type: kedro.io.MemoryDataset
  data: {"learning_rate": 0.01, "epochs": 100}
  copy_mode: deepcopy

safe_intermediate:
  type: kedro.io.MemoryDataset
  copy_mode: deepcopy
  metadata:
    description: "Intermediate results with safe copying"
```
Adding memory datasets to a catalog programmatically:

```python
from kedro.io import DataCatalog, MemoryDataset

# Create catalog
catalog = DataCatalog()

# Add memory datasets using dict-like assignment
feed_dict = {
    "input_data": [1, 2, 3],
    "config": {"model": "xgboost"},
    "params": {"max_depth": 5}
}
for name, data in feed_dict.items():
    catalog[name] = MemoryDataset(data)

# Load the datasets
data = catalog.load("input_data")
config = catalog.load("config")
params = catalog.load("params")
```
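In recent Kedro releases the catalog will also wrap raw data for you: assigning a plain object with catalog[name] = data stores it in a MemoryDataset behind the scenes (a sketch, assuming the 1.x DataCatalog's dict-like assignment):

```python
from kedro.io import DataCatalog

catalog = DataCatalog()

# Raw data is wrapped in a MemoryDataset automatically
catalog["scores"] = [0.91, 0.87, 0.95]

print(catalog.load("scores"))  # [0.91, 0.87, 0.95]
```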
Comparing copy-mode performance:

```python
import time

import pandas as pd

from kedro.io import MemoryDataset

# Large DataFrame - use "copy" mode for efficiency
large_df = pd.DataFrame({"col": range(1000000)})

# Efficient: shallow copy
efficient = MemoryDataset(large_df, copy_mode="copy")

# Inefficient: deep copy (much slower for DataFrames)
inefficient = MemoryDataset(large_df, copy_mode="deepcopy")

# Time loading in each mode
start = time.time()
data1 = efficient.load()
print(f"Copy mode: {time.time() - start:.4f}s")

start = time.time()
data2 = inefficient.load()
print(f"Deepcopy mode: {time.time() - start:.4f}s")
```