or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/kedro@1.1.x

docs

api

configuration.md data-catalog-advanced.md data-catalog.md hooks.md pipeline.md runners-advanced.md runners.md
index.md
tile.json

tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

Agent Success

Agent success rate when using this tile

98%

Improvement

Agent success rate improvement when using this tile compared to baseline

1.32x

Baseline

Agent success rate without this tile

74%

docs/api/pipeline.md

Pipeline and Node API Reference

Core APIs for creating and composing data pipelines as Directed Acyclic Graphs (DAGs).

Module Import

from kedro.pipeline import node, pipeline, Node, Pipeline

Node Creation

def node(
    func: Callable,
    inputs: str | list[str] | dict[str, str] | None,
    outputs: str | list[str] | dict[str, str] | None,
    *,
    name: str | None = None,
    tags: str | Iterable[str] | None = None,
    confirms: str | list[str] | None = None,
    namespace: str | None = None
) -> Node:
    """
    Create a pipeline node.

    Parameters:
    - func: Function to execute (any callable)
    - inputs: Input dataset names
        - str: Single input
        - list[str]: Multiple inputs (positional args)
        - dict[str, str]: Named inputs (keyword args)
        - None: No inputs
    - outputs: Output dataset names
        - str: Single output
        - list[str]: Multiple outputs (function returns tuple/list)
        - dict[str, str]: Named outputs (function returns dict)
        - None: No outputs
    - name: Optional unique node name (auto-generated if None)
    - tags: Tags for filtering (str or iterable)
    - confirms: Name(s) of datasets to confirm: the dataset's confirm()
        method is called after the node has run (e.g. to checkpoint an
        incremental dataset). The names do not have to appear in the
        node's inputs or outputs
    - namespace: Namespace for logical grouping

    Returns:
    Node instance

    Note:
    When dict is provided for inputs or outputs, the keys specify how values
    map to function parameters or return values. That mapping is used during
    execution, but the Node.inputs and Node.outputs attributes expose only
    the dataset names (the dict values) as a list.
    """

Pipeline Creation

def pipeline(
    nodes: Iterable[Node | Pipeline],
    *,
    inputs: str | set[str] | dict[str, str] | None = None,
    outputs: str | set[str] | dict[str, str] | None = None,
    parameters: str | set[str] | dict[str, str] | None = None,
    namespace: str | None = None,
    tags: str | Iterable[str] | None = None,
    prefix_datasets_with_namespace: bool = True
) -> Pipeline:
    """
    Create a pipeline from nodes.

    Parameters:
    - nodes: Collection of nodes or pipelines to compose
    - inputs: Explicit pipeline inputs (makes these external)
        - str: Single input name
        - set[str]: Multiple input names
        - dict[str, str]: Rename inputs (old_name -> new_name)
        - None: Auto-detect inputs
    - outputs: Explicit pipeline outputs (makes these external)
        - str: Single output name
        - set[str]: Multiple output names
        - dict[str, str]: Rename outputs (old_name -> new_name)
        - None: Auto-detect outputs
    - parameters: Parameters to namespace
        - str: Single parameter name
        - set[str]: Multiple parameter names
        - dict[str, str]: Rename parameters (old_name -> new_name)
        - None: Auto-detect parameters
        - Parameters can be specified with or without 'params:' prefix
    - namespace: Apply namespace to all nodes and their datasets
    - tags: Apply tags to all nodes
    - prefix_datasets_with_namespace: If True, prefix inputs, outputs, and parameters
        of nodes with the namespace. Set to False when using namespacing for deployment
        grouping rather than logical separation

    Returns:
    Pipeline instance
    """

Node Class

class Node:
    """Represents a node in a pipeline."""

    @property
    def func(self) -> Callable:
        """Get the node's function."""

    @property
    def inputs(self) -> list[str]:
        """
        Get list of input dataset names.

        Note: Always returns a list, even if node was created with dict inputs.
        Dict inputs are converted to a list in function parameter binding order.
        """

    @property
    def outputs(self) -> list[str]:
        """Get list of output dataset names."""

    @property
    def name(self) -> str:
        """Get the node name (includes namespace if present)."""

    @property
    def short_name(self) -> str:
        """Get the node's short name without namespace."""

    @property
    def namespace_prefixes(self) -> list[str]:
        """
        Get list of cumulative namespace prefixes from shortest to longest.
        For namespace "a.b.c", returns ["a", "a.b", "a.b.c"].
        For nodes without namespace, returns [].
        """

    @property
    def tags(self) -> set[str]:
        """Get the node's tags."""

    @property
    def confirms(self) -> list[str]:
        """Get list of datasets whose confirm() method is called after the node runs."""

    @property
    def namespace(self) -> str | None:
        """Get the node's namespace."""

    def run(self, inputs: dict[str, Any] | None = None) -> dict[str, Any]:
        """
        Run the node with provided inputs.

        Parameters:
        - inputs: Dictionary mapping input names to values

        Returns:
        Dictionary mapping output names to values
        """

    def __call__(self, **kwargs: Any) -> dict[str, Any]:
        """Call the node with keyword arguments."""

    def tag(self, tags: str | Iterable[str]) -> "Node":
        """
        Add tags to a copy of this node.

        Parameters:
        - tags: Tags to add

        Returns:
        New Node instance with additional tags
        """

    @func.setter
    def func(self, func: Callable) -> None:
        """Set or replace the node's function."""

    def __eq__(self, other: Any) -> bool:
        """Check equality with another node (compares name, inputs and outputs)."""

    def __lt__(self, other: Any) -> bool:
        """Compare nodes for sorting (by name, then inputs, then outputs)."""

    def __hash__(self) -> int:
        """Return hash of the node (based on name, inputs and outputs, consistent with __eq__)."""

    def __str__(self) -> str:
        """Return readable string representation."""

    def __repr__(self) -> str:
        """Return detailed string representation."""

Pipeline Class

class Pipeline:
    """Collection of nodes that can be executed as a DAG."""

    @property
    def nodes(self) -> list[Node]:
        """Get all nodes in the pipeline."""

    @property
    def grouped_nodes(self) -> list[list[Node]]:
        """
        Get nodes in topologically ordered groups.
        Nodes in same group can execute in parallel.
        """

    def all_inputs(self) -> set[str]:
        """Get all input datasets used by the pipeline."""

    def all_outputs(self) -> set[str]:
        """
        Get all output datasets produced by the pipeline.
        Includes intermediate outputs consumed by other nodes.
        """

    def inputs(self) -> set[str]:
        """Get external input datasets (not produced by any node)."""

    def outputs(self) -> set[str]:
        """Get external output datasets (not consumed by any node)."""

    def datasets(self) -> set[str]:
        """Get all dataset names used in the pipeline."""

    def describe(self, names_only: bool = True) -> str:
        """
        Get human-readable description of the pipeline.

        Parameters:
        - names_only: If True, only show node names; if False, show full details

        Returns:
        String description with node information and execution order
        """

    def filter(
        self,
        *,
        tags: Iterable[str] | None = None,
        from_nodes: Iterable[str] | None = None,
        to_nodes: Iterable[str] | None = None,
        node_names: Iterable[str] | None = None,
        from_inputs: Iterable[str] | None = None,
        to_outputs: Iterable[str] | None = None,
        node_namespaces: Iterable[str] | None = None
    ) -> "Pipeline":
        """
        Filter pipeline to create a subpipeline.

        Parameters:
        - tags: Filter to nodes with any of these tags
        - from_nodes: Include nodes downstream from these nodes
        - to_nodes: Include nodes upstream to these nodes
        - node_names: Include only these specific nodes
        - from_inputs: Include nodes downstream from these inputs
        - to_outputs: Include nodes upstream to these outputs
        - node_namespaces: Filter to nodes in these namespaces

        Returns:
        New filtered Pipeline instance
        """

    def only_nodes(self, *node_names: str) -> "Pipeline":
        """Create pipeline with only specified nodes."""

    def only_nodes_with_inputs(self, *inputs: str) -> "Pipeline":
        """Create pipeline from nodes that depend on given inputs."""

    def only_nodes_with_outputs(self, *outputs: str) -> "Pipeline":
        """Create pipeline with nodes leading to given outputs."""

    def only_nodes_with_namespaces(self, node_namespaces: list[str]) -> "Pipeline":
        """Filter pipeline to nodes in specified namespaces."""

    def only_nodes_with_tags(self, *tags: str) -> "Pipeline":
        """Filter pipeline to nodes with specified tags."""

    def from_inputs(self, *inputs: str) -> "Pipeline":
        """
        Create pipeline starting from nodes consuming given inputs.
        Includes starting nodes and all downstream nodes.
        """

    def to_outputs(self, *outputs: str) -> "Pipeline":
        """
        Create pipeline ending at nodes producing given outputs.
        Includes ending nodes and all upstream nodes.
        """

    def from_nodes(self, *node_names: str) -> "Pipeline":
        """
        Create pipeline starting from specified nodes (inclusive).
        Includes named nodes and all downstream nodes.
        """

    def to_nodes(self, *node_names: str) -> "Pipeline":
        """
        Create pipeline ending at specified nodes (inclusive).
        Includes named nodes and all upstream nodes.
        """

    def group_nodes_by(self, group_by: str | None) -> list[GroupedNodes]:
        """
        Group pipeline nodes by specified criteria.

        Parameters:
        - group_by: Grouping strategy
            - None or "none": Group each node individually
            - "namespace": Group nodes by their namespace

        Returns:
        List of GroupedNodes instances
        """

    @property
    def node_dependencies(self) -> dict[Node, set[Node]]:
        """
        Get mapping of nodes to their dependencies.
        Each node maps to the set of nodes it depends on.
        """

    def tag(self, tags: str | Iterable[str]) -> "Pipeline":
        """
        Add tags to all nodes in the pipeline.

        Parameters:
        - tags: Tags to add

        Returns:
        New Pipeline instance with tagged nodes
        """

    def to_json(self) -> str:
        """Convert pipeline to JSON representation."""

    def __add__(self, other: "Pipeline") -> "Pipeline":
        """Combine two pipelines."""

    def __sub__(self, other: "Pipeline") -> "Pipeline":
        """Remove nodes from pipeline."""

    def __or__(self, other: "Pipeline") -> "Pipeline":
        """Combine two pipelines using | operator."""

    def __radd__(self, other: Any) -> "Pipeline":
        """Right-hand addition (allows sum([pipeline1, pipeline2]))."""

    def __and__(self, other: Any) -> "Pipeline":
        """Intersection of two pipelines (shared nodes)."""

    def __repr__(self) -> str:
        """
        String representation of the pipeline.

        Returns:
        String describing this Pipeline instance
        """

GroupedNodes

GroupedNodes is a dataclass that represents a logical group of nodes in a pipeline. It's used to support deployment patterns such as executing multiple nodes together in a single container or deployment unit.

from dataclasses import dataclass, field
from kedro.pipeline import GroupedNodes

@dataclass
class GroupedNodes:
    """
    Represents a logical group of pipeline nodes.

    This class is used to organize nodes into logical units for deployment
    and execution purposes. Instances are normally produced by
    Pipeline.group_nodes_by(), which either groups nodes by namespace or
    places each node in a group of its own.

    Attributes:
    - name: str - Name identifying the group (e.g., namespace name)
    - type: str - Type of grouping: "namespace" or "nodes"
    - nodes: list[str] - List of node names included in this group
    - dependencies: list[str] - List of node names that are dependencies
                                 (nodes outside the group that this group depends on)

    Purpose:
    GroupedNodes facilitates deployment patterns where multiple nodes need
    to be executed together, such as:
    - Running all nodes in a namespace as a single Kubernetes job
    - Packaging related nodes into a single deployment unit
    - Orchestrating groups of nodes in workflow engines
    """
    name: str
    type: str
    # default_factory gives every instance its own list instead of a shared one.
    nodes: list[str] = field(default_factory=list)
    dependencies: list[str] = field(default_factory=list)

Import:

from kedro.pipeline import GroupedNodes

Creating Grouped Nodes:

GroupedNodes instances are typically created automatically by Pipeline.group_nodes_by():

from kedro.pipeline import pipeline, node

# Create pipeline with namespaces
my_pipeline = pipeline([
    node(func1, inputs="data", outputs="result1", namespace="preprocessing"),
    node(func2, inputs="result1", outputs="result2", namespace="preprocessing"),
    node(func3, inputs="result2", outputs="final", namespace="modeling"),
])

# Group nodes by namespace
groups = my_pipeline.group_nodes_by("namespace")
# Returns list[GroupedNodes]:
# [
#   GroupedNodes(name="preprocessing", type="namespace",
#                nodes=["preprocessing.func1", "preprocessing.func2"],
#                dependencies=[]),
#   GroupedNodes(name="modeling", type="namespace",
#                nodes=["modeling.func3"],
#                dependencies=["preprocessing.func1", "preprocessing.func2"])
# ]

# Access group properties
for group in groups:
    print(f"Group: {group.name}")
    print(f"Type: {group.type}")
    print(f"Nodes: {group.nodes}")
    print(f"Dependencies: {group.dependencies}")

Use Case - Deployment:

# Deploy each namespace as a separate Kubernetes job
def deploy_group_as_k8s_job(group: GroupedNodes):
    """Create one Kubernetes job that runs all nodes of ``group`` together."""
    # Job spec: a name derived from the group, plus the group's node names
    # and the names it depends on, handed straight to the job-creation helper.
    create_kubernetes_job(
        {
            "name": f"kedro-{group.name}",
            "nodes": group.nodes,
            "dependencies": group.dependencies,
        }
    )

# Group and deploy
groups = my_pipeline.group_nodes_by("namespace")
for group in groups:
    deploy_group_as_k8s_job(group)

Types

from typing import Callable, Iterable, Any

NodeFunc = Callable[..., Any]
InputSpec = str | list[str] | dict[str, str] | None
OutputSpec = str | list[str] | dict[str, str] | None

Usage Examples

Creating Nodes

from kedro.pipeline import node

# Simple node with single input/output
process_node = node(
    func=process_data,
    inputs="raw_data",
    outputs="processed_data",
    name="process"
)

# Node with multiple inputs (positional args)
combine_node = node(
    func=lambda x, y: x + y,
    inputs=["data_a", "data_b"],
    outputs="combined"
)

# Node with keyword arguments
filter_node = node(
    func=filter_data,
    inputs={"data": "raw_data", "threshold": "params:threshold"},
    outputs="filtered_data",
    tags=["preprocessing"]
)

# Node with multiple outputs
split_node = node(
    func=train_test_split,
    inputs=["full_dataset", "params:split_ratio"],
    outputs=["train_data", "test_data"]
)

Creating Pipelines

from kedro.pipeline import node, pipeline

# Simple pipeline
my_pipeline = pipeline([
    node(clean_data, "raw", "clean"),
    node(transform, "clean", "transformed"),
    node(analyze, "transformed", "results")
])

# Pipeline with explicit inputs/outputs
data_pipeline = pipeline(
    [clean_node, transform_node],
    inputs="raw_data",
    outputs="processed_data"
)

# Pipeline with namespace
preprocessing = pipeline(
    [clean_node, transform_node],
    namespace="preprocessing"
)

# Pipeline with tags
ml_pipeline = pipeline(
    [train_node, evaluate_node],
    tags=["ml", "training"]
)

# Compose pipelines
full_pipeline = preprocessing + ml_pipeline

Filtering Pipelines

# `my_pipeline` is a Pipeline instance (for example the one built with
# pipeline([...]) in "Creating Pipelines"). It is deliberately not named
# `pipeline`: that name is the factory function imported from kedro.pipeline,
# which has none of the filtering methods used below.

# Filter by tags
preprocessing_only = my_pipeline.only_nodes_with_tags("preprocessing")

# Run from specific node onwards
from_clean = my_pipeline.from_nodes("clean_data")

# Run up to specific node
to_transform = my_pipeline.to_nodes("transform_data")

# Run specific range
clean_to_analyze = my_pipeline.from_nodes("clean").to_nodes("analyze")

# Filter by namespace
namespace_pipe = my_pipeline.only_nodes_with_namespaces(["data_prep"])

# Complex filtering
filtered = my_pipeline.filter(
    tags=["ml"],
    from_inputs=["features"],
    to_outputs=["metrics"]
)

Working with Namespaces

# Create pipelines with namespaces
ingestion = pipeline([...], namespace="ingestion")
preprocessing = pipeline([...], namespace="preprocessing")
modeling = pipeline([...], namespace="modeling")

# Combine namespaced pipelines
full_pipeline = ingestion + preprocessing + modeling

# Filter by namespace
preprocessing_only = full_pipeline.only_nodes_with_namespaces(["preprocessing"])

Pipeline Inspection

# `my_pipeline` is a Pipeline instance; the name `pipeline` is reserved for
# the factory function imported from kedro.pipeline.

# Get pipeline information
print(f"Total nodes: {len(my_pipeline.nodes)}")
print(f"External inputs: {my_pipeline.inputs()}")
print(f"External outputs: {my_pipeline.outputs()}")
print(f"All datasets: {my_pipeline.datasets()}")

# Describe pipeline
print(my_pipeline.describe())

# Get node dependencies
# The loop variable is not called `node`, so the imported node() factory
# stays usable after this loop.
deps = my_pipeline.node_dependencies
for current_node, dependencies in deps.items():
    print(f"{current_node.name} depends on: {[n.name for n in dependencies]}")

# Get grouped nodes for parallel execution
for group in my_pipeline.grouped_nodes:
    print(f"Parallel group: {[n.name for n in group]}")

Combining and Manipulating Pipelines

# Add pipelines together
combined = pipeline_a + pipeline_b

# Remove nodes
without_preprocessing = full_pipeline - preprocessing_pipeline

# Add tags to existing pipeline
# tag() is a Pipeline method, so it is called on a Pipeline instance
# (here `full_pipeline`), not on the `pipeline` factory function.
tagged = full_pipeline.tag(["production", "v2"])

See also:

  • Creating Pipelines Guide - Step-by-step pipeline creation
  • Runners API - Execute pipelines
  • DataCatalog API - Manage pipeline data