tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.

Agent success rate when using this tile: 98%, compared with a 74% baseline without it (a 1.32x improvement).
Step-by-step guide for building, composing, and filtering Kedro pipelines.
def clean_data(raw_data):
"""Clean and preprocess raw data."""
return [x.strip().lower() for x in raw_data]
def create_features(cleaned_data):
"""Create features from cleaned data."""
return [{"text": x, "length": len(x)} for x in cleaned_data]
def train_model(features, parameters):
"""Train a model on features."""
model = {"type": "simple", "data_count": len(features)}
    return model

from kedro.pipeline import node

# Create nodes wrapping the functions
clean_node = node(
func=clean_data,
inputs="raw_data",
outputs="cleaned_data",
name="clean"
)
feature_node = node(
func=create_features,
inputs="cleaned_data",
outputs="features",
name="create_features"
)
train_node = node(
func=train_model,
inputs=["features", "params:model_config"],
outputs="trained_model",
name="train"
)

from kedro.pipeline import pipeline

ml_pipeline = pipeline([
clean_node,
feature_node,
train_node
])

# A node with a single input and a single output
node(func=process, inputs="input_data", outputs="output_data")

# A node with multiple inputs
def combine(data_a, data_b):
return data_a + data_b
node(func=combine, inputs=["data_a", "data_b"], outputs="combined")

def filter_data(data, threshold=0.5):
return [x for x in data if x > threshold]
node(
func=filter_data,
inputs={"data": "raw_data", "threshold": "params:threshold"},
outputs="filtered_data"
)

def split_data(data, ratio=0.8):
split_idx = int(len(data) * ratio)
return data[:split_idx], data[split_idx:]
node(
func=split_data,
inputs=["full_dataset", "params:split_ratio"],
outputs=["train_data", "test_data"]
)

# Side-effect node (e.g., logging, notifications)
def log_completion():
print("Pipeline completed!")
node(func=log_completion, inputs=None, outputs=None, name="log")

# Define separate pipelines
preprocessing = pipeline([clean_node, feature_node])
modeling = pipeline([train_node, evaluate_node])
# Combine pipelines
full_pipeline = preprocessing + modeling

# Make specific datasets external
data_pipeline = pipeline(
[clean_node, feature_node, train_node],
inputs="raw_data",
outputs="trained_model"
)

# Add namespace to all nodes
preprocessing = pipeline(
[clean_node, feature_node],
namespace="preprocessing"
)
modeling = pipeline(
[train_node],
namespace="modeling"
)
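
# Note (expected behaviour of Kedro's modular pipelines; exact rules can vary by
# version): the namespace prefixes node names and any dataset names not declared
# via the inputs/outputs arguments, e.g. "cleaned_data" becomes
# "preprocessing.cleaned_data" and the node "clean" becomes "preprocessing.clean".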
# Combine namespaced pipelines
full_pipeline = preprocessing + modeling

# Add tags to all nodes
ml_pipeline = pipeline(
[train_node, evaluate_node],
tags=["ml", "training"]
)

# Run only preprocessing nodes
preprocessing_only = full_pipeline.only_nodes_with_tags("preprocessing")

# Run only specific nodes
specific_nodes = full_pipeline.only_nodes("clean", "train")

# Run from specific input
from_clean = full_pipeline.from_inputs("cleaned_data")
# Run to specific output
to_model = full_pipeline.to_outputs("trained_model")

# Run from node onwards (inclusive)
from_clean_node = full_pipeline.from_nodes("clean")
# Run up to node (inclusive)
to_train_node = full_pipeline.to_nodes("train")
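
# Note: each of these selection methods returns a new Pipeline object and leaves
# the original pipeline unchanged, which is why the calls can be chained as in
# the range example below.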
# Run specific range
clean_to_train = full_pipeline.from_nodes("clean").to_nodes("train")

# Run only nodes in specific namespace
preprocessing_only = full_pipeline.only_nodes_with_namespaces(["preprocessing"])

# Combine multiple filters
filtered = full_pipeline.filter(
tags=["ml"],
from_inputs=["features"],
to_outputs=["metrics"]
)

Create reusable pipeline modules:

# Define a reusable data preprocessing module
def create_preprocessing_pipeline():
return pipeline([
node(load_raw_data, None, "raw_data"),
node(clean_data, "raw_data", "cleaned_data"),
node(validate_data, "cleaned_data", "validated_data")
])
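
# Note: node(load_raw_data, None, "raw_data") uses node()'s positional form
# (func, inputs, outputs); it is equivalent to the keyword style shown earlier.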
# Define a model training module
def create_training_pipeline():
return pipeline([
node(create_features, "validated_data", "features"),
node(train_model, ["features", "params:model"], "model"),
node(evaluate_model, ["model", "features"], "metrics")
])
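
# (Hypothetical reuse sketch, not from the original guide) A factory can be
# instantiated more than once by wrapping its result with pipeline() and a
# namespace, so the copies do not collide on node or dataset names:
training_a = pipeline(create_training_pipeline(), namespace="candidate_a")
training_b = pipeline(create_training_pipeline(), namespace="candidate_b")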
# Compose modules
def create_full_pipeline():
preprocessing = create_preprocessing_pipeline()
training = create_training_pipeline()
    return preprocessing + training

Organize complex workflows into stages:

# Define stages
ingestion = pipeline([...], namespace="ingestion", tags=["stage_1"])
preprocessing = pipeline([...], namespace="preprocessing", tags=["stage_2"])
feature_eng = pipeline([...], namespace="feature_engineering", tags=["stage_3"])
modeling = pipeline([...], namespace="modeling", tags=["stage_4"])
# Combine all stages
full_pipeline = ingestion + preprocessing + feature_eng + modeling
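
# (Illustrative sketch, not part of the original guide) Outside a project
# session, a pipeline can also be executed directly with a runner and an
# in-memory catalog. Class and argument names may differ slightly between Kedro
# versions (older releases spell it MemoryDataSet). Using the ml_pipeline built
# earlier, and stopping before the training node so no parameters are needed:
from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner

catalog = DataCatalog({"raw_data": MemoryDataset(["  Alpha ", " Beta  "])})
feature_pipeline = ml_pipeline.to_outputs("features")
results = SequentialRunner().run(feature_pipeline, catalog)
print(results["features"])  # free outputs are returned as a dictionary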
# Run specific stages
session.run(tags=["stage_1", "stage_2"])  # Run only ingestion and preprocessing

# Number of nodes
print(f"Total nodes: {len(pipeline.nodes)}")
# External inputs (not produced by any node)
print(f"External inputs: {pipeline.inputs()}")
# External outputs (not consumed by any node)
print(f"External outputs: {pipeline.outputs()}")
# All datasets
print(f"All datasets: {pipeline.datasets()}")

# Human-readable description
print(pipeline.describe())
# Node dependencies
deps = pipeline.node_dependencies
for node, dependencies in deps.items():
    print(f"{node.name} depends on: {[n.name for n in dependencies]}")

# Get nodes that can run in parallel
for group in pipeline.grouped_nodes:
    print(f"Parallel group: {[n.name for n in group]}")

# Good: Pure function
def clean_data(raw_data):
return [x.strip().lower() for x in raw_data]
# Avoid: Side effects or state modification
def clean_data(raw_data):
global cleaned_count # Avoid global state
cleaned_count += 1
raw_data.clear() # Avoid modifying inputs
    return processed

# Good: Clear names
node(clean_data, "raw", "clean", name="clean_customer_data")
# Less clear
node(clean_data, "raw", "clean", name="process")

# Tag nodes by stage, environment, or purpose
node(clean_data, "raw", "clean", tags=["preprocessing", "data_quality"])
node(train_model, "features", "model", tags=["ml", "training", "production"])

# Create isolated, reusable modules
data_ingestion = pipeline([...], namespace="ingestion")
data_preprocessing = pipeline([...], namespace="preprocessing")

# Good: Separate concerns
preprocessing_pipeline = pipeline([clean, validate, transform])
modeling_pipeline = pipeline([train, evaluate, save])
# Less maintainable: Everything in one pipeline
huge_pipeline = pipeline([...50 nodes...])

def train_model(data, params):
model = Model(
learning_rate=params["learning_rate"],
epochs=params["epochs"]
)
return model.fit(data)
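
# (Assumption about project layout) "params:model_config" refers to an entry in
# the project's parameters file, e.g. conf/base/parameters.yml:
#
#   model_config:
#     learning_rate: 0.01
#     epochs: 10
#
# Kedro loads that mapping and passes it to train_model as the params argument.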
# Reference parameters from catalog
node(train_model, ["training_data", "params:model_config"], "trained_model")

def process_if_valid(data, validation_result):
if validation_result["is_valid"]:
return process(data)
return data
node(process_if_valid, ["data", "validation"], "processed")

# Define base pipeline
base_pipeline = pipeline([clean_node, feature_node])
# Create variants
training_pipeline = base_pipeline + pipeline([train_node])
inference_pipeline = base_pipeline + pipeline([predict_node])

Problem: Multiple nodes produce the same output
# Error: Both nodes output "result"
node1 = node(func1, "input", "result")
node2 = node(func2, "input", "result")

Solution: Use unique output names
node1 = node(func1, "input", "result_1")
node2 = node(func2, "input", "result_2")

Problem: Pipeline has circular dependencies
# Error: a → b → c → a
node(func1, "a", "b")
node(func2, "b", "c")
node(func3, "c", "a")

Solution: Break the cycle
node(func1, "a", "b")
node(func2, "b", "c")
node(func3, "c", "d")  # Output to a different dataset

See also: