Describes: pkg:pypi/kedro@1.1.x

tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

Agent Success: 98% (agent success rate when using this tile)
Improvement: 1.32x (success rate improvement compared to baseline)
Baseline: 74% (agent success rate without this tile)

docs/guides/creating-pipelines.md

Creating Pipelines Guide

Step-by-step guide for building, composing, and filtering Kedro pipelines.

Basic Pipeline Creation

Step 1: Define Processing Functions

def clean_data(raw_data):
    """Clean and preprocess raw data."""
    return [x.strip().lower() for x in raw_data]

def create_features(cleaned_data):
    """Create features from cleaned data."""
    return [{"text": x, "length": len(x)} for x in cleaned_data]

def train_model(features, parameters):
    """Train a model on features."""
    model = {"type": "simple", "data_count": len(features)}
    return model

Step 2: Create Nodes

from kedro.pipeline import node

# Create nodes wrapping the functions
clean_node = node(
    func=clean_data,
    inputs="raw_data",
    outputs="cleaned_data",
    name="clean"
)

feature_node = node(
    func=create_features,
    inputs="cleaned_data",
    outputs="features",
    name="create_features"
)

train_node = node(
    func=train_model,
    inputs=["features", "params:model_config"],
    outputs="trained_model",
    name="train"
)

Step 3: Compose into Pipeline

from kedro.pipeline import pipeline

ml_pipeline = pipeline([
    clean_node,
    feature_node,
    train_node
])
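
A pipeline object does not execute anything by itself; you pass it to a runner together with a data catalog that supplies its free inputs. Below is a minimal sketch for running the example outside a full Kedro project (the sample data and the params:model_config contents are invented for illustration); inside a project you would normally rely on kedro run or a KedroSession instead.

from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner

# Register the pipeline's free inputs; datasets that are not registered are
# created as in-memory datasets automatically.
catalog = DataCatalog({
    "raw_data": MemoryDataset(["  Alpha ", " Beta  "]),
    "params:model_config": MemoryDataset({"learning_rate": 0.01}),
})

# Free outputs that are not in the catalog come back as a dictionary
results = SequentialRunner().run(ml_pipeline, catalog)
print(results)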

Node Input/Output Patterns

Single Input and Output

node(func=process, inputs="input_data", outputs="output_data")

Multiple Inputs (Positional Arguments)

def combine(data_a, data_b):
    return data_a + data_b

node(func=combine, inputs=["data_a", "data_b"], outputs="combined")

Named Inputs (Keyword Arguments)

def filter_data(data, threshold=0.5):
    return [x for x in data if x > threshold]

node(
    func=filter_data,
    inputs={"data": "raw_data", "threshold": "params:threshold"},
    outputs="filtered_data"
)

Multiple Outputs

def split_data(data, ratio=0.8):
    split_idx = int(len(data) * ratio)
    return data[:split_idx], data[split_idx:]

node(
    func=split_data,
    inputs=["full_dataset", "params:split_ratio"],
    outputs=["train_data", "test_data"]
)

No Inputs or Outputs

# Side-effect node (e.g., logging, notifications)
def log_completion():
    print("Pipeline completed!")

node(func=log_completion, inputs=None, outputs=None, name="log")

Pipeline Composition

Combining Pipelines

# Define separate pipelines
preprocessing = pipeline([clean_node, feature_node])
modeling = pipeline([train_node, evaluate_node])

# Combine pipelines
full_pipeline = preprocessing + modeling

Pipeline with Explicit Inputs/Outputs

# Make specific datasets external
data_pipeline = pipeline(
    [clean_node, feature_node, train_node],
    inputs="raw_data",
    outputs="trained_model"
)

Namespaced Pipelines

# Add namespace to all nodes
preprocessing = pipeline(
    [clean_node, feature_node],
    namespace="preprocessing"
)

modeling = pipeline(
    [train_node],
    namespace="modeling"
)

# Combine namespaced pipelines
full_pipeline = preprocessing + modeling
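
Note that a namespace also prefixes the dataset names used inside the pipeline (for example, cleaned_data becomes preprocessing.cleaned_data), so namespaced pipelines do not automatically share datasets. A sketch of the usual fix, assuming the dataset names from the earlier examples: list the datasets that should keep their original names via the inputs/outputs arguments.

# Keep shared datasets un-namespaced so downstream pipelines can find them
preprocessing = pipeline(
    [clean_node, feature_node],
    namespace="preprocessing",
    inputs={"raw_data"},     # stays "raw_data", not "preprocessing.raw_data"
    outputs={"features"},    # exposed under its original name for modeling
)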

Tagged Pipelines

# Add tags to all nodes
ml_pipeline = pipeline(
    [train_node, evaluate_node],
    tags=["ml", "training"]
)

Pipeline Filtering

Filter by Tags

# Run only preprocessing nodes
preprocessing_only = full_pipeline.only_nodes_with_tags("preprocessing")

Filter by Node Names

# Run only specific nodes
specific_nodes = full_pipeline.only_nodes("clean", "train")

Filter by Inputs/Outputs

# Run from specific input
from_clean = full_pipeline.from_inputs("cleaned_data")

# Run to specific output
to_model = full_pipeline.to_outputs("trained_model")

Filter by Node Range

# Run from node onwards (inclusive)
from_clean_node = full_pipeline.from_nodes("clean")

# Run up to node (inclusive)
to_train_node = full_pipeline.to_nodes("train")

# Run specific range
clean_to_train = full_pipeline.from_nodes("clean").to_nodes("train")

Filter by Namespace

# Run only nodes in specific namespace
preprocessing_only = full_pipeline.only_nodes_with_namespaces(["preprocessing"])

Complex Filtering

# Combine multiple filters
filtered = full_pipeline.filter(
    tags=["ml"],
    from_inputs=["features"],
    to_outputs=["metrics"]
)

Modular Pipeline Pattern

Create reusable pipeline modules:

# Define a reusable data preprocessing module
def create_preprocessing_pipeline():
    return pipeline([
        node(load_raw_data, None, "raw_data"),
        node(clean_data, "raw_data", "cleaned_data"),
        node(validate_data, "cleaned_data", "validated_data")
    ])

# Define a model training module
def create_training_pipeline():
    return pipeline([
        node(create_features, "validated_data", "features"),
        node(train_model, ["features", "params:model"], "model"),
        node(evaluate_model, ["model", "features"], "metrics")
    ])

# Compose modules
def create_full_pipeline():
    preprocessing = create_preprocessing_pipeline()
    training = create_training_pipeline()
    return preprocessing + training
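
Inside a Kedro project, factory functions like these are conventionally collected in the project's pipeline registry so each module can be run by name (for example, kedro run --pipeline=training). A sketch, assuming the standard src/<package_name>/pipeline_registry.py layout:

from kedro.pipeline import Pipeline

def register_pipelines() -> dict[str, Pipeline]:
    """Map pipeline names to Pipeline objects for the Kedro CLI and sessions."""
    preprocessing = create_preprocessing_pipeline()
    training = create_training_pipeline()
    return {
        "preprocessing": preprocessing,
        "training": training,
        "__default__": preprocessing + training,  # what plain `kedro run` executes
    }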

Multi-Stage Pipeline Pattern

Organize complex workflows into stages:

# Define stages
ingestion = pipeline([...], namespace="ingestion", tags=["stage_1"])
preprocessing = pipeline([...], namespace="preprocessing", tags=["stage_2"])
feature_eng = pipeline([...], namespace="feature_engineering", tags=["stage_3"])
modeling = pipeline([...], namespace="modeling", tags=["stage_4"])

# Combine all stages
full_pipeline = ingestion + preprocessing + feature_eng + modeling

# Run specific stages
session.run(tags=["stage_1", "stage_2"])  # Run only ingestion and preprocessing
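
The session object above is a KedroSession, which is only available inside a Kedro project. A minimal sketch of obtaining one programmatically (the CLI equivalent is kedro run with tag filters):

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())  # assumes the current directory is the project root
with KedroSession.create() as session:
    session.run(tags=["stage_1", "stage_2"])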

Pipeline Inspection

Get Pipeline Information

# Number of nodes
print(f"Total nodes: {len(pipeline.nodes)}")

# External inputs (not produced by any node)
print(f"External inputs: {pipeline.inputs()}")

# External outputs (not consumed by any node)
print(f"External outputs: {pipeline.outputs()}")

# All datasets
print(f"All datasets: {pipeline.datasets()}")

Describe Pipeline

# Human-readable description
print(pipeline.describe())

# Node dependencies
deps = pipeline.node_dependencies
for node, dependencies in deps.items():
    print(f"{node.name} depends on: {[n.name for n in dependencies]}")

Grouped Nodes (for parallelization)

# Get nodes that can run in parallel
for group in pipeline.grouped_nodes:
    print(f"Parallel group: {[n.name for n in group]}")

Best Practices

1. Keep Functions Pure

# Good: Pure function
def clean_data(raw_data):
    return [x.strip().lower() for x in raw_data]

# Avoid: Side effects or state modification
def clean_data(raw_data):
    global cleaned_count  # Avoid global state
    cleaned_count += 1
    raw_data.clear()      # Avoid modifying inputs
    return raw_data       # Returns the now-empty input, hiding the mutation

2. Use Descriptive Node Names

# Good: Clear names
node(clean_data, "raw", "clean", name="clean_customer_data")

# Less clear
node(clean_data, "raw", "clean", name="process")

3. Use Tags for Organization

# Tag nodes by stage, environment, or purpose
node(clean_data, "raw", "clean", tags=["preprocessing", "data_quality"])
node(train_model, "features", "model", tags=["ml", "training", "production"])

4. Use Namespaces for Modularity

# Create isolated, reusable modules
data_ingestion = pipeline([...], namespace="ingestion")
data_preprocessing = pipeline([...], namespace="preprocessing")

5. Keep Pipelines Focused

# Good: Separate concerns
preprocessing_pipeline = pipeline([clean, validate, transform])
modeling_pipeline = pipeline([train, evaluate, save])

# Less maintainable: Everything in one pipeline
huge_pipeline = pipeline([...50 nodes...])

Common Patterns

Pattern: Parameter-Driven Nodes

def train_model(data, params):
    model = Model(
        learning_rate=params["learning_rate"],
        epochs=params["epochs"]
    )
    return model.fit(data)

# Reference parameters from catalog
node(train_model, ["training_data", "params:model_config"], "trained_model")

Pattern: Conditional Processing

def process_if_valid(data, validation_result):
    if validation_result["is_valid"]:
        return process(data)
    return data

node(process_if_valid, ["data", "validation"], "processed")

Pattern: Multiple Pipeline Variants

# Define base pipeline
base_pipeline = pipeline([clean_node, feature_node])

# Create variants
training_pipeline = base_pipeline + pipeline([train_node])
inference_pipeline = base_pipeline + pipeline([predict_node])

Troubleshooting

Issue: OutputNotUniqueError

Problem: Multiple nodes produce the same output

# Error: Both nodes output "result"
node1 = node(func1, "input", "result")
node2 = node(func2, "input", "result")

Solution: Use unique output names

node1 = node(func1, "input", "result_1")
node2 = node(func2, "input", "result_2")

Issue: CircularDependencyError

Problem: Pipeline has circular dependencies

# Error: a → b → c → a
node(func1, "a", "b")
node(func2, "b", "c")
node(func3, "c", "a")

Solution: Break the cycle

node(func1, "a", "b")
node(func2, "b", "c")
node(func3, "c", "d")  # Output to different dataset

See also:

  • Pipeline API Reference - Complete API documentation
  • Runners Guide - Execute pipelines
  • Working with Data Guide - Manage pipeline data