tessl/pypi-kedro

Describes: pypi/kedro@1.1.x

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.

  • Agent success rate when using this tile: 98%
  • Baseline (agent success rate without this tile): 74%
  • Improvement over baseline: 1.32x

Kedro

Kedro is a Python framework for building production-ready data and analytics pipelines with structured workflows, data versioning, parallel execution, and modular composition.

Package Information

  • Package Name: kedro
  • Package Type: pypi
  • Version: 1.1.1
  • Language: Python
  • Installation: pip install kedro

Core Imports

# Essential imports for most Kedro projects
from kedro.pipeline import node, pipeline, Pipeline, Node
from kedro.io import DataCatalog, AbstractDataset, MemoryDataset
from kedro.runner import SequentialRunner, ParallelRunner, ThreadRunner
from kedro.config import OmegaConfigLoader
from kedro.framework.session import KedroSession
from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl

Quick Start

Minimal Working Example

from kedro.pipeline import node, pipeline
from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner

# Define data processing functions
def clean_data(raw_data):
    return [x.strip().lower() for x in raw_data]

def analyze_data(cleaned_data):
    return {"count": len(cleaned_data), "items": cleaned_data}

# Create nodes
clean_node = node(clean_data, "raw_data", "cleaned_data")
analyze_node = node(analyze_data, "cleaned_data", "results")

# Build pipeline
my_pipeline = pipeline([clean_node, analyze_node])

# Set up data catalog
catalog = DataCatalog({
    "raw_data": MemoryDataset(["  Apple  ", "  Banana  "]),
    "cleaned_data": MemoryDataset(),
    "results": MemoryDataset()
})

# Run pipeline
runner = SequentialRunner()
outputs = runner.run(my_pipeline, catalog)
print(outputs["results"].load())  # {'count': 2, 'items': ['apple', 'banana']}

Core Concepts

Pipelines and Nodes

Nodes wrap functions with defined inputs and outputs:

node(func=process_data, inputs="input_data", outputs="processed_data", name="process")

Pipelines compose nodes into directed acyclic graphs (DAGs):

pipeline([node1, node2, node3])

Data Catalog

Central registry for all data sources and destinations:

catalog = DataCatalog({"dataset_name": MemoryDataset(data)})
data = catalog.load("dataset_name")
catalog.save("output_name", processed_data)

Runners

Execute pipelines in different modes:

  • SequentialRunner() - Run nodes sequentially (debugging, simple workflows)
  • ParallelRunner() - Run nodes in parallel using multiprocessing (CPU-bound)
  • ThreadRunner() - Run nodes in parallel using threading (I/O-bound)
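
The same pipeline definition runs unchanged under any of these strategies. A minimal sketch (the functions and dataset names are illustrative, and outputs are read with .load() following the Quick Start above):

from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline
from kedro.runner import SequentialRunner, ThreadRunner

def double(values):
    return [v * 2 for v in values]

def total(values):
    return sum(values)

pipe = pipeline([
    node(double, "numbers", "doubled"),
    node(total, "doubled", "sum"),
])
catalog = DataCatalog({"numbers": MemoryDataset([1, 2, 3])})

# Swap the execution strategy without touching the pipeline definition.
# ParallelRunner works the same way but uses multiprocessing, so it should be
# run under an `if __name__ == "__main__":` guard with picklable functions.
for runner in (SequentialRunner(), ThreadRunner()):
    outputs = runner.run(pipe, catalog)
    print(type(runner).__name__, outputs["sum"].load())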

Configuration

Load and merge configuration files:

config_loader = OmegaConfigLoader(conf_source="conf", env="local")
params = config_loader["parameters"]
catalog_config = config_loader["catalog"]
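
A minimal runnable sketch of environment overrides, assuming the standard base/local environment layout; the conf directory is created on the fly and the parameter keys are illustrative:

import tempfile
from pathlib import Path

from kedro.config import OmegaConfigLoader

conf = Path(tempfile.mkdtemp())
(conf / "base").mkdir()
(conf / "local").mkdir()
(conf / "base" / "parameters.yml").write_text("learning_rate: 0.1\nepochs: 10\n")
(conf / "local" / "parameters.yml").write_text("learning_rate: 0.01\n")  # local override

loader = OmegaConfigLoader(conf_source=str(conf), base_env="base", default_run_env="local")
params = loader["parameters"]
print(params)  # expected: the local learning_rate wins, the base-only epochs key is kept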

Sessions and Context

Manage project lifecycle:

with KedroSession.create() as session:
    session.run()  # Run default pipeline
    context = session.load_context()  # Access catalog and config

Common Tasks

I Want To...

Create a Simple Pipeline

Creating Pipelines Guide

Quick example:

from kedro.pipeline import node, pipeline

# Define functions
def step1(data): return data * 2
def step2(data): return data + 10

# Create pipeline
my_pipe = pipeline([
    node(step1, "input", "intermediate"),
    node(step2, "intermediate", "output")
])

Work with Data

Working with Data Guide

Quick example:

from kedro.io import DataCatalog, MemoryDataset

catalog = DataCatalog({
    "my_data": MemoryDataset([1, 2, 3, 4, 5])
})

data = catalog.load("my_data")
catalog.save("processed", [x * 2 for x in data])

Run Pipelines in Parallel

Parallel Execution Guide

Quick example:

from kedro.runner import ParallelRunner

runner = ParallelRunner(max_workers=4)
outputs = runner.run(pipeline, catalog)
# Access data using .load()
result = outputs["my_output"].load()

Manage Configuration

Configuration Management Guide

Quick example:

from kedro.config import OmegaConfigLoader

loader = OmegaConfigLoader(
    conf_source="conf",
    env="prod",
    runtime_params={"model.learning_rate": 0.01}
)
params = loader["parameters"]

Add Custom Behavior with Hooks

Hooks and Plugins Guide

Quick example:

from kedro.framework.hooks import hook_impl

class MyHooks:
    @hook_impl
    def after_node_run(self, node, outputs):
        print(f"Node {node.name} completed")

Work in a Full Kedro Project

Framework Session API

Quick example:

from kedro.framework.session import KedroSession

with KedroSession.create(env="local") as session:
    session.run(pipeline_name="data_processing", tags=["ml"])

API Reference

Complete API documentation organized by component:

  • Pipeline APIs
  • Data APIs
  • Configuration APIs
  • Framework APIs
  • Extension APIs

Guides

Step-by-step guides for common workflows:

  • Creating Pipelines Guide
  • Working with Data Guide
  • Parallel Execution Guide
  • Configuration Management Guide
  • Hooks and Plugins Guide

Reference

Architecture Overview

Kedro's architecture consists of these key layers:

1. Pipeline Layer

Purpose: Define computational workflows as DAGs

  • Nodes: Individual computation steps wrapping functions
  • Pipelines: Compositions of nodes with dependency management
  • Features: Filtering by tags/nodes, namespacing, modular composition
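
A minimal sketch of tag filtering and namespacing; the function, dataset and tag names below are illustrative:

from kedro.pipeline import node, pipeline

def clean(raw):
    return raw

def train(data):
    return "model"

base_nodes = [
    node(clean, "raw", "clean_data", name="clean", tags=["preprocessing"]),
    node(train, "clean_data", "model", name="train", tags=["modeling"]),
]

full = pipeline(base_nodes)

# Filtering: keep only the nodes carrying a given tag
prep_only = full.only_nodes_with_tags("preprocessing")
print([n.name for n in prep_only.nodes])  # ['clean']

# Namespacing: prefix node and dataset names so the pipeline can be reused
reusable = pipeline(base_nodes, namespace="customer_a")
print([n.name for n in reusable.nodes])   # ['customer_a.clean', 'customer_a.train']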

2. Data Layer

Purpose: Abstract data I/O operations

  • DataCatalog: Central registry for all datasets
  • AbstractDataset: Interface for custom dataset implementations
  • Features: Versioning, caching, multiple storage backends
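
A sketch of catalog-driven I/O with versioning; the filepaths are illustrative, and pandas.CSVDataset assumes the optional kedro-datasets package is installed:

from kedro.io import DataCatalog

catalog = DataCatalog.from_config({
    "model_input": {
        "type": "pandas.CSVDataset",
        "filepath": "data/01_raw/input.csv",
    },
    "predictions": {
        "type": "pandas.CSVDataset",
        "filepath": "data/07_model_output/predictions.csv",
        "versioned": True,  # every save is written as a new timestamped version
    },
})

df = catalog.load("model_input")   # calling code never deals with the file format
catalog.save("predictions", df)    # persists a fresh version on each run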

3. Execution Layer

Purpose: Run pipelines with different strategies

  • SequentialRunner: Execute nodes one at a time
  • ParallelRunner: Execute independent nodes in parallel (multiprocessing)
  • ThreadRunner: Execute independent nodes in parallel (threading)

4. Configuration Layer

Purpose: Manage environment-specific settings

  • OmegaConfigLoader: Load and merge YAML/JSON configs
  • Features: Environment overrides, variable interpolation, merge strategies

5. Framework Layer

Purpose: Tie everything together in a project structure

  • KedroSession: Manage pipeline execution lifecycle
  • KedroContext: Provide access to catalog, config, and parameters
  • Features: Project scaffolding, CLI, IPython integration

6. Extension Layer

Purpose: Enable customization and plugins

  • Hooks: Inject behavior at lifecycle events (before/after node/pipeline/dataset operations)
  • CLI Hooks: Extend command-line interface
  • Features: Monitoring, logging, custom workflows

Key Design Patterns

1. Declarative Pipeline Definition

Define what to compute, not how to execute:

pipeline([
    node(clean, "raw", "clean"),
    node(train, "clean", "model"),
    node(evaluate, ["model", "clean"], "metrics")
])

2. Data Abstraction

Access data uniformly regardless of storage:

catalog.load("data")  # Works for CSV, Parquet, S3, databases, etc.

3. Separation of Code and Configuration

Keep logic in Python, configuration in YAML:

# catalog.yml
model_input:
  type: pandas.CSVDataset
  filepath: data/01_raw/input.csv

4. Environment-Based Configuration

Same code, different configs per environment:

conf/
├── base/          # Shared configuration
└── prod/          # Production overrides

5. Composable Pipelines

Build complex workflows from reusable pieces:

preprocessing_pipe = pipeline([...])
training_pipe = pipeline([...])
full_pipe = preprocessing_pipe + training_pipe

6. Hook-Based Extension

Add behavior without modifying core code:

import logging
import time

from kedro.framework.hooks import hook_impl

logger = logging.getLogger(__name__)

class TimingHooks:
    @hook_impl
    def before_node_run(self, node):
        self.start_time = time.time()

    @hook_impl
    def after_node_run(self, node):
        duration = time.time() - self.start_time
        logger.info(f"{node.name} took {duration:.2f}s")
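
In a full Kedro project, hooks take effect once an instance is registered in the project's settings.py (a minimal sketch; the package and module names are hypothetical):

# src/my_project/settings.py
from my_project.hooks import TimingHooks

HOOKS = (TimingHooks(),)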

When to Use Kedro

Good Fits

  • Data pipelines: ETL, data processing, feature engineering
  • ML pipelines: Training, evaluation, deployment workflows
  • Reproducible workflows: Need version control and reproducibility
  • Team projects: Multiple developers, shared code standards
  • Production deployment: Need testing, monitoring, parallel execution

Considerations

  • Learning curve: Kedro adds structure and concepts beyond plain Python
  • Overhead: May be overkill for simple scripts or notebooks
  • Flexibility: the framework's conventions trade some free-form flexibility for consistency and structure

Common Patterns

Pattern: Multi-Stage Pipeline

# Define stages
ingestion = pipeline([...], namespace="ingestion")
preprocessing = pipeline([...], namespace="preprocessing")
modeling = pipeline([...], namespace="modeling")

# Combine stages
full_pipeline = ingestion + preprocessing + modeling

Pattern: Parameter-Driven Nodes

def train_model(data, params):
    model = Model(
        learning_rate=params["learning_rate"],
        epochs=params["epochs"]
    )
    return model.fit(data)

node(train_model, ["training_data", "params:model"], "trained_model")
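
In a full Kedro project the params: prefix is resolved from parameters.yml automatically; when running programmatically, the same effect can be sketched by registering the parameters in the catalog by hand (the values below are illustrative, and outputs are read with .load() as in the Quick Start):

from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline
from kedro.runner import SequentialRunner

def train_model(data, params):
    # Stand-in for real training logic
    return {"rows": len(data), **params}

catalog = DataCatalog({
    "training_data": MemoryDataset([[1, 2], [3, 4]]),
    "params:model": MemoryDataset({"learning_rate": 0.01, "epochs": 10}),
})

pipe = pipeline([node(train_model, ["training_data", "params:model"], "trained_model")])
outputs = SequentialRunner().run(pipe, catalog)
print(outputs["trained_model"].load())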

Pattern: Incremental Execution

# Only run nodes with missing outputs
runner.run(pipeline, catalog, only_missing_outputs=True)

Pattern: Pipeline Filtering

# Run only preprocessing nodes
session.run(tags=["preprocessing"])

# Run from specific node onwards
session.run(from_nodes=["clean_data"])

# Run up to specific node
session.run(to_nodes=["train_model"])

Pattern: Custom Datasets

from kedro.io import AbstractDataset
import json

class JSONDataset(AbstractDataset):
    def __init__(self, filepath):
        self._filepath = filepath

    def _load(self):
        with open(self._filepath) as f:
            return json.load(f)

    def _save(self, data):
        with open(self._filepath, 'w') as f:
            json.dump(data, f)

    def _describe(self):
        # Required by AbstractDataset; used in logging and repr
        return {"filepath": self._filepath}
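
Once defined, the custom dataset registers in a catalog like any built-in one (an illustrative usage sketch; settings.json is a hypothetical file):

from kedro.io import DataCatalog

catalog = DataCatalog({"settings": JSONDataset("settings.json")})
catalog.save("settings", {"threshold": 0.5})
print(catalog.load("settings"))  # {'threshold': 0.5}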

Getting Help

Documentation Navigation

  • New to Kedro? Start with Quick Start
  • Specific task? Check Common Tasks above
  • API details? Browse API Reference
  • How-to guides? See Guides

Finding What You Need

By Task: see the Common Tasks section above.

By Component: see the API Reference section above.

By Complexity:

  • Beginner → Getting Started guides (Quick Start)
  • Intermediate → Task-focused Guides (see Guides section above)
  • Advanced → Complete API Reference (see API Reference section above)