tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.
Agent Success: 98% (agent success rate when using this tile)
Improvement: 1.32x (success rate improvement over the baseline)
Baseline: 74% (agent success rate without this tile)
Central registry for all datasets, with lazy loading, versioning, and a uniform I/O interface.
from kedro.io import DataCatalog, SharedMemoryDataCatalog

class DataCatalog:
"""Central registry for all datasets in a Kedro project."""
def __init__(
self,
datasets: dict[str, AbstractDataset] | None = None,
config_resolver: CatalogConfigResolver | None = None,
load_versions: dict[str, str] | None = None,
save_version: str | None = None
):
"""
Initialize DataCatalog.
Parameters:
- datasets: Dictionary mapping dataset names to AbstractDataset instances
These datasets are immediately available in the catalog
- config_resolver: Configuration resolver for dataset factory patterns and credentials
Enables dynamic dataset creation based on naming patterns
If None, uses default resolver with MemoryDataset pattern
- load_versions: Specific versions to load for versioned datasets
Maps dataset name → version string (e.g., "2024-01-15T10.30.45.123Z")
Only affects datasets with versioning enabled
- save_version: Global version string for all save operations
Applied to all versioned datasets when saving
Must be filesystem-compatible and lexicographically sortable
Note:
- If a dataset appears in both 'datasets' and config_resolver,
the one in 'datasets' takes precedence
- Config resolver enables lazy loading - datasets are only created when accessed
"""
@property
def config_resolver(self) -> CatalogConfigResolver:
"""
Get the CatalogConfigResolver instance.
The config resolver handles:
- Dataset factory patterns (e.g., "{namespace}.*" → specific dataset type)
- Dynamic dataset creation based on name patterns
- Credential resolution and injection
- Default runtime patterns (e.g., "{default}" → MemoryDataset)
Returns:
CatalogConfigResolver instance managing dynamic dataset creation
"""
def load(self, name: str) -> Any:
"""Load data from a dataset."""
def save(self, name: str, data: Any) -> None:
"""Save data to a dataset."""
def exists(self, name: str) -> bool:
"""Check if a dataset exists in the catalog."""
def release(self, name: str) -> None:
"""Release a dataset's cached data."""
def confirm(self, name: str) -> None:
"""
Confirm a dataset by its name.
Note: This method requires the dataset to have a confirm() method.
If the dataset does not have a confirm() method, a DatasetError will be raised.
Typically used with versioned datasets that support confirmation.
Raises:
- DatasetNotFoundError: When dataset is not registered
- DatasetError: When dataset does not have 'confirm' method
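Example (sketch; "incremental_data" is a hypothetical dataset whose class implements confirm(), e.g. an incremental/partitioned dataset):
>>> catalog.load("incremental_data")    # process the newly available partitions
>>> catalog.confirm("incremental_data") # persist the checkpoint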
"""
def filter(
self,
name_regex: str | None = None,
type_regex: str | None = None,
by_type: type | list[type] | None = None
) -> list[str]:
"""
Filter and list dataset names by regex pattern or type.
Parameters:
- name_regex: Optional regex pattern to filter by dataset name (Python re module syntax)
Examples: "^raw_.*" (starts with raw_), ".*_output$" (ends with _output)
- type_regex: Optional regex pattern to filter by dataset type's full class name
Example: ".*CSVDataset$" matches "pandas.CSVDataset"
- by_type: Optional dataset type or list of types to filter by
Matches exact type (does NOT include subclasses)
Example: MemoryDataset or [MemoryDataset, CSVDataset]
Returns:
List of dataset names matching ALL specified filter criteria (AND logic)
Examples:
>>> # Filter by name pattern
>>> catalog.filter(name_regex="^raw_") # ['raw_users', 'raw_products']
>>>
>>> # Filter by type
>>> from kedro.io import MemoryDataset
>>> catalog.filter(by_type=MemoryDataset) # All MemoryDatasets
>>>
>>> # Combine filters
>>> catalog.filter(name_regex="^feature_", by_type=MemoryDataset)
>>> # ['feature_engineered'] - only MemoryDatasets starting with 'feature_'
>>>
>>> # Filter by type string pattern
>>> catalog.filter(type_regex="CSV") # All CSV-related datasets
Performance Note:
- For large catalogs (1000+ datasets), filtering can take several milliseconds
- name_regex is fastest; type-based filtering requires type inspection
"""
def __contains__(self, dataset_name: str) -> bool:
"""Check if dataset is registered or matches pattern."""
def __getitem__(self, ds_name: str) -> AbstractDataset:
"""Get dataset by name (dict-like access)."""
def __setitem__(self, key: str, value: AbstractDataset) -> None:
"""Set dataset by name (dict-like assignment)."""
def __len__(self) -> int:
"""
Get the number of datasets registered in the catalog.
Returns:
Number of datasets (both materialized and lazy pattern-based)
"""
def __iter__(self) -> Iterator[str]:
"""
Iterate over all dataset names in the catalog.
Yields:
Dataset names (both materialized and lazy pattern-based)
"""
def get(
self,
key: str,
fallback_to_runtime_pattern: bool = False,
version: Version | None = None
) -> AbstractDataset | None:
"""
Get a dataset by name from the catalog.
If a dataset is not materialized but matches a pattern, it is
instantiated and added to the catalog first, then returned.
Parameters:
- key: Dataset name
- fallback_to_runtime_pattern: If True, use runtime patterns for matching
- version: Optional version for versioned datasets
Returns:
AbstractDataset instance or None if not found
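Example (sketch; "cars" is a hypothetical registered dataset):
>>> dataset = catalog.get("cars")
>>> data = dataset.load() if dataset is not None else None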
"""
def keys(self) -> list[str]:
"""Get all dataset names."""
def values(self) -> list[AbstractDataset]:
"""Get all dataset instances."""
def items(self) -> list[tuple[str, AbstractDataset]]:
"""Get all dataset name-instance pairs."""
def get_type(self, ds_name: str) -> str | None:
"""
Get dataset type without materializing the dataset.
Parameters:
- ds_name: Dataset name
Returns:
Fully qualified type string (e.g., 'kedro.io.MemoryDataset') or None
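Example (sketch; names are hypothetical):
>>> catalog.get_type("input")    # 'kedro.io.MemoryDataset'
>>> catalog.get_type("missing")  # None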
"""
def to_config(self) -> tuple[
dict[str, dict[str, Any]],
dict[str, dict[str, Any]],
dict[str, str | None],
str | None
]:
"""
Convert DataCatalog instance to configuration format.
Returns:
Tuple containing four elements:
1. catalog_config: Dataset configurations (name -> config dict)
2. credentials: Credentials for datasets (name -> credentials dict)
3. load_versions: Load versions for datasets (name -> version string)
4. save_version: Global save version string (or None)
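Example (sketch): round-trip a catalog through its configuration:
>>> cfg, creds, load_versions, save_version = catalog.to_config()
>>> rebuilt = DataCatalog.from_config(cfg, creds, load_versions, save_version)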
"""
@classmethod
def from_config(
cls,
catalog: dict[str, dict[str, Any]] | None,
credentials: dict[str, dict[str, Any]] | None = None,
load_versions: dict[str, str] | None = None,
save_version: str | None = None
) -> "DataCatalog":
"""Create DataCatalog from configuration dictionary."""
def __eq__(self, other: Any) -> bool:
"""Check equality with another catalog (compares datasets and configurations)."""
def __repr__(self) -> str:
"""String representation showing the number of datasets and patterns."""

class SharedMemoryDataCatalog(DataCatalog):
"""
Specialized DataCatalog for multiprocessing with shared memory support.
Extends DataCatalog to support parallel execution by ensuring datasets
are serializable and synchronized across processes. Used internally by
ParallelRunner for multiprocessing execution.
"""
def set_manager_datasets(self, manager: SyncManager) -> None:
"""
Associate multiprocessing manager with shared memory datasets.
Iterates through all datasets and sets the provided multiprocessing
manager for SharedMemoryDataset instances.
Parameters:
- manager: Multiprocessing SyncManager for process synchronization
"""
def validate_catalog(self) -> None:
"""
Validate that all datasets are compatible with multiprocessing.
Checks that all datasets are serializable and don't use single-process
features (e.g., lambda functions, nested functions, closures).
Raises:
- AttributeError: If datasets are not serializable or incompatible with multiprocessing
"""class DatasetError(Exception):
"""Base exception for dataset errors."""
class DatasetNotFoundError(DatasetError):
"""Raised when trying to use a non-existing dataset."""
class DatasetAlreadyExistsError(DatasetError):
"""Raised when trying to add a dataset that already exists."""class Version:
"""Named tuple for dataset versioning."""
load: str # Version to load (or None for latest)
save: str # Version to save (or None for timestamp)
def generate_timestamp() -> str:
"""Generate timestamp string for versioned datasets."""The config_resolver parameter enables advanced dataset management through patterns:
DataCatalog includes a default pattern for non-registered datasets:
default_runtime_patterns = {
"{default}": {"type": "kedro.io.MemoryDataset"}
}

This means any dataset name not explicitly registered automatically becomes a MemoryDataset.
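For example (a minimal sketch; exact resolution behaviour may vary by Kedro version), an unregistered name can be resolved through the runtime pattern:

from kedro.io import DataCatalog, MemoryDataset

catalog = DataCatalog()
# "scratch" is not registered explicitly; the "{default}" runtime pattern resolves it
ds = catalog.get("scratch", fallback_to_runtime_pattern=True)
print(isinstance(ds, MemoryDataset))  # True

Custom factory patterns can be defined and passed via a CatalogConfigResolver: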
from kedro.io import DataCatalog
from kedro.io.catalog_config_resolver import CatalogConfigResolver
# Define custom patterns
patterns = {
"{default}": {"type": "kedro.io.MemoryDataset"},
"raw_*": {
"type": "pandas.CSVDataset",
"filepath": "data/01_raw/{name}.csv"
},
"processed_*": {
"type": "pandas.ParquetDataset",
"filepath": "data/02_processed/{name}.parquet"
}
}
# Create resolver
resolver = CatalogConfigResolver(
config={}, # Explicit dataset configs
patterns=patterns
)
# Create catalog with resolver
catalog = DataCatalog(config_resolver=resolver)
# These datasets are created dynamically from patterns
# raw_customers → pandas.CSVDataset("data/01_raw/raw_customers.csv")
# processed_orders → pandas.ParquetDataset("data/02_processed/processed_orders.parquet")

Patterns are matched in order of specificity, with the catch-all {default} pattern matched last:

# Pattern priority example
patterns = {
"model_*": {"type": "pickle.PickleDataset", "filepath": "models/{name}.pkl"},
"model_final_*": {"type": "kedro.io.CachedDataset", "dataset": {...}},
"{default}": {"type": "kedro.io.MemoryDataset"}
}
# Matching:
# "model_final_v2" → Uses "model_final_*" (most specific)
# "model_v1" → Uses "model_*"
# "random_data" → Uses "{default}"from kedro.io import DataCatalog
from kedro.io.catalog_config_resolver import CatalogConfigResolver
# Catalog config with credentials reference
catalog_config = {
"my_dataset": {
"type": "pandas.CSVDataset",
"filepath": "s3://bucket/data.csv",
"credentials": "s3_credentials" # Reference to credentials
}
}
# Credentials
credentials = {
"s3_credentials": {
"aws_access_key_id": "AKIAIOSFODNN7EXAMPLE",
"aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
}
}
# Create resolver with credentials
resolver = CatalogConfigResolver(
config=catalog_config,
credentials=credentials
)
catalog = DataCatalog(config_resolver=resolver)
# Dataset receives resolved credentials automatically
data = catalog.load("my_dataset")from kedro.io import DataCatalog, MemoryDataset
# Create catalog
catalog = DataCatalog({
"input": MemoryDataset([1, 2, 3]),
"output": MemoryDataset()
})
# Load data
data = catalog.load("input")
print(data) # [1, 2, 3]
# Save data
catalog.save("output", [2, 4, 6])
# Check existence
if catalog.exists("input"):
print("Dataset exists") # True
if "output" in catalog:
print("Output in catalog") # True
# List datasets (filter with no arguments lists all)
all_datasets = catalog.filter()
print(all_datasets) # ['input', 'output']
# Filter with regex
params_only = catalog.filter(name_regex=r"^params:")
model_datasets = catalog.filter(name_regex=r"model_.*")
# Dict-like access
dataset = catalog["input"]
data = dataset.load()
# Iterate over catalog
for name in catalog:
print(f"Dataset: {name}")
# Get keys, values, items
names = catalog.keys()
datasets = catalog.values()
pairs = catalog.items()

Populating an empty catalog with in-memory data:

from kedro.io import DataCatalog, MemoryDataset
# Create empty catalog
catalog = DataCatalog()
# Add multiple datasets using dict-like assignment with MemoryDatasets
feed_dict = {
"input_data": [1, 2, 3, 4, 5],
"config": {"learning_rate": 0.01},
"params:model": {"max_depth": 5}
}
for name, data in feed_dict.items():
    catalog[name] = MemoryDataset(data)
# Load the datasets
data = catalog.load("input_data")
config = catalog.load("config")
params = catalog.load("params:model")
# Replace an existing dataset
catalog["input_data"] = MemoryDataset([10, 20, 30])
# Or check first if you don't want to replace
if "input_data" not in catalog:
catalog["input_data"] = MemoryDataset([100, 200])from kedro.io import DataCatalog, CachedDataset
from kedro_datasets.pandas import CSVDataset
# Create catalog with cached dataset
catalog = DataCatalog({
"cached_data": CachedDataset(
CSVDataset(filepath="data/large_file.csv")
)
})
# Load data (caches in memory)
data = catalog.load("cached_data")
# Release cached data (frees memory)
catalog.release("cached_data")
# confirm() requires the dataset to implement confirm(); otherwise it raises DatasetError,
# and unregistered names raise DatasetNotFoundError (a DatasetError subclass)
try:
    catalog.confirm("cached_data")   # raises DatasetError unless the wrapped dataset implements confirm()
    catalog.confirm("non_existent")  # raises DatasetNotFoundError
except DatasetError as e:
    print(f"Error: {e}")

Creating a catalog from configuration:

from kedro.io import DataCatalog
# Define catalog configuration
catalog_config = {
"raw_data": {
"type": "pandas.CSVDataset",
"filepath": "data/raw/input.csv",
"load_args": {"sep": ","},
"save_args": {"index": False}
},
"processed_data": {
"type": "pandas.ParquetDataset",
"filepath": "data/processed/output.parquet"
},
"model": {
"type": "pickle.PickleDataset",
"filepath": "data/models/model.pkl",
"versioned": True # Enable versioning
}
}
# Create catalog
catalog = DataCatalog.from_config(catalog_config)
# Load data
raw = catalog.load("raw_data")from kedro.io import DataCatalog
# Catalog config with credentials references
catalog_config = {
"s3_data": {
"type": "pandas.CSVDataset",
"filepath": "s3://my-bucket/data.csv",
"credentials": "s3_creds"
},
"postgres_table": {
"type": "pandas.SQLTableDataset",
"table_name": "users",
"credentials": "postgres_creds"
}
}
# Credentials (separate from catalog config)
credentials = {
"s3_creds": {
"aws_access_key_id": "AKIAIOSFODNN7EXAMPLE",
"aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
},
"postgres_creds": {
"con": "postgresql://user:pass@localhost:5432/db"
}
}
# Create catalog with credentials
catalog = DataCatalog.from_config(
catalog=catalog_config,
credentials=credentials
)
# Credentials are automatically injected
data = catalog.load("s3_data")from kedro.io import DataCatalog
catalog_config = {
"model": {
"type": "pickle.PickleDataset",
"filepath": "data/models/model.pkl",
"versioned": True
},
"results": {
"type": "pandas.CSVDataset",
"filepath": "data/results.csv",
"versioned": True
}
}
# Load specific versions
catalog = DataCatalog.from_config(
catalog=catalog_config,
load_versions={
"model": "2024-01-15T10.30.45.123Z",
"results": "2024-01-15T11.00.00.000Z"
},
save_version="2024-01-15T12.00.00.000Z"
)
# Loads specified versions
model = catalog.load("model") # Loads 2024-01-15T10.30.45.123Z
# Saves with specified version
catalog.save("model", new_model) # Saves as 2024-01-15T12.00.00.000Z# Load specific version
catalog = DataCatalog(
datasets={...},
load_versions={"model": "2024-01-15T10.30.45.123Z"}
)
# Save with version
catalog = DataCatalog(
datasets={...},
save_version="2024-01-15T11.00.00.000Z"
)

Parallel execution with SharedMemoryDataCatalog:

from multiprocessing.managers import SyncManager
from kedro.io import SharedMemoryDataCatalog, MemoryDataset
from kedro.runner import ParallelRunner
# Create shared memory catalog
catalog = SharedMemoryDataCatalog({
"input_data": MemoryDataset([1, 2, 3, 4, 5]),
"output_data": MemoryDataset()
})
# Note: SharedMemoryDataCatalog is typically used internally by ParallelRunner
# ParallelRunner automatically converts DataCatalog to SharedMemoryDataCatalog
runner = ParallelRunner(max_workers=4)
outputs = runner.run(pipeline, catalog)  # pipeline: a previously constructed Pipeline
# Access data using .load()
result = outputs["output_data"].load()
# For advanced use: Manual multiprocessing manager setup
manager = SyncManager()
manager.start()
catalog.set_manager_datasets(manager)
# Validate catalog is compatible with multiprocessing
try:
    catalog.validate_catalog()
    print("Catalog is valid for parallel execution")
except AttributeError as e:
    print(f"Validation failed: {e}")

See also: