tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.
Agent Success: 98% (agent success rate when using this tile)
Improvement: 1.32x (compared to baseline)
Baseline: 74% (agent success rate without this tile)
Core APIs for creating and composing data pipelines as Directed Acyclic Graphs (DAGs).
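Before the full reference, here is a minimal sketch of how the two core calls fit together. The processing functions and dataset names (clean, summarise, raw_data, and so on) are placeholders for illustration, not part of the Kedro API.

from kedro.pipeline import node, pipeline

# Placeholder functions used only for this sketch
def clean(raw):
    return raw

def summarise(clean_rows):
    return {"rows": len(clean_rows)}

# Wrap plain functions as nodes, then compose them into a DAG
quickstart = pipeline([
    node(clean, inputs="raw_data", outputs="clean_data", name="clean"),
    node(summarise, inputs="clean_data", outputs="summary", name="summarise"),
])

print(quickstart.inputs())    # external inputs, e.g. {'raw_data'}
print(quickstart.outputs())   # external outputs, e.g. {'summary'}
print(quickstart.describe())  # node names in execution order

The full signatures for node() and pipeline() follow.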
from kedro.pipeline import node, pipeline, Node, Pipeline

def node(
func: Callable,
inputs: str | list[str] | dict[str, str] | None,
outputs: str | list[str] | dict[str, str] | None,
*,
name: str | None = None,
tags: str | Iterable[str] | None = None,
confirms: str | list[str] | None = None,
namespace: str | None = None
) -> Node:
"""
Create a pipeline node.
Parameters:
- func: Function to execute (any callable)
- inputs: Input dataset names
- str: Single input
- list[str]: Multiple inputs (positional args)
- dict[str, str]: Named inputs (keyword args)
- None: No inputs
- outputs: Output dataset names
- str: Single output
- list[str]: Multiple outputs (function returns tuple/list)
- dict[str, str]: Named outputs (function returns dict)
- None: No outputs
- name: Optional unique node name (auto-generated if None)
- tags: Tags for filtering (str or iterable)
    - confirms: Dataset names to be confirmed after the node runs successfully
- namespace: Namespace for logical grouping
Returns:
Node instance
Note:
When dict is provided for inputs or outputs, the keys specify how values
map to function parameters or return values, but internally the node stores
only the dataset names (dict values) as a list. The mapping is used during
execution but not preserved in the Node.inputs or Node.outputs attributes.
"""def pipeline(
nodes: Iterable[Node | Pipeline],
*,
inputs: str | set[str] | dict[str, str] | None = None,
outputs: str | set[str] | dict[str, str] | None = None,
parameters: str | set[str] | dict[str, str] | None = None,
namespace: str | None = None,
tags: str | Iterable[str] | None = None,
prefix_datasets_with_namespace: bool = True
) -> Pipeline:
"""
Create a pipeline from nodes.
Parameters:
- nodes: Collection of nodes or pipelines to compose
- inputs: Explicit pipeline inputs (makes these external)
- str: Single input name
- set[str]: Multiple input names
- dict[str, str]: Rename inputs (old_name -> new_name)
- None: Auto-detect inputs
- outputs: Explicit pipeline outputs (makes these external)
- str: Single output name
- set[str]: Multiple output names
- dict[str, str]: Rename outputs (old_name -> new_name)
- None: Auto-detect outputs
- parameters: Parameters to namespace
- str: Single parameter name
- set[str]: Multiple parameter names
- dict[str, str]: Rename parameters (old_name -> new_name)
- None: Auto-detect parameters
- Parameters can be specified with or without 'params:' prefix
- namespace: Apply namespace to all nodes and their datasets
- tags: Apply tags to all nodes
- prefix_datasets_with_namespace: If True, prefix inputs, outputs, and parameters
of nodes with the namespace. Set to False when using namespacing for deployment
grouping rather than logical separation
Returns:
Pipeline instance
"""class Node:
"""Represents a node in a pipeline."""
@property
def func(self) -> Callable:
"""Get the node's function."""
@property
def inputs(self) -> list[str]:
"""
Get list of input dataset names.
Note: Always returns a list, even if node was created with dict inputs.
Dict inputs are converted to a list in function parameter binding order.
"""
@property
def outputs(self) -> list[str]:
"""Get list of output dataset names."""
@property
def name(self) -> str:
"""Get the node name (includes namespace if present)."""
@property
def short_name(self) -> str:
"""Get the node's short name without namespace."""
@property
def namespace_prefixes(self) -> list[str]:
"""
Get list of cumulative namespace prefixes from shortest to longest.
For namespace "a.b.c", returns ["a", "a.b", "a.b.c"].
For nodes without namespace, returns [].
"""
@property
def tags(self) -> set[str]:
"""Get the node's tags."""
@property
def confirms(self) -> list[str]:
"""Get list of datasets to confirm before running."""
@property
def namespace(self) -> str | None:
"""Get the node's namespace."""
def run(self, inputs: dict[str, Any] | None = None) -> dict[str, Any]:
"""
Run the node with provided inputs.
Parameters:
- inputs: Dictionary mapping input names to values
Returns:
Dictionary mapping output names to values
"""
def __call__(self, **kwargs: Any) -> dict[str, Any]:
"""Call the node with keyword arguments."""
def tag(self, tags: str | Iterable[str]) -> "Node":
"""
Add tags to a copy of this node.
Parameters:
- tags: Tags to add
Returns:
New Node instance with additional tags
"""
@func.setter
def func(self, func: Callable) -> None:
"""Set or replace the node's function."""
def __eq__(self, other: Any) -> bool:
"""Check equality with another node (compares all attributes)."""
def __lt__(self, other: Any) -> bool:
"""Compare nodes for sorting (by name)."""
def __hash__(self) -> int:
"""Return hash of the node (based on name)."""
def __str__(self) -> str:
"""Return readable string representation."""
def __repr__(self) -> str:
"""Return detailed string representation."""class Pipeline:
"""Collection of nodes that can be executed as a DAG."""
@property
def nodes(self) -> list[Node]:
"""Get all nodes in the pipeline."""
@property
def grouped_nodes(self) -> list[list[Node]]:
"""
Get nodes in topologically ordered groups.
        Nodes in the same group can execute in parallel.
"""
def all_inputs(self) -> set[str]:
"""Get all input datasets used by the pipeline."""
def all_outputs(self) -> set[str]:
"""
Get all output datasets produced by the pipeline.
Includes intermediate outputs consumed by other nodes.
"""
def inputs(self) -> set[str]:
"""Get external input datasets (not produced by any node)."""
def outputs(self) -> set[str]:
"""Get external output datasets (not consumed by any node)."""
def datasets(self) -> set[str]:
"""Get all dataset names used in the pipeline."""
def describe(self, names_only: bool = True) -> str:
"""
Get human-readable description of the pipeline.
Parameters:
- names_only: If True, only show node names; if False, show full details
Returns:
String description with node information and execution order
"""
def filter(
self,
*,
tags: Iterable[str] | None = None,
from_nodes: Iterable[str] | None = None,
to_nodes: Iterable[str] | None = None,
node_names: Iterable[str] | None = None,
from_inputs: Iterable[str] | None = None,
to_outputs: Iterable[str] | None = None,
node_namespaces: Iterable[str] | None = None
) -> "Pipeline":
"""
Filter pipeline to create a subpipeline.
Parameters:
- tags: Filter to nodes with any of these tags
- from_nodes: Include nodes downstream from these nodes
        - to_nodes: Include nodes upstream of these nodes
- node_names: Include only these specific nodes
- from_inputs: Include nodes downstream from these inputs
        - to_outputs: Include nodes upstream of these outputs
- node_namespaces: Filter to nodes in these namespaces
Returns:
New filtered Pipeline instance
"""
def only_nodes(self, *node_names: str) -> "Pipeline":
"""Create pipeline with only specified nodes."""
def only_nodes_with_inputs(self, *inputs: str) -> "Pipeline":
"""Create pipeline from nodes that depend on given inputs."""
def only_nodes_with_outputs(self, *outputs: str) -> "Pipeline":
"""Create pipeline with nodes leading to given outputs."""
def only_nodes_with_namespaces(self, node_namespaces: list[str]) -> "Pipeline":
"""Filter pipeline to nodes in specified namespaces."""
def only_nodes_with_tags(self, *tags: str) -> "Pipeline":
"""Filter pipeline to nodes with specified tags."""
def from_inputs(self, *inputs: str) -> "Pipeline":
"""
Create pipeline starting from nodes consuming given inputs.
Includes starting nodes and all downstream nodes.
"""
def to_outputs(self, *outputs: str) -> "Pipeline":
"""
Create pipeline ending at nodes producing given outputs.
Includes ending nodes and all upstream nodes.
"""
def from_nodes(self, *node_names: str) -> "Pipeline":
"""
Create pipeline starting from specified nodes (inclusive).
Includes named nodes and all downstream nodes.
"""
def to_nodes(self, *node_names: str) -> "Pipeline":
"""
Create pipeline ending at specified nodes (inclusive).
Includes named nodes and all upstream nodes.
"""
def group_nodes_by(self, group_by: str | None) -> list[GroupedNodes]:
"""
Group pipeline nodes by specified criteria.
Parameters:
- group_by: Grouping strategy
- None or "none": Group each node individually
- "namespace": Group nodes by their namespace
Returns:
List of GroupedNodes instances
"""
@property
def node_dependencies(self) -> dict[Node, set[Node]]:
"""
Get mapping of nodes to their dependencies.
Each node maps to the set of nodes it depends on.
"""
def tag(self, tags: str | Iterable[str]) -> "Pipeline":
"""
Add tags to all nodes in the pipeline.
Parameters:
- tags: Tags to add
Returns:
New Pipeline instance with tagged nodes
"""
def to_json(self) -> str:
"""Convert pipeline to JSON representation."""
def __add__(self, other: "Pipeline") -> "Pipeline":
"""Combine two pipelines."""
def __sub__(self, other: "Pipeline") -> "Pipeline":
"""Remove nodes from pipeline."""
def __or__(self, other: "Pipeline") -> "Pipeline":
"""Combine two pipelines using | operator."""
def __radd__(self, other: Any) -> "Pipeline":
"""Right-hand addition (allows sum([pipeline1, pipeline2]))."""
def __and__(self, other: Any) -> "Pipeline":
"""Intersection of two pipelines (shared nodes)."""
    def __repr__(self) -> str:
"""String representation of the pipeline."""GroupedNodes is a dataclass that represents a logical group of nodes in a pipeline. It's used to support deployment patterns such as executing multiple nodes together in a single container or deployment unit.
from dataclasses import dataclass, field
from kedro.pipeline import GroupedNodes
@dataclass
class GroupedNodes:
"""
Represents a logical group of pipeline nodes.
This class is used to organize nodes into logical units for deployment
and execution purposes. Groups can be formed by namespace or custom
criteria using Pipeline.group_nodes_by().
Attributes:
- name: str - Name identifying the group (e.g., namespace name)
- type: str - Type of grouping: "namespace" or "nodes"
- nodes: list[str] - List of node names included in this group
- dependencies: list[str] - List of node names that are dependencies
(nodes outside the group that this group depends on)
Purpose:
GroupedNodes facilitates deployment patterns where multiple nodes need
to be executed together, such as:
- Running all nodes in a namespace as a single Kubernetes job
- Packaging related nodes into a single deployment unit
- Orchestrating groups of nodes in workflow engines
"""
name: str
type: str
nodes: list[str] = field(default_factory=list)
    dependencies: list[str] = field(default_factory=list)

Import:
from kedro.pipeline import GroupedNodes

Creating Grouped Nodes:
GroupedNodes instances are typically created automatically by Pipeline.group_nodes_by():
from kedro.pipeline import pipeline, node
# Create pipeline with namespaces
my_pipeline = pipeline([
node(func1, inputs="data", outputs="result1", namespace="preprocessing"),
node(func2, inputs="result1", outputs="result2", namespace="preprocessing"),
node(func3, inputs="result2", outputs="final", namespace="modeling"),
])
# Group nodes by namespace
groups = my_pipeline.group_nodes_by("namespace")
# Returns list[GroupedNodes]:
# [
# GroupedNodes(name="preprocessing", type="namespace",
# nodes=["preprocessing.func1", "preprocessing.func2"],
# dependencies=[]),
# GroupedNodes(name="modeling", type="namespace",
# nodes=["modeling.func3"],
# dependencies=["preprocessing.func1", "preprocessing.func2"])
# ]
# Access group properties
for group in groups:
print(f"Group: {group.name}")
print(f"Type: {group.type}")
print(f"Nodes: {group.nodes}")
print(f"Dependencies: {group.dependencies}")Use Case - Deployment:
# Deploy each namespace as a separate Kubernetes job
def deploy_group_as_k8s_job(group: GroupedNodes):
"""Deploy a group of nodes as a single Kubernetes job."""
job_spec = {
"name": f"kedro-{group.name}",
"nodes": group.nodes,
"dependencies": group.dependencies
}
# Create K8s job that runs these nodes together
create_kubernetes_job(job_spec)
# Group and deploy
groups = my_pipeline.group_nodes_by("namespace")
for group in groups:
    deploy_group_as_k8s_job(group)

from typing import Callable, Iterable, Any
NodeFunc = Callable[..., Any]
InputSpec = str | list[str] | dict[str, str] | None
OutputSpec = str | list[str] | dict[str, str] | None

from kedro.pipeline import node

# Simple node with single input/output
process_node = node(
func=process_data,
inputs="raw_data",
outputs="processed_data",
name="process"
)
# Node with multiple inputs (positional args)
combine_node = node(
func=lambda x, y: x + y,
inputs=["data_a", "data_b"],
outputs="combined"
)
# Node with keyword arguments
filter_node = node(
func=filter_data,
inputs={"data": "raw_data", "threshold": "params:threshold"},
outputs="filtered_data",
tags=["preprocessing"]
)
# Node with multiple outputs
split_node = node(
func=train_test_split,
inputs=["full_dataset", "params:split_ratio"],
outputs=["train_data", "test_data"]
)
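Continuing the node examples above, the dict form of outputs is not shown yet. As described in the node() reference, the function returns a dict and the mapping routes each returned key to a dataset name. A hedged sketch; the lambda and all names are placeholders:

# Node with dict outputs: the function returns a dict, and the outputs
# mapping routes each returned key to a dataset name
report_node = node(
    func=lambda df: {"summary": df, "row_count": len(df)},
    inputs="clean_data",
    outputs={"summary": "report_summary", "row_count": "report_row_count"},
    name="build_report",
)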
from kedro.pipeline import node, pipeline

# Simple pipeline
my_pipeline = pipeline([
node(clean_data, "raw", "clean"),
node(transform, "clean", "transformed"),
node(analyze, "transformed", "results")
])
# Pipeline with explicit inputs/outputs
data_pipeline = pipeline(
[clean_node, transform_node],
inputs="raw_data",
outputs="processed_data"
)
# Pipeline with namespace
preprocessing = pipeline(
[clean_node, transform_node],
namespace="preprocessing"
)
# Pipeline with tags
ml_pipeline = pipeline(
[train_node, evaluate_node],
tags=["ml", "training"]
)
# Compose pipelines
full_pipeline = preprocessing + ml_pipeline
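The dict forms of inputs, outputs, and parameters (old_name -> new_name) let one pipeline definition be reused against different datasets. A hedged sketch, assuming a base_pipeline variable with an external input "features" and an external output "model"; all names here are placeholders:

# Reuse one pipeline definition for two variants by remapping its
# external dataset names (dict maps old_name -> new_name)
candidate = pipeline(
    [base_pipeline],
    inputs={"features": "candidate_features"},
    outputs={"model": "candidate_model"},
    namespace="candidate",
)
baseline = pipeline(
    [base_pipeline],
    inputs={"features": "baseline_features"},
    outputs={"model": "baseline_model"},
    namespace="baseline",
)
experiment = candidate + baseline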
# Filter by tags
preprocessing_only = my_pipeline.only_nodes_with_tags("preprocessing")
# Run from specific node onwards
from_clean = my_pipeline.from_nodes("clean_data")
# Run up to specific node
to_transform = my_pipeline.to_nodes("transform_data")
# Run specific range
clean_to_analyze = my_pipeline.from_nodes("clean").to_nodes("analyze")
# Filter by namespace
namespace_pipe = my_pipeline.only_nodes_with_namespaces(["data_prep"])
# Complex filtering
filtered = my_pipeline.filter(
    tags=["ml"],
    from_inputs=["features"],
    to_outputs=["metrics"]
)

# Create pipelines with namespaces
ingestion = pipeline([...], namespace="ingestion")
preprocessing = pipeline([...], namespace="preprocessing")
modeling = pipeline([...], namespace="modeling")
# Combine namespaced pipelines
full_pipeline = ingestion + preprocessing + modeling
# Filter by namespace
preprocessing_only = full_pipeline.only_nodes_with_namespaces(["preprocessing"])
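As noted in the pipeline() reference, prefix_datasets_with_namespace controls whether namespacing also renames datasets. A hedged sketch with a placeholder parse_orders function: by default both node names and dataset names gain the namespace prefix; with the flag set to False only node names are prefixed, which suits namespaces used purely as deployment groupings.

# Default: node names and dataset names are both prefixed,
# e.g. the output becomes "ingestion.orders"
logical = pipeline(
    [node(parse_orders, inputs="raw_orders", outputs="orders", name="parse_orders")],
    namespace="ingestion",
)

# Prefixing disabled: node names are namespaced, dataset names stay as declared
deployment_grouped = pipeline(
    [node(parse_orders, inputs="raw_orders", outputs="orders", name="parse_orders")],
    namespace="ingestion",
    prefix_datasets_with_namespace=False,
)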
print(f"Total nodes: {len(pipeline.nodes)}")
print(f"External inputs: {pipeline.inputs()}")
print(f"External outputs: {pipeline.outputs()}")
print(f"All datasets: {pipeline.datasets()}")
# Describe pipeline
print(pipeline.describe())
# Get node dependencies
deps = pipeline.node_dependencies
for node, dependencies in deps.items():
print(f"{node.name} depends on: {[n.name for n in dependencies]}")
# Get grouped nodes for parallel execution
for group in pipeline.grouped_nodes:
print(f"Parallel group: {[n.name for n in group]}")# Add pipelines together
combined = pipeline_a + pipeline_b
# Remove nodes
without_preprocessing = full_pipeline - preprocessing_pipeline
# Add tags to existing pipeline
tagged = full_pipeline.tag(["production", "v2"])
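The remaining operator overloads from the Pipeline reference support set-style composition; a brief sketch assuming pipeline_a and pipeline_b from above:

# Union via | combines two pipelines
union = pipeline_a | pipeline_b

# Intersection via & keeps only the nodes shared by both pipelines
shared = pipeline_a & pipeline_b

# __radd__ allows summing a collection of pipelines
overall = sum([pipeline_a, pipeline_b])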