tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.
Agent Success: 98% (agent success rate when using this tile)
Improvement: 1.32x (agent success rate improvement compared to baseline)
Baseline: 74% (agent success rate without this tile)
Manage the lifecycle of a Kedro run with proper resource management and context creation.
from kedro.framework.session import KedroSession


class KedroSession:
    """Manage the lifecycle of a Kedro run."""

    @classmethod
    def create(
        cls,
        project_path: str | Path | None = None,
        save_on_close: bool = True,
        env: str | None = None,
        runtime_params: dict[str, Any] | None = None
    ) -> "KedroSession":
        """
        Create a new KedroSession.

        Parameters:
        - project_path: Path to Kedro project root (auto-detected if None)
        - save_on_close: Whether to save session data when closing
        - env: Environment name (e.g., 'local', 'prod')
        - runtime_params: Additional parameters to override configuration

        Returns:
        New KedroSession instance
        """

    def load_context(self) -> KedroContext:
        """Load the project context."""

    def run(
        self,
        pipeline_name: str | None = None,
        tags: Iterable[str] | None = None,
        runner: AbstractRunner | None = None,
        node_names: Iterable[str] | None = None,
        from_nodes: Iterable[str] | None = None,
        to_nodes: Iterable[str] | None = None,
        from_inputs: Iterable[str] | None = None,
        to_outputs: Iterable[str] | None = None,
        load_versions: dict[str, str] | None = None,
        namespaces: Iterable[str] | None = None,
        only_missing_outputs: bool = False
    ) -> dict[str, Any]:
        """
        Run a pipeline within the session.

        Parameters:
        - pipeline_name: Name of registered pipeline (None for default)
        - tags: Filter pipeline nodes by tags
        - runner: Runner instance (defaults to SequentialRunner)
        - node_names: Run only specified nodes
        - from_nodes: Run from these nodes onwards (inclusive)
        - to_nodes: Run up to these nodes (inclusive)
        - from_inputs: Run from nodes consuming these inputs
        - to_outputs: Run up to nodes producing these outputs
        - load_versions: Specific versions for versioned datasets
        - namespaces: Filter to specific namespaces
        - only_missing_outputs: Only run nodes whose outputs don't exist in catalog.
          Enables incremental execution by skipping nodes with existing outputs.

          Algorithm:
          1. Check each node's outputs via catalog.exists()
          2. Skip node if ALL outputs exist and are not parameters
          3. Run node if ANY output is missing
          4. Parameters are always considered "missing" (always loaded)

          Best for:
          - Long-running pipelines where you want to resume after failures
          - Reprocessing only changed data
          - Development workflows with expensive computations

          Example: If a node produces ["output1", "output2"] and output1 exists
          but output2 is missing, the node WILL run.

        Returns:
        Dictionary mapping output dataset names to values
        """

    def close(self) -> None:
        """Close the session and release resources."""

    def __enter__(self) -> "KedroSession":
        """Enter context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit context manager and close session."""
"""
Raised when errors occur during Kedro session management.
Common scenarios:
- Session creation failures (invalid project path, missing configuration)
- Session run failures (pipeline not found, invalid parameters)
- Resource management errors (catalog initialization, context loading)
"""See Exceptions Reference for all Kedro exceptions.
from kedro.framework.session import KedroSession

# Using as context manager (recommended)
with KedroSession.create() as session:
    session.run()

with KedroSession.create(env="prod") as session:
    session.run(pipeline_name="data_processing")

with KedroSession.create(
    env="prod",
    runtime_params={"model.learning_rate": 0.01}
) as session:
    session.run()

with KedroSession.create() as session:
    # Run with tags
    session.run(tags=["preprocessing", "training"])

    # Run specific nodes
    session.run(node_names=["clean_data", "train_model"])

    # Run range
    session.run(from_nodes=["clean_data"], to_nodes=["evaluate_model"])
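The remaining run() filters in the signature above (from_inputs, to_outputs, load_versions, namespaces) follow the same pattern; the dataset, namespace, and version names below are placeholders, not part of this reference:

from kedro.framework.session import KedroSession

with KedroSession.create() as session:
    # Run the slice of the pipeline between a given input and a given output
    session.run(from_inputs=["raw_orders"], to_outputs=["orders_summary"])

    # Load pinned versions of versioned datasets for this run
    session.run(load_versions={"model_input_table": "2024-01-15T10.30.00.000Z"})

    # Restrict execution to nodes inside a namespace
    session.run(namespaces=["data_science"])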
from kedro.runner import ParallelRunner
from kedro.framework.session import KedroSession

with KedroSession.create() as session:
    session.run(runner=ParallelRunner(max_workers=4))

with KedroSession.create() as session:
    context = session.load_context()
    catalog = context.catalog
    params = context.params

    # Use catalog and params
    data = catalog.load("input_data")

See also: