tessl/pypi-kedro (describes pypi/kedro@1.1.x)

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

Agent success rate with this tile: 98% (1.32x improvement over a 74% baseline)

docs/api/data-catalog.md

DataCatalog API Reference

Central registry for all datasets with lazy loading, versioning, and uniform I/O interface.

Imports

from kedro.io import DataCatalog, SharedMemoryDataCatalog

DataCatalog Class

class DataCatalog:
    """Central registry for all datasets in a Kedro project."""

    def __init__(
        self,
        datasets: dict[str, AbstractDataset] | None = None,
        config_resolver: CatalogConfigResolver | None = None,
        load_versions: dict[str, str] | None = None,
        save_version: str | None = None
    ):
        """
        Initialize DataCatalog.

        Parameters:
        - datasets: Dictionary mapping dataset names to AbstractDataset instances
                   These datasets are immediately available in the catalog
        - config_resolver: Configuration resolver for dataset factory patterns and credentials
                          Enables dynamic dataset creation based on naming patterns
                          If None, uses default resolver with MemoryDataset pattern
        - load_versions: Specific versions to load for versioned datasets
                        Maps dataset name → version string (e.g., "2024-01-15T10.30.45.123Z")
                        Only affects datasets with versioning enabled
        - save_version: Global version string for all save operations
                       Applied to all versioned datasets when saving
                       Must be filesystem-compatible and lexicographically sortable

        Note:
        - If a dataset appears in both 'datasets' and config_resolver,
          the one in 'datasets' takes precedence
        - Config resolver enables lazy loading - datasets are only created when accessed
        """

    @property
    def config_resolver(self) -> CatalogConfigResolver:
        """
        Get the CatalogConfigResolver instance.

        The config resolver handles:
        - Dataset factory patterns (e.g., "{namespace}.*" → specific dataset type)
        - Dynamic dataset creation based on name patterns
        - Credential resolution and injection
        - Default runtime patterns (e.g., "{default}" → MemoryDataset)

        Returns:
        CatalogConfigResolver instance managing dynamic dataset creation
        """

    def load(self, name: str) -> Any:
        """Load data from a dataset."""

    def save(self, name: str, data: Any) -> None:
        """Save data to a dataset."""

    def exists(self, name: str) -> bool:
        """Check if a dataset exists in the catalog."""

    def release(self, name: str) -> None:
        """Release a dataset's cached data."""

    def confirm(self, name: str) -> None:
        """
        Confirm a dataset by its name.

        Note: This method requires the dataset to have a confirm() method.
        If the dataset does not have a confirm() method, a DatasetError will be raised.
        Typically used with versioned datasets that support confirmation.

        Raises:
        - DatasetNotFoundError: When dataset is not registered
        - DatasetError: When dataset does not have 'confirm' method
        """

    def filter(
        self,
        name_regex: str | None = None,
        type_regex: str | None = None,
        by_type: type | list[type] | None = None
    ) -> list[str]:
        """
        Filter and list dataset names by regex pattern or type.

        Parameters:
        - name_regex: Optional regex pattern to filter by dataset name (Python re module syntax)
                     Examples: "^raw_.*" (starts with raw_), ".*_output$" (ends with _output)
        - type_regex: Optional regex pattern to filter by dataset type's full class name
                     Example: ".*CSVDataset$" matches "pandas.CSVDataset"
        - by_type: Optional dataset type or list of types to filter by
                  Matches exact type (does NOT include subclasses)
                  Example: MemoryDataset or [MemoryDataset, CSVDataset]

        Returns:
        List of dataset names matching ALL specified filter criteria (AND logic)

        Examples:
        >>> # Filter by name pattern
        >>> catalog.filter(name_regex="^raw_")  # ['raw_users', 'raw_products']
        >>>
        >>> # Filter by type
        >>> from kedro.io import MemoryDataset
        >>> catalog.filter(by_type=MemoryDataset)  # All MemoryDatasets
        >>>
        >>> # Combine filters
        >>> catalog.filter(name_regex="^feature_", by_type=MemoryDataset)
        >>> # ['feature_engineered'] - only MemoryDatasets starting with 'feature_'
        >>>
        >>> # Filter by type string pattern
        >>> catalog.filter(type_regex="CSV")  # All CSV-related datasets

        Performance Note:
        - For large catalogs (1000+ datasets), filtering can take several milliseconds
        - name_regex is fastest, type-based filtering requires type inspection
        """

    def __contains__(self, dataset_name: str) -> bool:
        """Check if dataset is registered or matches pattern."""

    def __getitem__(self, ds_name: str) -> AbstractDataset:
        """Get dataset by name (dict-like access)."""

    def __setitem__(self, key: str, value: AbstractDataset) -> None:
        """Set dataset by name (dict-like assignment)."""

    def __len__(self) -> int:
        """
        Get the number of datasets registered in the catalog.

        Returns:
        Number of datasets (both materialized and lazy pattern-based)
        """

    def __iter__(self) -> Iterator[str]:
        """
        Iterate over all dataset names in the catalog.

        Yields:
        Dataset names (both materialized and lazy pattern-based)
        """

    def get(
        self,
        key: str,
        fallback_to_runtime_pattern: bool = False,
        version: Version | None = None
    ) -> AbstractDataset | None:
        """
        Get a dataset by name from the catalog.

        If a dataset is not materialized but matches a pattern, it is
        instantiated and added to the catalog first, then returned.

        Parameters:
        - key: Dataset name
        - fallback_to_runtime_pattern: If True, use runtime patterns for matching
        - version: Optional version for versioned datasets

        Returns:
        AbstractDataset instance or None if not found
        """

    def keys(self) -> list[str]:
        """Get all dataset names."""

    def values(self) -> list[AbstractDataset]:
        """Get all dataset instances."""

    def items(self) -> list[tuple[str, AbstractDataset]]:
        """Get all dataset name-instance pairs."""

    def get_type(self, ds_name: str) -> str | None:
        """
        Get dataset type without materializing the dataset.

        Parameters:
        - ds_name: Dataset name

        Returns:
        Fully qualified type string (e.g., 'kedro.io.MemoryDataset') or None
        """

    def to_config(self) -> tuple[
        dict[str, dict[str, Any]],
        dict[str, dict[str, Any]],
        dict[str, str | None],
        str | None
    ]:
        """
        Convert DataCatalog instance to configuration format.

        Returns:
        Tuple containing four elements:
        1. catalog_config: Dataset configurations (name -> config dict)
        2. credentials: Credentials for datasets (name -> credentials dict)
        3. load_versions: Load versions for datasets (name -> version string)
        4. save_version: Global save version string (or None)
        """

    @classmethod
    def from_config(
        cls,
        catalog: dict[str, dict[str, Any]] | None,
        credentials: dict[str, dict[str, Any]] | None = None,
        load_versions: dict[str, str] | None = None,
        save_version: str | None = None
    ) -> "DataCatalog":
        """Create DataCatalog from configuration dictionary."""

    def __eq__(self, other: Any) -> bool:
        """Check equality with another catalog (compares datasets and configurations)."""

    def __repr__(self) -> str:
        """String representation showing number of datasets and patterns."""

SharedMemoryDataCatalog Class

class SharedMemoryDataCatalog(DataCatalog):
    """
    Specialized DataCatalog for multiprocessing with shared memory support.

    Extends DataCatalog to support parallel execution by ensuring datasets
    are serializable and synchronized across processes. Used internally by
    ParallelRunner for multiprocessing execution.
    """

    def set_manager_datasets(self, manager: SyncManager) -> None:
        """
        Associate multiprocessing manager with shared memory datasets.

        Iterates through all datasets and sets the provided multiprocessing
        manager for SharedMemoryDataset instances.

        Parameters:
        - manager: Multiprocessing SyncManager for process synchronization
        """

    def validate_catalog(self) -> None:
        """
        Validate that all datasets are compatible with multiprocessing.

        Checks that all datasets are serializable and don't use single-process
        features (e.g., lambda functions, nested functions, closures).

        Raises:
        - AttributeError: If datasets are not serializable or incompatible with multiprocessing
        """

Exceptions

class DatasetError(Exception):
    """Base exception for dataset errors."""

class DatasetNotFoundError(DatasetError):
    """Raised when trying to use a non-existing dataset."""

class DatasetAlreadyExistsError(DatasetError):
    """Raised when trying to add a dataset that already exists."""

Version Management

class Version:
    """Named tuple for dataset versioning."""
    load: str | None  # Version to load (None means load the latest)
    save: str | None  # Version to save under (None means use a generated timestamp)

def generate_timestamp() -> str:
    """Generate timestamp string for versioned datasets."""

Configuration Resolver and Dataset Patterns

The config_resolver parameter enables advanced dataset management through patterns:

Default Runtime Patterns

DataCatalog includes a default pattern for non-registered datasets:

default_runtime_patterns = {
    "{default}": {"type": "kedro.io.MemoryDataset"}
}

This means any dataset name not explicitly registered automatically becomes a MemoryDataset.
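
A minimal sketch of that fallback using the get() signature documented above; note that runtime patterns are only consulted when fallback_to_runtime_pattern=True:

from kedro.io import DataCatalog

catalog = DataCatalog()  # nothing registered explicitly

# Resolving an unregistered name against the runtime patterns
# materializes a MemoryDataset via the {default} pattern
ds = catalog.get("scratch", fallback_to_runtime_pattern=True)
ds.save({"a": 1})
print(ds.load())  # {'a': 1}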

Custom Configuration Resolver

from kedro.io import DataCatalog
from kedro.io.catalog_config_resolver import CatalogConfigResolver

# Define custom patterns
patterns = {
    "{default}": {"type": "kedro.io.MemoryDataset"},
    "raw_*": {
        "type": "pandas.CSVDataset",
        "filepath": "data/01_raw/{name}.csv"
    },
    "processed_*": {
        "type": "pandas.ParquetDataset",
        "filepath": "data/02_processed/{name}.parquet"
    }
}

# Create resolver
resolver = CatalogConfigResolver(
    config={},  # Explicit dataset configs
    patterns=patterns
)

# Create catalog with resolver
catalog = DataCatalog(config_resolver=resolver)

# These datasets are created dynamically from patterns
# raw_customers → pandas.CSVDataset("data/01_raw/raw_customers.csv")
# processed_orders → pandas.ParquetDataset("data/02_processed/processed_orders.parquet")
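
As a follow-up usage sketch (dataset types assume kedro-datasets is installed), resolving a pattern-backed name materializes it in the catalog:

raw_customers = catalog.get("raw_customers")  # matched by the "raw_*" pattern
print(catalog.get_type("raw_customers"))      # fully qualified CSVDataset type string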

Pattern Matching Rules

Patterns are matched in order of specificity:

  1. Exact name match in explicit config
  2. Most specific pattern match (longest prefix)
  3. Default pattern {default}

# Pattern priority example
patterns = {
    "model_*": {"type": "pickle.PickleDataset", "filepath": "models/{name}.pkl"},
    "model_final_*": {"type": "kedro.io.CachedDataset", "dataset": {...}},
    "{default}": {"type": "kedro.io.MemoryDataset"}
}

# Matching:
# "model_final_v2" → Uses "model_final_*" (most specific)
# "model_v1" → Uses "model_*"
# "random_data" → Uses "{default}"

Credentials with Config Resolver

from kedro.io import DataCatalog
from kedro.io.catalog_config_resolver import CatalogConfigResolver

# Catalog config with credentials reference
catalog_config = {
    "my_dataset": {
        "type": "pandas.CSVDataset",
        "filepath": "s3://bucket/data.csv",
        "credentials": "s3_credentials"  # Reference to credentials
    }
}

# Credentials
credentials = {
    "s3_credentials": {
        "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE",
        "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    }
}

# Create resolver with credentials
resolver = CatalogConfigResolver(
    config=catalog_config,
    credentials=credentials
)

catalog = DataCatalog(config_resolver=resolver)

# Dataset receives resolved credentials automatically
data = catalog.load("my_dataset")

Usage Examples

Basic Catalog Operations

from kedro.io import DataCatalog, MemoryDataset

# Create catalog
catalog = DataCatalog({
    "input": MemoryDataset([1, 2, 3]),
    "output": MemoryDataset()
})

# Load data
data = catalog.load("input")
print(data)  # [1, 2, 3]

# Save data
catalog.save("output", [2, 4, 6])

# Check existence
if catalog.exists("input"):
    print("Dataset exists")  # True

if "output" in catalog:
    print("Output in catalog")  # True

# List datasets (filter with no arguments lists all)
all_datasets = catalog.filter()
print(all_datasets)  # ['input', 'output']

# Filter with regex
params_only = catalog.filter(name_regex=r"^params:")
model_datasets = catalog.filter(name_regex=r"model_.*")

# Dict-like access
dataset = catalog["input"]
data = dataset.load()

# Iterate over catalog
for name in catalog:
    print(f"Dataset: {name}")

# Get keys, values, items
names = catalog.keys()
datasets = catalog.values()
pairs = catalog.items()

Adding Multiple Datasets (replacement for add_feed_dict)

from kedro.io import DataCatalog, MemoryDataset

# Create empty catalog
catalog = DataCatalog()

# Add multiple datasets using dict-like assignment with MemoryDatasets
feed_dict = {
    "input_data": [1, 2, 3, 4, 5],
    "config": {"learning_rate": 0.01},
    "params:model": {"max_depth": 5}
}

for name, data in feed_dict.items():
    catalog[name] = MemoryDataset(data)

# Load the datasets
data = catalog.load("input_data")
config = catalog.load("config")
params = catalog.load("params:model")

# Replace an existing dataset
catalog["input_data"] = MemoryDataset([10, 20, 30])

# Or check first if you don't want to replace
if "input_data" not in catalog:
    catalog["input_data"] = MemoryDataset([100, 200])

Release and Confirm

from kedro.io import DataCatalog, CachedDataset, DatasetNotFoundError
from kedro_datasets.pandas import CSVDataset

# Create catalog with cached dataset
catalog = DataCatalog({
    "cached_data": CachedDataset(
        CSVDataset(filepath="data/large_file.csv")
    )
})

# Load data (caches in memory)
data = catalog.load("cached_data")

# Release cached data (frees memory)
catalog.release("cached_data")

# Confirm a dataset (the dataset must implement confirm(); otherwise DatasetError is raised)
try:
    catalog.confirm("non_existent")  # Raises DatasetNotFoundError for unregistered names
except DatasetNotFoundError as e:
    print(f"Error: {e}")

Creating from Configuration

from kedro.io import DataCatalog

# Define catalog configuration
catalog_config = {
    "raw_data": {
        "type": "pandas.CSVDataset",
        "filepath": "data/raw/input.csv",
        "load_args": {"sep": ","},
        "save_args": {"index": False}
    },
    "processed_data": {
        "type": "pandas.ParquetDataset",
        "filepath": "data/processed/output.parquet"
    },
    "model": {
        "type": "pickle.PickleDataset",
        "filepath": "data/models/model.pkl",
        "versioned": True  # Enable versioning
    }
}

# Create catalog
catalog = DataCatalog.from_config(catalog_config)

# Load data
raw = catalog.load("raw_data")

With Credentials

from kedro.io import DataCatalog

# Catalog config with credentials references
catalog_config = {
    "s3_data": {
        "type": "pandas.CSVDataset",
        "filepath": "s3://my-bucket/data.csv",
        "credentials": "s3_creds"
    },
    "postgres_table": {
        "type": "pandas.SQLTableDataset",
        "table_name": "users",
        "credentials": "postgres_creds"
    }
}

# Credentials (separate from catalog config)
credentials = {
    "s3_creds": {
        "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE",
        "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    },
    "postgres_creds": {
        "con": "postgresql://user:pass@localhost:5432/db"
    }
}

# Create catalog with credentials
catalog = DataCatalog.from_config(
    catalog=catalog_config,
    credentials=credentials
)

# Credentials are automatically injected
data = catalog.load("s3_data")

With Versioning

from kedro.io import DataCatalog

catalog_config = {
    "model": {
        "type": "pickle.PickleDataset",
        "filepath": "data/models/model.pkl",
        "versioned": True
    },
    "results": {
        "type": "pandas.CSVDataset",
        "filepath": "data/results.csv",
        "versioned": True
    }
}

# Load specific versions
catalog = DataCatalog.from_config(
    catalog=catalog_config,
    load_versions={
        "model": "2024-01-15T10.30.45.123Z",
        "results": "2024-01-15T11.00.00.000Z"
    },
    save_version="2024-01-15T12.00.00.000Z"
)

# Loads specified versions
model = catalog.load("model")  # Loads 2024-01-15T10.30.45.123Z

# Saves with specified version
catalog.save("model", new_model)  # Saves as 2024-01-15T12.00.00.000Z

Working with Versioning

# Load specific version
catalog = DataCatalog(
    datasets={...},
    load_versions={"model": "2024-01-15T10.30.45.123Z"}
)

# Save with version
catalog = DataCatalog(
    datasets={...},
    save_version="2024-01-15T11.00.00.000Z"
)

SharedMemoryDataCatalog for Parallel Execution

from multiprocessing.managers import SyncManager
from kedro.io import SharedMemoryDataCatalog, MemoryDataset
from kedro.runner import ParallelRunner

# Create shared memory catalog
catalog = SharedMemoryDataCatalog({
    "input_data": MemoryDataset([1, 2, 3, 4, 5]),
    "output_data": MemoryDataset()
})

# Note: SharedMemoryDataCatalog is typically used internally by ParallelRunner
# ParallelRunner automatically converts DataCatalog to SharedMemoryDataCatalog
runner = ParallelRunner(max_workers=4)
outputs = runner.run(pipeline, catalog)  # pipeline: an existing kedro Pipeline object
# Access data using .load()
result = outputs["output_data"].load()

# For advanced use: Manual multiprocessing manager setup
manager = SyncManager()
manager.start()
catalog.set_manager_datasets(manager)

# Validate catalog is compatible with multiprocessing
try:
    catalog.validate_catalog()
    print("Catalog is valid for parallel execution")
except AttributeError as e:
    print(f"Validation failed: {e}")

See also:

  • AbstractDataset API - Create custom datasets
  • MemoryDataset API - In-memory storage
  • Working with Data Guide - Catalog patterns