A comprehensive Python framework for production-ready data science and analytics pipeline development
```
npx @tessl/cli install tessl/pypi-kedro@1.0.0
```

Kedro is a comprehensive Python framework designed for production-ready data science and analytics pipeline development. It provides software engineering best practices to help create reproducible, maintainable, and modular data engineering and data science pipelines through uniform project templates, data abstraction layers, configuration management, and pipeline assembly tools.
```
pip install kedro
```

```python
import kedro
```

Common patterns for working with Kedro components:

```python
# Configuration management
from kedro.config import AbstractConfigLoader, OmegaConfigLoader
# Data catalog and datasets
from kedro.io import DataCatalog, AbstractDataset, MemoryDataset
# Pipeline construction
from kedro.pipeline import Pipeline, Node, pipeline, node
# Pipeline execution
from kedro.runner import SequentialRunner, ParallelRunner, ThreadRunner
# Framework components
from kedro.framework.context import KedroContext
from kedro.framework.session import KedroSession
from kedro.framework.project import configure_project, pipelines, settings
```

```python
from kedro.pipeline import pipeline, node
from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner
# Define a simple processing function
def process_data(input_data):
"""Process input data and return results."""
return [x * 2 for x in input_data]
# Create a pipeline node
processing_node = node(
    func=process_data,
    inputs="raw_data",
    outputs="processed_data",
    name="process_data_node"
)
# Create a pipeline from nodes
data_pipeline = pipeline([processing_node])
# Set up a data catalog
catalog = DataCatalog({
"raw_data": MemoryDataset([1, 2, 3, 4, 5]),
"processed_data": MemoryDataset()
})
# Run the pipeline
runner = SequentialRunner()
runner.run(data_pipeline, catalog)
# Access results
results = catalog.load("processed_data")
print(results)  # [2, 4, 6, 8, 10]
```

Kedro follows a modular architecture built around a small set of key abstractions: configuration loaders, the data catalog and datasets, pipelines and nodes, runners, and the project context and session.
This design enables scalable data workflows that follow software engineering principles, supporting everything from local development to production deployment across different compute environments.
### Configuration Management

Flexible configuration loading supporting multiple formats (YAML, JSON) with environment-specific overrides, parameter management, and extensible loader implementations.
```python
class AbstractConfigLoader:
    def load_and_merge_dir_config(self, config_path, env=None, **kwargs): ...
    def get(self, *patterns, **kwargs): ...

class OmegaConfigLoader(AbstractConfigLoader):
    def __init__(self, conf_source, base_env="base", default_run_env="local", **kwargs): ...
```
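A minimal usage sketch, assuming a project-style configuration directory such as conf/base/parameters.yml and conf/local/parameters.yml (hypothetical paths): OmegaConfigLoader merges base and environment-specific files and exposes them through dictionary-style access.

```python
from kedro.config import OmegaConfigLoader

# Sketch: assumes a ./conf directory laid out as conf/base/... and conf/local/...
config_loader = OmegaConfigLoader(
    conf_source="conf",
    base_env="base",
    default_run_env="local",
)

# Keys such as "parameters" and "catalog" resolve the loader's file patterns,
# merging base files with environment-specific overrides.
parameters = config_loader["parameters"]
catalog_config = config_loader["catalog"]
print(parameters)
```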
### Data Catalog and Datasets

Comprehensive data abstraction layer providing consistent interfaces for various data sources, versioning support, lazy loading, and catalog-based dataset management.

```python
class DataCatalog:
    def load(self, name): ...
    def save(self, name, data): ...
    def list(self): ...
    def exists(self, name): ...
    def add(self, data_set_name, data_set, replace=False): ...

class AbstractDataset:
    def load(self): ...
    def save(self, data): ...
    def exists(self): ...
```
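As a sketch of declarative catalog management, a DataCatalog can also be assembled from a configuration dictionary of the kind normally kept in catalog.yml; the pandas.CSVDataset type assumes the separate kedro-datasets package is installed, and the dataset names and file path below are illustrative.

```python
from kedro.io import DataCatalog, MemoryDataset

# Hypothetical declarative configuration, mirroring what would live in conf/base/catalog.yml.
catalog_config = {
    "companies": {
        "type": "pandas.CSVDataset",       # provided by the kedro-datasets package
        "filepath": "data/01_raw/companies.csv",
    },
}

catalog = DataCatalog.from_config(catalog_config)
catalog.add("scratch", MemoryDataset([1, 2, 3]))  # register an in-memory dataset programmatically

print(catalog.list())             # e.g. ["companies", "scratch"]
print(catalog.exists("scratch"))  # True, since the in-memory dataset holds data
```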
### Pipeline Construction

Pipeline definition capabilities including node creation, dependency management, pipeline composition, filtering, and transformation operations.

```python
class Pipeline:
    def filter(self, tags=None, from_nodes=None, to_nodes=None, **kwargs): ...
    def tag(self, tags): ...
    def __add__(self, other): ...
    def __or__(self, other): ...

class Node:
    def __init__(self, func, inputs, outputs, name=None, tags=None): ...

def node(func, inputs, outputs, name=None, tags=None): ...

def pipeline(pipe, inputs=None, outputs=None, parameters=None, tags=None): ...
```
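A short sketch of composition and filtering (the functions and dataset names are illustrative): pipelines combine with `+`, and `filter` returns a new pipeline containing only the matching nodes.

```python
from kedro.pipeline import node, pipeline

def clean(raw):
    """Drop missing values (illustrative)."""
    return [value for value in raw if value is not None]

def summarise(cleaned):
    """Produce a simple summary (illustrative)."""
    return {"count": len(cleaned)}

cleaning = pipeline(
    [node(clean, inputs="raw", outputs="cleaned", name="clean_node", tags="prep")]
)
reporting = pipeline(
    [node(summarise, inputs="cleaned", outputs="summary", name="summary_node")]
)

full = cleaning + reporting              # union of the two pipelines
prep_only = full.filter(tags=["prep"])   # keep only nodes tagged "prep"

print([n.name for n in full.nodes])       # ['clean_node', 'summary_node']
print([n.name for n in prep_only.nodes])  # ['clean_node']
```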
### Pipeline Runners

Multiple execution strategies for running pipelines including sequential, parallel (multiprocessing), and threaded execution with support for partial runs and custom data loading.

```python
class AbstractRunner:
    def run(self, pipeline, catalog, hook_manager=None, session_id=None): ...
    def run_only_missing(self, pipeline, catalog, hook_manager=None, session_id=None): ...

class SequentialRunner(AbstractRunner): ...
class ParallelRunner(AbstractRunner): ...
class ThreadRunner(AbstractRunner): ...
```
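As an illustration, the `data_pipeline` and `catalog` objects from the quick-start example above could be executed with a different strategy; the runner choice below is a sketch of the usual trade-off between threads and worker processes.

```python
from kedro.runner import ThreadRunner, ParallelRunner

# Sketch: reuses data_pipeline and catalog from the quick-start example above.
# ThreadRunner runs independent nodes in threads, which suits IO-bound workloads.
ThreadRunner().run(data_pipeline, catalog)

# ParallelRunner uses multiprocessing, so node functions and dataset contents
# must be picklable and multiprocessing-safe; max_workers is optional.
# ParallelRunner(max_workers=2).run(data_pipeline, catalog)
```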
### Context and Session Management

Project lifecycle management including context creation, session handling, configuration access, and environment setup for Kedro applications.

```python
class KedroContext:
    def run(self, pipeline_name=None, tags=None, runner=None, **kwargs): ...

    @property
    def catalog(self): ...

    @property
    def config_loader(self): ...

class KedroSession:
    @classmethod
    def create(cls, project_path=None, save_on_close=True, **kwargs): ...

    def load_context(self): ...
    def run(self, pipeline_name=None, tags=None, runner=None, **kwargs): ...
```
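A sketch of programmatic project execution, assuming an existing Kedro project at ./my-project (hypothetical path); bootstrap_project, which reads the project's pyproject.toml, is used here in addition to the classes listed above.

```python
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

# Sketch: assumes an existing Kedro project at ./my-project (hypothetical path).
project_path = Path("my-project")
bootstrap_project(project_path)  # configures the project package from pyproject.toml

with KedroSession.create(project_path=project_path) as session:
    context = session.load_context()           # KedroContext for this session
    print(context.catalog.list())              # datasets declared in the project's catalog
    session.run(pipeline_name="__default__")   # execute the default pipeline
```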
### Command Line Interface and Project Utilities

Command-line interface for project creation, pipeline execution, and project management, with an extensible plugin system and project discovery utilities.

```python
def main(): ...
def configure_project(package_name): ...
def find_pipelines(raise_errors=False): ...
```
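For illustration, find_pipelines is typically called from a project's pipeline_registry.py; the register_pipelines function below follows that convention, and the way the default pipeline is assembled is just one common pattern.

```python
# Sketch of a project's pipeline_registry.py: find_pipelines auto-discovers pipelines
# defined under the project package, and register_pipelines is the function Kedro
# calls to obtain them.
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline


def register_pipelines() -> dict[str, Pipeline]:
    pipelines = find_pipelines()
    # Combine every discovered pipeline into the default pipeline.
    pipelines["__default__"] = sum(pipelines.values(), Pipeline([]))
    return pipelines
```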
### Hooks and Plugins

Plugin architecture enabling custom behavior injection at various lifecycle points including node execution, pipeline runs, and catalog operations.

```python
def hook_impl(func): ...
def _create_hook_manager(): ...
```
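A sketch of a project-side hook implementation (the class name and logging are illustrative): methods decorated with hook_impl are matched by name against Kedro's hook specifications and registered through the project's settings.py.

```python
from kedro.framework.hooks import hook_impl


class LoggingHooks:
    """Illustrative project hooks that log node execution."""

    @hook_impl
    def before_node_run(self, node, inputs):
        print(f"About to run {node.name} with inputs {list(inputs)}")

    @hook_impl
    def after_node_run(self, node, outputs):
        print(f"Finished {node.name}, produced {list(outputs)}")


# Registered in the project's settings.py, e.g.:  HOOKS = (LoggingHooks(),)
```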
### IPython and Jupyter Integration

Interactive development support with magic commands for reloading projects and debugging nodes, plus seamless integration with Jupyter notebooks and IPython environments.

```python
def load_ipython_extension(ipython): ...
def reload_kedro(path=None, env=None, runtime_params=None, **kwargs): ...
```
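A brief sketch of interactive use, assuming the session is started from inside a Kedro project directory; the magics shown are the documented entry points, and the programmatic call mirrors them.

```python
# Inside an IPython or Jupyter session started from a Kedro project directory:
#   %load_ext kedro.ipython    # registers the extension and the %reload_kedro magic
#   %reload_kedro              # injects catalog, context, pipelines and session variables
#
# The same reload can be triggered programmatically from within the session:
from kedro.ipython import reload_kedro

reload_kedro(path=".", env="local")
```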