tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.
Agent Success: 98% (agent success rate when using this tile)
Improvement: 1.32x (agent success rate improvement compared to baseline)
Baseline: 74% (agent success rate without this tile)
Advanced catalog protocols, configuration resolution, and utilities.
Protocol definitions for catalog implementations.
class CatalogProtocol:
"""
Protocol defining the interface for catalog implementations.
Ensures consistent catalog behavior across different implementations.
"""
def load(self, name: str) -> Any:
"""Load data from dataset."""
def save(self, name: str, data: Any) -> None:
"""Save data to dataset."""
def exists(self, name: str) -> bool:
"""Check if dataset exists."""
def release(self, name: str) -> None:
"""Release dataset resources."""
class SharedMemoryCatalogProtocol(CatalogProtocol):
"""
Protocol for catalog implementations supporting shared memory.
Extends CatalogProtocol with shared memory support for multiprocessing.
"""
def set_manager_datasets(self, manager: SyncManager) -> None:
"""Set multiprocessing manager for shared datasets."""
def validate_catalog(self) -> None:
"""Validate catalog for multiprocessing compatibility."""class CatalogConfigResolver:
"""
Resolves dataset configurations and patterns based on catalog config.
Manages dataset factory patterns, credentials, and dynamic configuration.
"""
def __init__(
self,
config: dict[str, dict[str, Any]] | None = None,
credentials: dict[str, dict[str, Any]] | None = None,
default_runtime_patterns: dict[str, dict[str, Any]] | None = None
):
"""
Initialize resolver.
Parameters:
- config: Catalog configuration dictionary
- credentials: Credentials configuration dictionary
- default_runtime_patterns: Runtime patterns for dynamic resolution
"""
@property
def config(self) -> dict[str, dict[str, Any]]:
"""
Get resolved dataset configurations.
Returns:
Resolved configurations with credentials applied and patterns expanded
Note:
Credentials are merged into dataset configs that reference them
"""
@staticmethod
def is_pattern(pattern: str) -> bool:
"""
Check if string is a dataset pattern (contains '{').
Parameters:
- pattern: String to check
Returns:
True if pattern contains placeholders
Example:
>>> CatalogConfigResolver.is_pattern("{namespace}.data")
True
>>> CatalogConfigResolver.is_pattern("regular_dataset")
False
"""
def resolve_pattern(self, dataset_name: str) -> dict[str, Any] | None:
"""
Resolve dataset name to configuration using patterns.
Matches dataset name against registered patterns and resolves
placeholders to create configuration.
Parameters:
- dataset_name: Dataset name to resolve (e.g., "users.raw_data")
Returns:
Resolved configuration dict, or None if no pattern matches
Example:
>>> # Pattern: "{namespace}.raw_data" -> CSVDataset("data/{namespace}.csv")
>>> config = resolver.resolve_pattern("users.raw_data")
>>> # Returns config for CSVDataset with filepath="data/users.csv"
"""
def match_dataset_pattern(self, dataset_name: str) -> str | None:
"""
Find matching dataset pattern for a name.
Parameters:
- dataset_name: Dataset name
Returns:
Matching pattern string, or None if no match
Example:
>>> resolver.match_dataset_pattern("users.raw_data")
"{namespace}.raw_data"
"""
def match_user_catch_all_pattern(self, dataset_name: str) -> dict[str, Any] | None:
"""
Match dataset name against user-defined catch-all pattern.
Catch-all patterns are patterns without placeholders that match any
dataset name not explicitly defined.
Parameters:
- dataset_name: Dataset name to match
Returns:
Resolved configuration if catch-all pattern matches, None otherwise
Example:
>>> # Catch-all: "*" -> MemoryDataset()
>>> config = resolver.match_user_catch_all_pattern("any_dataset")
>>> # Returns MemoryDataset config for undefined datasets
"""
def list_patterns(self) -> list[str]:
"""
List all registered dataset patterns.
Returns:
List of pattern strings
Example:
>>> resolver.list_patterns()
['{namespace}.raw_data', '{namespace}.processed_data', '*']
"""Utility function for identifying parameter datasets.
def is_parameter(dataset_name: str) -> bool:
"""
Check if a dataset name represents a parameter.
Parameters are special datasets that provide configuration values to nodes.
This function identifies parameter datasets based on naming conventions.
Parameters:
- dataset_name: Name of the dataset to check
Returns:
True if the dataset name represents a parameter, False otherwise
Notes:
- Returns True if dataset_name starts with "params:"
- Returns True if dataset_name equals "parameters"
- Returns False for all other dataset names
Examples:
>>> is_parameter("params:model.learning_rate")
True
>>> is_parameter("parameters")
True
>>> is_parameter("params:train")
True
>>> is_parameter("raw_data")
False
>>> is_parameter("model_output")
False
"""Usage in Node Definitions:
from kedro.pipeline import node
from kedro.io import is_parameter
def train_model(data, learning_rate, epochs):
# training logic
pass
# Node with parameter inputs
node(
train_model,
inputs=["training_data", "params:model.learning_rate", "params:model.epochs"],
outputs="trained_model"
)
# Check if dataset is a parameter
print(is_parameter("params:model.learning_rate")) # True
print(is_parameter("training_data")) # FalseParameter Resolution in Catalog:
from kedro.io import DataCatalog, is_parameter
catalog = DataCatalog()
# Parameters are typically loaded from configuration
# and are not saved during pipeline execution
for dataset_name in catalog.keys():
if is_parameter(dataset_name):
print(f"{dataset_name} is a parameter")from kedro.io import (
CatalogProtocol,
SharedMemoryCatalogProtocol,
CatalogConfigResolver,
is_parameter,
)

# catalog.yml
"{namespace}.raw_data":
type: pandas.CSVDataset
filepath: data/01_raw/{namespace}.csv
"{namespace}.processed_data":
type: pandas.ParquetDataset
filepath: data/02_processed/{namespace}.parquet

from kedro.io import CatalogConfigResolver
# Create resolver
resolver = CatalogConfigResolver(config=catalog_config)
# Check if pattern
print(resolver.is_pattern("{namespace}.raw_data")) # True
print(resolver.is_pattern("specific_dataset")) # False
# Resolve pattern
config = resolver.resolve_pattern("users.raw_data")
# Returns: {'type': 'pandas.CSVDataset', 'filepath': 'data/01_raw/users.csv'}
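# Find which registered pattern a dataset name matches (same catalog.yml as above)
matched = resolver.match_dataset_pattern("users.raw_data")
print(matched)  # "{namespace}.raw_data"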
# List all patterns
patterns = resolver.list_patterns()
print(f"Registered patterns: {patterns}")# catalog.yml with catch-all
specific_dataset:
type: pandas.CSVDataset
filepath: specific.csv
"*": # Catch-all for undefined datasets
type: kedro.io.MemoryDataset

# Defined dataset uses explicit config
config = resolver.resolve_pattern("specific_dataset")
# Returns CSVDataset config
# Undefined dataset uses catch-all
config = resolver.match_user_catch_all_pattern("undefined_dataset")
# Returns MemoryDataset config

# catalog.yml
database:
type: pandas.SQLTableDataset
credentials: db_creds
table_name: users
# credentials.yml
db_creds:
con: postgresql://localhost/mydb

# Resolver merges credentials
resolver = CatalogConfigResolver(
config=catalog_config,
credentials=credentials_config
)
# Credentials automatically merged
config = resolver.config["database"]
# Contains: type, table_name, AND con from credentials

# Resolver with runtime patterns
resolver = CatalogConfigResolver(
config=base_config,
default_runtime_patterns={
"{temp}.*": {
'type': 'kedro.io.MemoryDataset'
}
}
)
# Runtime patterns available for temporary datasets
config = resolver.resolve_pattern("temp.intermediate_result")
# Returns MemoryDataset config

from typing import Protocol
from kedro.io import CatalogProtocol
class CustomCatalog(CatalogProtocol):
"""Custom catalog implementation."""
def load(self, name: str):
# Implement load
pass
def save(self, name: str, data):
# Implement save
pass
def exists(self, name: str) -> bool:
# Implement exists
return False
def release(self, name: str):
# Implement release
pass
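For multiprocessing runners, SharedMemoryCatalogProtocol adds two hooks on top of CatalogProtocol: set_manager_datasets and validate_catalog. Below is a minimal sketch extending the CustomCatalog above to satisfy it; the stored _manager attribute and the pass-through validation are illustrative placeholders, not Kedro's actual behaviour.

from multiprocessing.managers import SyncManager
from kedro.io import SharedMemoryCatalogProtocol

class SharedMemoryCustomCatalog(CustomCatalog, SharedMemoryCatalogProtocol):
    """Illustrative catalog sketch with shared-memory hooks."""

    def set_manager_datasets(self, manager: SyncManager) -> None:
        # Illustrative: keep a reference to the multiprocessing manager.
        # A real implementation would move in-memory datasets into
        # manager-backed storage so worker processes can share them.
        self._manager = manager

    def validate_catalog(self) -> None:
        # Illustrative: a real implementation would raise if any dataset
        # is incompatible with multiprocessing (e.g. not picklable).
        pass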