tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines
Agent Success: 98% (agent success rate when using this tile)
Improvement: 1.32x (agent success rate improvement when using this tile compared to baseline)
Baseline: 74% (agent success rate without this tile)
Kedro is a Python framework for building production-ready data and analytics pipelines with structured workflows, data versioning, parallel execution, and modular composition.
pip install kedro

# Essential imports for most Kedro projects
from kedro.pipeline import node, pipeline, Pipeline, Node
from kedro.io import DataCatalog, AbstractDataset, MemoryDataset
from kedro.runner import SequentialRunner, ParallelRunner, ThreadRunner
from kedro.config import OmegaConfigLoader
from kedro.framework.session import KedroSession
from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl

from kedro.pipeline import node, pipeline
from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner
# Define data processing functions
def clean_data(raw_data):
    return [x.strip().lower() for x in raw_data]

def analyze_data(cleaned_data):
    return {"count": len(cleaned_data), "items": cleaned_data}
# Create nodes
clean_node = node(clean_data, "raw_data", "cleaned_data")
analyze_node = node(analyze_data, "cleaned_data", "results")
# Build pipeline
my_pipeline = pipeline([clean_node, analyze_node])
# Set up data catalog
catalog = DataCatalog({
"raw_data": MemoryDataset([" Apple ", " Banana "]),
"cleaned_data": MemoryDataset(),
"results": MemoryDataset()
})
# Run pipeline
runner = SequentialRunner()
outputs = runner.run(my_pipeline, catalog)
print(outputs["results"].load()) # {'count': 2, 'items': ['apple', 'banana']}Nodes wrap functions with defined inputs and outputs:
node(func=process_data, inputs="input_data", outputs="processed_data", name="process")

Pipelines compose nodes into directed acyclic graphs (DAGs):
pipeline([node1, node2, node3])

Central registry for all data sources and destinations:
catalog = DataCatalog({"dataset_name": MemoryDataset(data)})
data = catalog.load("dataset_name")
catalog.save("output_name", processed_data)Execute pipelines in different modes:
SequentialRunner() - Run nodes sequentially (debugging, simple workflows)
ParallelRunner() - Run nodes in parallel using multiprocessing (CPU-bound)
ThreadRunner() - Run nodes in parallel using threading (I/O-bound)
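All three runners expose the same run() interface, so switching execution strategy does not require changing the pipeline or catalog. A minimal sketch, reusing the my_pipeline and catalog objects from the first example above:

from kedro.runner import SequentialRunner, ParallelRunner, ThreadRunner

# Choose a runner for the workload; the pipeline and catalog stay the same
runner = SequentialRunner()               # simplest, easiest to debug
# runner = ParallelRunner(max_workers=4)  # CPU-bound nodes (multiprocessing)
# runner = ThreadRunner(max_workers=4)    # I/O-bound nodes (threading)
outputs = runner.run(my_pipeline, catalog)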
Load and merge configuration files:

config_loader = OmegaConfigLoader(conf_source="conf", env="local")
params = config_loader["parameters"]
catalog_config = config_loader["catalog"]Manage project lifecycle:
with KedroSession.create() as session:
    session.run()  # Run default pipeline
    context = session.load_context()  # Access catalog and config

Quick example:
from kedro.pipeline import node, pipeline
# Define functions
def step1(data): return data * 2
def step2(data): return data + 10
# Create pipeline
my_pipe = pipeline([
node(step1, "input", "intermediate"),
node(step2, "intermediate", "output")
])Quick example:
from kedro.io import DataCatalog, MemoryDataset
catalog = DataCatalog({
"my_data": MemoryDataset([1, 2, 3, 4, 5])
})
data = catalog.load("my_data")
catalog.save("processed", [x * 2 for x in data])Quick example:
from kedro.runner import ParallelRunner
runner = ParallelRunner(max_workers=4)
outputs = runner.run(pipeline, catalog)
# Access data using .load()
result = outputs["my_output"].load()→ Configuration Management Guide
Quick example:
from kedro.config import OmegaConfigLoader
loader = OmegaConfigLoader(
conf_source="conf",
env="prod",
runtime_params={"model.learning_rate": 0.01}
)
params = loader["parameters"]Quick example:
from kedro.framework.hooks import hook_impl
class MyHooks:
    @hook_impl
    def after_node_run(self, node, outputs):
        print(f"Node {node.name} completed")

Quick example:
from kedro.framework.session import KedroSession
with KedroSession.create(env="local") as session:
session.run(pipeline_name="data_processing", tags=["ml"])Complete API documentation organized by component:
Step-by-step guides for common workflows:
Kedro's architecture consists of these key layers:
Pipelines (kedro.pipeline) - Purpose: Define computational workflows as DAGs
Data Catalog (kedro.io) - Purpose: Abstract data I/O operations
Runners (kedro.runner) - Purpose: Run pipelines with different strategies
Configuration (kedro.config) - Purpose: Manage environment-specific settings
Framework sessions and context (kedro.framework) - Purpose: Tie everything together in a project structure
Hooks (kedro.framework.hooks) - Purpose: Enable customization and plugins (registration sketch below)
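Hook classes such as the MyHooks quick example above only take effect once they are registered with the framework. In a full Kedro project this is typically done in the project's settings.py; a minimal sketch, assuming a standard project layout (the my_project module path is illustrative):

# src/my_project/settings.py (standard Kedro project layout; module path is illustrative)
from my_project.hooks import MyHooks

# Kedro invokes the hook implementations listed in HOOKS at the matching lifecycle points
HOOKS = (MyHooks(),)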
Define what to compute, not how to execute:
pipeline([
node(clean, "raw", "clean"),
node(train, "clean", "model"),
node(evaluate, ["model", "clean"], "metrics")
])Access data uniformly regardless of storage:
catalog.load("data") # Works for CSV, Parquet, S3, databases, etc.Keep logic in Python, configuration in YAML:
# catalog.yml
model_input:
  type: pandas.CSVDataset
  filepath: data/01_raw/input.csv
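The same declarative entry can be materialised in code. A minimal sketch, assuming conf/base/catalog.yml contains the entry above and that the kedro-datasets package providing pandas.CSVDataset is installed:

from kedro.config import OmegaConfigLoader
from kedro.io import DataCatalog

loader = OmegaConfigLoader(conf_source="conf", env="local")
catalog = DataCatalog.from_config(loader["catalog"])
df = catalog.load("model_input")  # reads data/01_raw/input.csv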
Same code, different configs per environment:

conf/
├── base/ # Shared configuration
└── prod/ # Production overrides

Build complex workflows from reusable pieces:
preprocessing_pipe = pipeline([...])
training_pipe = pipeline([...])
full_pipe = preprocessing_pipe + training_pipe

Add behavior without modifying core code:
import logging
import time

from kedro.framework.hooks import hook_impl

logger = logging.getLogger(__name__)

class TimingHooks:
    @hook_impl
    def before_node_run(self, node):
        self.start_time = time.time()

    @hook_impl
    def after_node_run(self, node):
        duration = time.time() - self.start_time
        logger.info(f"{node.name} took {duration:.2f}s")

# Define stages
ingestion = pipeline([...], namespace="ingestion")
preprocessing = pipeline([...], namespace="preprocessing")
modeling = pipeline([...], namespace="modeling")
# Combine stages
full_pipeline = ingestion + preprocessing + modeling
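Namespacing is what keeps the stages above from colliding when they are combined: node and dataset names inside each stage are prefixed with the namespace. A minimal sketch of that renaming behaviour, using a throwaway double function (the names here are illustrative):

from kedro.pipeline import node, pipeline

def double(x):
    return x * 2

base = pipeline([node(double, "numbers", "doubled", name="double_node")])

# Node and dataset names gain the "ingestion." prefix unless they are
# declared as external inputs/outputs of the namespaced pipeline
ingestion_stage = pipeline(base, namespace="ingestion", inputs={"numbers"})
print([n.name for n in ingestion_stage.nodes])  # ['ingestion.double_node']
print(sorted(ingestion_stage.all_outputs()))    # ['ingestion.doubled']

In a Kedro project, inputs referenced with the params: prefix (as in the next snippet) are resolved from the parameters configuration at run time.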
def train_model(data, params):
    model = Model(
        learning_rate=params["learning_rate"],
        epochs=params["epochs"]
    )
    return model.fit(data)
node(train_model, ["training_data", "params:model"], "trained_model")# Only run nodes with missing outputs
runner.run(pipeline, catalog, only_missing_outputs=True)

# Run only preprocessing nodes
session.run(tags=["preprocessing"])
# Run from specific node onwards
session.run(from_nodes=["clean_data"])
# Run up to specific node
session.run(to_nodes=["train_model"])from kedro.io import AbstractDataset
import json
class JSONDataset(AbstractDataset):
    def __init__(self, filepath):
        self._filepath = filepath

    def _load(self):
        with open(self._filepath) as f:
            return json.load(f)

    def _save(self, data):
        with open(self._filepath, 'w') as f:
            json.dump(data, f)

    def _describe(self):
        # Used by Kedro to describe the dataset in logs and repr output
        return {"filepath": self._filepath}
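Once defined, the custom dataset can be registered in a DataCatalog like any built-in dataset. A minimal usage sketch (the file name is illustrative):

from kedro.io import DataCatalog

catalog = DataCatalog({"report": JSONDataset("report.json")})
catalog.save("report", {"status": "ok"})
print(catalog.load("report"))  # {'status': 'ok'}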