docs/guides/working-with-data.md

Working with Data Guide

Patterns and best practices for managing data with Kedro's DataCatalog.

Basic Catalog Operations

Creating a Catalog

from kedro.io import DataCatalog, MemoryDataset

# Create catalog with datasets
catalog = DataCatalog({
    "input_data": MemoryDataset([1, 2, 3, 4, 5]),
    "output_data": MemoryDataset()
})

Loading and Saving Data

# Load data
data = catalog.load("input_data")

# Process data
processed = [x * 2 for x in data]

# Save data
catalog.save("output_data", processed)

Checking Dataset Existence

if catalog.exists("input_data"):
    data = catalog.load("input_data")

Listing Datasets

# List all datasets
all_datasets = catalog.filter()

# List with regex filter
params = catalog.filter(name_regex=r"^params:")
models = catalog.filter(name_regex=r".*_model$")

Creating Custom Datasets

Basic Custom Dataset

from kedro.io import AbstractDataset
import json

class JSONDataset(AbstractDataset):
    """Custom dataset for JSON files."""

    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self):
        with open(self._filepath, 'r') as f:
            return json.load(f)

    def _save(self, data):
        with open(self._filepath, 'w') as f:
            json.dump(data, f, indent=2)

    def _describe(self):
        return {"filepath": self._filepath, "type": "JSONDataset"}

    def _exists(self):
        from pathlib import Path
        return Path(self._filepath).exists()
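
The public load() and save() methods on AbstractDataset wrap _load and _save with error handling, so a custom dataset can also be used on its own, outside a catalog. A small sketch using the class above:

# Direct use of the dataset, without a DataCatalog
dataset = JSONDataset("config.json")
dataset.save({"threshold": 0.5})
config = dataset.load()
print(dataset)  # String representation is built from _describe()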

Using Custom Dataset

# Add to catalog
catalog["config"] = JSONDataset("config.json")

# Load and save
config = catalog.load("config")
config["new_key"] = "value"
catalog.save("config", config)

Catalog from Configuration

Define Catalog Configuration

# conf/base/catalog.yml
raw_data:
  type: pandas.CSVDataset
  filepath: data/01_raw/input.csv

processed_data:
  type: pandas.ParquetDataset
  filepath: data/02_processed/output.parquet

model:
  type: pickle.PickleDataset
  filepath: data/06_models/model.pkl
  versioned: true

Load Catalog from Config

from kedro.config import OmegaConfigLoader
from kedro.io import DataCatalog

# Load catalog definitions from conf/ via the config loader
config_loader = OmegaConfigLoader(conf_source="conf")
catalog_config = config_loader["catalog"]
catalog = DataCatalog.from_config(catalog_config)

Dataset Patterns

Pattern: MemoryDataset for Intermediate Data

from kedro.io import DataCatalog, MemoryDataset
from kedro_datasets.pandas import CSVDataset, ParquetDataset

catalog = DataCatalog({
    "raw_data": CSVDataset(filepath="data/raw.csv"),
    "intermediate": MemoryDataset(),  # Not persisted
    "final_output": ParquetDataset(filepath="data/output.parquet")
})

Pattern: Versioned Datasets

# In catalog.yml
model:
  type: pickle.PickleDataset
  filepath: data/models/model.pkl
  versioned: true  # Creates timestamped versions

# Load specific version
catalog = DataCatalog.from_config(
    catalog_config,
    load_versions={"model": "2024-01-15T10.30.45.123Z"}
)

# Save with version
catalog = DataCatalog.from_config(
    catalog_config,
    save_version="2024-01-15T11.00.00.000Z"
)

Pattern: Credentials Management

# conf/base/catalog.yml
database:
  type: pandas.SQLTableDataset
  credentials: db_credentials
  table_name: users

# conf/local/credentials.yml (gitignored)
db_credentials:
  con: postgresql://user:password@localhost:5432/dbname
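
Credentials are resolved when the catalog is built from configuration: the credentials key in catalog.yml is replaced by the matching entry in credentials.yml. A minimal sketch of the manual wiring (inside a Kedro project the framework does this for you):

from kedro.config import OmegaConfigLoader
from kedro.io import DataCatalog

loader = OmegaConfigLoader(conf_source="conf")

# "db_credentials" in catalog.yml is filled in from credentials.yml
catalog = DataCatalog.from_config(
    loader["catalog"],
    credentials=loader["credentials"],
)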

Pattern: Dataset Factories (Patterns)

# conf/base/catalog.yml
"{namespace}.raw_data":
  type: pandas.CSVDataset
  filepath: data/01_raw/{namespace}.csv

"{namespace}.processed_data":
  type: pandas.ParquetDataset
  filepath: data/02_processed/{namespace}.parquet

Usage:

# Automatically resolves patterns
catalog.load("customers.raw_data")       # Loads from data/01_raw/customers.csv
catalog.load("products.raw_data")        # Loads from data/01_raw/products.csv
catalog.save("customers.processed_data", df)  # Saves to data/02_processed/customers.parquet

Working with Parameters

Storing Parameters

# conf/base/parameters.yml
model:
  learning_rate: 0.001
  epochs: 100
  batch_size: 32

Accessing Parameters in Nodes

def train_model(data, params):
    model = Model(
        learning_rate=params["learning_rate"],
        epochs=params["epochs"]
    )
    return model.fit(data)

# Reference parameters in node
node(
    train_model,
    inputs=["training_data", "params:model"],
    outputs="trained_model"
)
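
A single nested value can also be passed with dot notation, so the node receives just that value rather than the whole model block (the node function here is illustrative):

from kedro.pipeline import node

def scale_learning_rate(learning_rate):
    return learning_rate * 10

node(
    scale_learning_rate,
    inputs="params:model.learning_rate",  # Only this value is injected
    outputs="scaled_learning_rate"
)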

Runtime Parameter Overrides

from kedro.config import OmegaConfigLoader

loader = OmegaConfigLoader(
    conf_source="conf",
    runtime_params={
        "model": {
            "learning_rate": 0.01,  # Override specific parameters
            "epochs": 200
        }
    }
)
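
Reading parameters back from the loader returns the merged result, with runtime overrides taking precedence over conf/base/parameters.yml. A sketch using the loader defined above:

# Merged view: file values overridden by runtime_params
parameters = loader["parameters"]
assert parameters["model"]["learning_rate"] == 0.01
assert parameters["model"]["batch_size"] == 32  # Unchanged from parameters.yml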

Data Validation

Pattern: Validation in Custom Dataset

import pandas as pd
from kedro.io import AbstractDataset

class ValidatedCSVDataset(AbstractDataset):
    """CSV dataset with validation (_save and _describe omitted for brevity)."""

    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self):
        df = pd.read_csv(self._filepath)
        self._validate(df)
        return df

    def _validate(self, df):
        if df.empty:
            raise ValueError("Dataset is empty")
        if "required_column" not in df.columns:
            raise ValueError("Missing required column")

Pattern: Validation Nodes

def validate_data(data):
    """Validate data before processing."""
    if len(data) == 0:
        raise ValueError("Empty dataset")
    return data

# Add validation node
validation_node = node(validate_data, "raw_data", "validated_data")

Catalog Patterns for Different Stages

Data Layer Organization

# Raw data (never modified)
raw_sales:
  type: pandas.CSVDataset
  filepath: data/01_raw/sales.csv

# Intermediate data (cached)
intermediate_sales:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/sales.parquet

# Primary data (clean, validated)
primary_sales:
  type: pandas.ParquetDataset
  filepath: data/03_primary/sales.parquet

# Feature data
feature_sales:
  type: pandas.ParquetDataset
  filepath: data/04_feature/sales_features.parquet

# Model inputs
model_input_sales:
  type: pandas.ParquetDataset
  filepath: data/05_model_input/sales_model_input.parquet

# Models
sales_model:
  type: pickle.PickleDataset
  filepath: data/06_models/sales_model.pkl
  versioned: true

# Model outputs
predictions:
  type: pandas.ParquetDataset
  filepath: data/07_model_output/predictions.parquet

# Reporting
sales_report:
  type: pandas.ExcelDataset
  filepath: data/08_reporting/sales_report.xlsx

Performance Optimization

Pattern: Cached Datasets

from kedro.io import CachedDataset

# Wrap expensive dataset with cache
expensive_dataset = ExpensiveSQLDataset(...)
cached = CachedDataset(expensive_dataset)

catalog["expensive_data"] = cached

# First load: slow (loads from database)
data1 = catalog.load("expensive_data")

# Subsequent loads: fast (loads from cache)
data2 = catalog.load("expensive_data")

Pattern: Lazy Loading

# Datasets in catalog are lazy-loaded
# They only load when accessed
catalog = DataCatalog({
    "large_dataset": ParquetDataset("huge_file.parquet")  # Not loaded yet
})

# Load only when needed
if condition:
    data = catalog.load("large_dataset")  # Loads now

Common Patterns

Pattern: Multiple Data Sources

catalog = DataCatalog({
    "database_data": SQLTableDataset(...),
    "api_data": APIDataset(...),
    "file_data": CSVDataset(...),
    "combined_data": MemoryDataset()
})

def combine_sources(db_data, api_data, file_data):
    return pd.concat([db_data, api_data, file_data])

node(
    combine_sources,
    ["database_data", "api_data", "file_data"],
    "combined_data"
)

Pattern: Partitioned Datasets

# conf/base/catalog.yml
# Process multiple files as one dataset
partitioned_data:
  type: partitions.PartitionedDataset
  path: data/partitions/
  dataset: pandas.CSVDataset

# Load all partitions
all_data = catalog.load("partitioned_data")  # Dict of partition id -> load callable

# Save new partitions
catalog.save("partitioned_data", {"new_partition": df})

Pattern: Temporary Datasets

# Use MemoryDataset for temporary data
catalog = DataCatalog({
    "temp_result": MemoryDataset(),  # Exists only during pipeline run
    "persisted_result": ParquetDataset("output.parquet")
})

Transcoding

Transcoding allows you to use the same underlying data in multiple formats without duplicating storage, enabling flexibility for different processing needs and preferences.

What is Transcoding

Transcoding in Kedro enables a single dataset to be accessed in different formats by different pipeline nodes. This is achieved using the @ separator to specify the desired format.

Format: "dataset_name@format"

Examples:

  • my_data@pandas - Access my_data as a pandas DataFrame
  • my_data@spark - Access my_data as a Spark DataFrame
  • raw_data@csv - Access raw_data in CSV format
  • raw_data@parquet - Access raw_data in Parquet format

Key Concept: All transcoded versions (e.g., my_data@pandas, my_data@spark) reference the same underlying data stored at a single location, but loaded/saved using different dataset types.

When to Use Transcoding

Transcoding is valuable in several scenarios:

1. Loading in One Format, Saving in Another

# Load data as CSV, process, save as Parquet
node(
    process_data,
    inputs="raw_data@csv",
    outputs="processed_data@parquet"
)

2. Supporting Multiple Data Scientists' Format Preferences

# Team member A prefers pandas
node(analyze_with_pandas, "data@pandas", "analysis_a")

# Team member B prefers Spark
node(analyze_with_spark, "data@spark", "analysis_b")

# Both access the same underlying data

3. Optimizing for Different Pipeline Stages

# Use pandas for small-scale exploration
node(explore, "data@pandas", "insights")

# Use Spark for large-scale processing
node(process_large_scale, "data@spark", "processed")

# Use Dask for distributed computation
node(distributed_compute, "data@dask", "computed")

How Transcoding Works

When you use transcoding in a pipeline, Kedro parses the dataset name at the @ separator and looks up the appropriate configuration in the catalog.

Dataset Name Parsing

# Input: "my_data@pandas"
# Kedro parses this as:
# - Base dataset name: "my_data"
# - Transcoding format: "pandas"
# - Looks up: catalog configuration for "my_data@pandas"

Catalog Configuration for Transcoded Datasets

In your catalog, you define each transcoded format separately, all pointing to the same underlying data location:

# conf/base/catalog.yml

# Base dataset entry (optional; nodes typically reference only the transcoded names below)
my_data:
  type: pandas.ParquetDataset
  filepath: data/my_data.parquet

# Transcoded version: pandas DataFrame from Parquet file
my_data@pandas:
  type: pandas.ParquetDataset
  filepath: data/my_data.parquet

# Transcoded version: Spark DataFrame from same Parquet file
my_data@spark:
  type: spark.SparkDataset
  filepath: data/my_data.parquet
  file_format: parquet

# Transcoded version: CSV format
my_data@csv:
  type: pandas.CSVDataset
  filepath: data/my_data.csv

Important: All transcoded versions should reference the same physical data location (the same filepath); only the dataset type used to read and write the data changes. The my_data@csv entry above points to a separate CSV copy, so it is a convenience alias rather than true transcoding (see the best practices below).

Usage Examples

Node Using Transcoded Inputs/Outputs

from kedro.pipeline import node, pipeline

def process_with_pandas(data_pandas):
    """Process data using pandas."""
    # data_pandas is a pandas DataFrame
    return data_pandas.groupby("category").sum()

def process_with_spark(data_spark):
    """Process data using Spark."""
    # data_spark is a Spark DataFrame
    return data_spark.groupBy("category").sum()

def aggregate_results(pandas_result, spark_result):
    """Combine results from different processing methods."""
    return {"pandas": pandas_result, "spark": spark_result}

# Create pipeline with transcoding
processing_pipeline = pipeline([
    node(
        process_with_pandas,
        inputs="raw_data@pandas",
        outputs="pandas_processed@pandas",
        name="pandas_processing"
    ),
    node(
        process_with_spark,
        inputs="raw_data@spark",
        outputs="spark_processed@spark",
        name="spark_processing"
    ),
    node(
        aggregate_results,
        inputs=["pandas_processed@pandas", "spark_processed@spark"],
        outputs="final_results",
        name="aggregate"
    )
])

Catalog Configuration for the Pipeline Above

# conf/base/catalog.yml

# Raw data in multiple formats
raw_data@pandas:
  type: pandas.ParquetDataset
  filepath: data/01_raw/input.parquet

raw_data@spark:
  type: spark.SparkDataset
  filepath: data/01_raw/input.parquet
  file_format: parquet

raw_data@csv:
  type: pandas.CSVDataset
  filepath: data/01_raw/input.csv

# Processed data outputs
pandas_processed@pandas:
  type: pandas.ParquetDataset
  filepath: data/02_processed/pandas_output.parquet

spark_processed@spark:
  type: spark.SparkDataset
  filepath: data/02_processed/spark_output.parquet
  file_format: parquet

# Final results (no transcoding needed)
final_results:
  type: pickle.PickleDataset
  filepath: data/03_primary/results.pkl

Complete Pipeline Example with Transcoding

from kedro.pipeline import node, pipeline

# Step 1: Load raw data
def load_csv(csv_data):
    """Load and validate CSV data."""
    return csv_data[csv_data["valid"]]

# Step 2: Process with pandas for feature engineering
def create_features_pandas(data_pandas):
    """Create features using pandas."""
    data_pandas["new_feature"] = data_pandas["col_a"] * data_pandas["col_b"]
    return data_pandas

# Step 3: Process large-scale with Spark
def process_large_scale_spark(data_spark):
    """Process large dataset with Spark."""
    from pyspark.sql import functions as F
    return data_spark.withColumn("processed", F.col("value") * 2)

# Step 4: Train model on pandas data
def train_model(features_pandas, params):
    """Train model on pandas DataFrame."""
    from sklearn.ensemble import RandomForestClassifier
    model = RandomForestClassifier(**params)
    return model.fit(features_pandas.drop("target", axis=1), features_pandas["target"])

# Create pipeline
def create_transcoding_pipeline():
    return pipeline([
        node(
            load_csv,
            inputs="raw_input@csv",
            outputs="validated_data@csv",
            name="load_and_validate"
        ),
        node(
            create_features_pandas,
            inputs="validated_data@pandas",
            outputs="features@pandas",
            name="feature_engineering"
        ),
        node(
            process_large_scale_spark,
            inputs="validated_data@spark",
            outputs="processed_large@spark",
            name="large_scale_processing"
        ),
        node(
            train_model,
            inputs=["features@pandas", "params:model"],
            outputs="trained_model",
            name="train_model"
        )
    ])

Corresponding catalog configuration:

# conf/base/catalog.yml

# Raw input data
raw_input@csv:
  type: pandas.CSVDataset
  filepath: data/01_raw/input.csv
  load_args:
    sep: ","

raw_input@pandas:
  type: pandas.ParquetDataset
  filepath: data/01_raw/input.parquet

# Validated data - accessible in multiple formats
validated_data@csv:
  type: pandas.CSVDataset
  filepath: data/02_intermediate/validated.csv

validated_data@pandas:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/validated.parquet

validated_data@spark:
  type: spark.SparkDataset
  filepath: data/02_intermediate/validated.parquet
  file_format: parquet

# Features - pandas format
features@pandas:
  type: pandas.ParquetDataset
  filepath: data/03_primary/features.parquet

# Large scale processed - Spark format
processed_large@spark:
  type: spark.SparkDataset
  filepath: data/04_feature/processed_large.parquet
  file_format: parquet
  save_args:
    mode: overwrite

# Model output
trained_model:
  type: pickle.PickleDataset
  filepath: data/06_models/model.pkl
  versioned: true

# conf/base/parameters.yml
model:
  n_estimators: 100
  max_depth: 10
  random_state: 42

Mixing Transcoded and Non-Transcoded Datasets

def process_mixed(
    data_pandas,      # Transcoded to pandas
    config,           # Regular dataset (no transcoding)
    params            # Parameters (no transcoding)
):
    """Process with mix of transcoded and regular datasets."""
    processed = data_pandas * config["multiplier"]
    processed = processed ** params["power"]
    return processed

node(
    process_mixed,
    inputs=[
        "input_data@pandas",  # Transcoded
        "config",              # Not transcoded
        "params:processing"    # Not transcoded (parameters)
    ],
    outputs="output_data@parquet"
)

Transcoding Best Practices

1. Use Consistent File Locations

# Good: Same filepath for different formats
my_data@pandas:
  type: pandas.ParquetDataset
  filepath: data/my_data.parquet

my_data@spark:
  type: spark.SparkDataset
  filepath: data/my_data.parquet  # Same file
  file_format: parquet

# Avoid: Different file locations
my_data@pandas:
  filepath: data/pandas/my_data.parquet
my_data@spark:
  filepath: data/spark/my_data.parquet  # Different file - not true transcoding

2. Choose Appropriate Formats

# Use pandas for small to medium data and complex operations
node(func, "data@pandas", "output")  # Good for < 1GB data

# Use Spark for large-scale distributed processing
node(func, "data@spark", "output")   # Good for > 10GB data

# Use CSV for human-readable interchange
node(func, "data@csv", "output")     # Good for external sharing

3. Document Transcoding Usage

# conf/base/catalog.yml

# Customer data - available in multiple formats
# - Use @pandas for feature engineering and model training
# - Use @spark for large-scale aggregations
# - Use @csv for external reporting
customer_data@pandas:
  type: pandas.ParquetDataset
  filepath: data/customers.parquet

customer_data@spark:
  type: spark.SparkDataset
  filepath: data/customers.parquet
  file_format: parquet

customer_data@csv:
  type: pandas.CSVDataset
  filepath: data/customers.csv

4. Avoid Excessive Transcoding

# Good: Transcode only when necessary
node(process_pandas, "data@pandas", "intermediate")
node(process_spark, "intermediate@spark", "output")

# Avoid: Unnecessary back-and-forth transcoding
node(func1, "data@pandas", "temp1@spark")
node(func2, "temp1@spark", "temp2@pandas")
node(func3, "temp2@pandas", "temp3@spark")
# This causes excessive serialization overhead

Troubleshooting

Issue: DatasetNotFoundError

Problem: Dataset not registered in catalog

data = catalog.load("missing_dataset")  # Error

Solution: Add dataset to catalog

catalog["missing_dataset"] = MemoryDataset(data)

Issue: Dataset Not Picklable (ParallelRunner)

Problem: Dataset can't be serialized for multiprocessing

Solution: Ensure datasets are picklable or use ThreadRunner

# Use ThreadRunner for non-picklable datasets
from kedro.runner import ThreadRunner
runner = ThreadRunner()
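
The runner can then be invoked directly on a pipeline and catalog. A sketch, assuming my_pipeline and catalog are already defined as in the examples above:

# Threads share one process, so datasets do not need to be picklable
runner = ThreadRunner(max_workers=4)
runner.run(my_pipeline, catalog)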

Issue: Version Conflicts

Problem: Trying to save to existing version

Solution: Use unique version or allow overwrite

from kedro.io.core import generate_timestamp

catalog = DataCatalog(
    datasets={...},
    save_version=generate_timestamp()  # Unique version
)

See also:

  • DataCatalog API Reference - Complete API documentation
  • AbstractDataset API - Create custom datasets
  • Creating Pipelines Guide - Use data in pipelines