tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.
Agent success rate with this tile: 98%, a 1.32x improvement over the 74% baseline.
Patterns and best practices for managing data with Kedro's DataCatalog.
from kedro.io import DataCatalog, MemoryDataset
# Create catalog with datasets
catalog = DataCatalog({
    "input_data": MemoryDataset([1, 2, 3, 4, 5]),
    "output_data": MemoryDataset()
})

# Load data
data = catalog.load("input_data")
# Process data
processed = [x * 2 for x in data]
# Save data
catalog.save("output_data", processed)if catalog.exists("input_data"):
data = catalog.load("input_data")# List all datasets
all_datasets = catalog.filter()
# List with regex filter
params = catalog.filter(name_regex=r"^params:")
models = catalog.filter(name_regex=r".*_model$")
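
A small sketch building on the filter call (assuming, as above, that filter() returns the matching dataset names):

# Load every dataset whose name ends in "_model"
model_names = catalog.filter(name_regex=r".*_model$")
models_by_name = {name: catalog.load(name) for name in model_names}
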
from kedro.io import AbstractDataset
import json
class JSONDataset(AbstractDataset):
    """Custom dataset for JSON files."""

    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self):
        with open(self._filepath, 'r') as f:
            return json.load(f)

    def _save(self, data):
        with open(self._filepath, 'w') as f:
            json.dump(data, f, indent=2)

    def _describe(self):
        return {"filepath": self._filepath, "type": "JSONDataset"}

    def _exists(self):
        from pathlib import Path
        return Path(self._filepath).exists()

# Add to catalog
catalog["config"] = JSONDataset("config.json")
# Load and save
config = catalog.load("config")
config["new_key"] = "value"
catalog.save("config", config)# conf/base/catalog.yml
# conf/base/catalog.yml
raw_data:
  type: pandas.CSVDataset
  filepath: data/01_raw/input.csv

processed_data:
  type: pandas.ParquetDataset
  filepath: data/02_processed/output.parquet

model:
  type: pickle.PickleDataset
  filepath: data/06_models/model.pkl
  versioned: true

from kedro.io import DataCatalog
# Load from YAML configuration
catalog_config = config_loader["catalog"]
catalog = DataCatalog.from_config(catalog_config)

from kedro.io import MemoryDataset
from kedro_datasets.pandas import CSVDataset, ParquetDataset
catalog = DataCatalog({
    "raw_data": CSVDataset(filepath="data/raw.csv"),
    "intermediate": MemoryDataset(),  # Not persisted
    "final_output": ParquetDataset(filepath="data/output.parquet")
})

# In catalog.yml
model:
  type: pickle.PickleDataset
  filepath: data/models/model.pkl
  versioned: true  # Creates timestamped versions

# Load specific version
catalog = DataCatalog.from_config(
    catalog_config,
    load_versions={"model": "2024-01-15T10.30.45.123Z"}
)

# Save with version
catalog = DataCatalog.from_config(
    catalog_config,
    save_version="2024-01-15T11.00.00.000Z"
)
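
If no load_versions mapping is supplied, Kedro resolves the most recent saved version of a versioned dataset automatically; a minimal sketch reusing the config above:

# Latest timestamped version is picked up by default
catalog = DataCatalog.from_config(catalog_config)
model = catalog.load("model")
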
# conf/base/catalog.yml
database:
  type: pandas.SQLTableDataset
  credentials: db_credentials
  table_name: users

# conf/local/credentials.yml (gitignored)
db_credentials:
  con: postgresql://user:password@localhost:5432/dbname
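
Credentials are resolved when the catalog is built from configuration; a sketch assuming the standard config loader keys ("catalog" and "credentials"):

# Entries under `credentials:` in the catalog are looked up in the credentials config
catalog = DataCatalog.from_config(
    config_loader["catalog"],
    credentials=config_loader["credentials"],
)
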
"{namespace}.raw_data":
type: pandas.CSVDataset
filepath: data/01_raw/{namespace}.csv
"{namespace}.processed_data":
type: pandas.ParquetDataset
filepath: data/02_processed/{namespace}.parquetUsage:
# Automatically resolves patterns
catalog.load("customers.raw_data") # Loads from data/01_raw/customers.csv
catalog.load("products.raw_data") # Loads from data/01_raw/products.csv
catalog.save("customers.processed_data", df) # Saves to data/02_processed/customers.parquet# conf/base/parameters.yml
# conf/base/parameters.yml
model:
  learning_rate: 0.001
  epochs: 100
  batch_size: 32

def train_model(data, params):
    model = Model(
        learning_rate=params["learning_rate"],
        epochs=params["epochs"]
    )
    return model.fit(data)

# Reference parameters in node
node(
    train_model,
    inputs=["training_data", "params:model"],
    outputs="trained_model"
)

from kedro.config import OmegaConfigLoader
loader = OmegaConfigLoader(
    conf_source="conf",
    runtime_params={
        "model.learning_rate": 0.01,  # Override specific parameter
        "model.epochs": 200
    }
)
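
The merged result can be read back through the loader; a small sketch assuming the standard "parameters" config pattern, where runtime_params are applied on top of the values in conf/:

params = loader["parameters"]  # conf/ values with runtime overrides applied
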
"""CSV dataset with validation."""
def _load(self):
df = pd.read_csv(self._filepath)
self._validate(df)
return df
def _validate(self, df):
if df.empty:
raise ValueError("Dataset is empty")
if "required_column" not in df.columns:
raise ValueError("Missing required column")def validate_data(data):
"""Validate data before processing."""
if len(data) == 0:
raise ValueError("Empty dataset")
return data
# Add validation node
validation_node = node(validate_data, "raw_data", "validated_data")
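
The node object composes into a pipeline like any other; a sketch with a hypothetical downstream process function:

from kedro.pipeline import pipeline

data_quality_pipeline = pipeline([
    validation_node,
    node(process, "validated_data", "processed_data"),  # process is a placeholder
])
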
# Raw data (never modified)
raw_sales:
  type: pandas.CSVDataset
  filepath: data/01_raw/sales.csv

# Intermediate data (cached)
intermediate_sales:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/sales.parquet

# Primary data (clean, validated)
primary_sales:
  type: pandas.ParquetDataset
  filepath: data/03_primary/sales.parquet

# Feature data
feature_sales:
  type: pandas.ParquetDataset
  filepath: data/04_feature/sales_features.parquet

# Model inputs
model_input_sales:
  type: pandas.ParquetDataset
  filepath: data/05_model_input/sales_model_input.parquet

# Models
sales_model:
  type: pickle.PickleDataset
  filepath: data/06_models/sales_model.pkl
  versioned: true

# Model outputs
predictions:
  type: pandas.ParquetDataset
  filepath: data/07_model_output/predictions.parquet

# Reporting
sales_report:
  type: pandas.ExcelDataset
  filepath: data/08_reporting/sales_report.xlsx

from kedro.io import CachedDataset
# Wrap expensive dataset with cache
expensive_dataset = ExpensiveSQLDataset(...)
cached = CachedDataset(expensive_dataset)
catalog["expensive_data"] = cached
# First load: slow (loads from database)
data1 = catalog.load("expensive_data")
# Subsequent loads: fast (loads from cache)
data2 = catalog.load("expensive_data")

# Datasets in catalog are lazy-loaded
# They only load when accessed
catalog = DataCatalog({
    "large_dataset": ParquetDataset(filepath="huge_file.parquet")  # Not loaded yet
})
# Load only when needed
if condition:
data = catalog.load("large_dataset") # Loads nowcatalog = DataCatalog({
"database_data": SQLTableDataset(...),
"api_data": APIDataset(...),
"file_data": CSVDataset(...),
"combined_data": MemoryDataset()
})

def combine_sources(db_data, api_data, file_data):
    return pd.concat([db_data, api_data, file_data])

node(
    combine_sources,
    ["database_data", "api_data", "file_data"],
    "combined_data"
)

# Process multiple files as one dataset
partitioned_data:
  type: PartitionedDataset
  path: data/partitions/
  dataset: pandas.CSVDataset

# Load all partitions
all_data = catalog.load("partitioned_data")  # Dict of partition id -> load callable
# Save new partitions
catalog.save("partitioned_data", {"new_partition": df})# Use MemoryDataset for temporary data
# Use MemoryDataset for temporary data
catalog = DataCatalog({
    "temp_result": MemoryDataset(),  # Exists only during pipeline run
    "persisted_result": ParquetDataset(filepath="output.parquet")
})

Transcoding allows you to use the same underlying data in multiple formats without duplicating storage, enabling flexibility for different processing needs and preferences.
Transcoding in Kedro enables a single dataset to be accessed in different formats by different pipeline nodes. This is achieved using the @ separator to specify the desired format.
Format: "dataset_name@format"
Examples:
my_data@pandas - Access my_data as a pandas DataFrame
my_data@spark - Access my_data as a Spark DataFrame
raw_data@csv - Access raw_data in CSV format
raw_data@parquet - Access raw_data in Parquet format

Key Concept: All transcoded versions (e.g., my_data@pandas, my_data@spark) reference the same underlying data stored at a single location, but loaded/saved using different dataset types.
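
Because the suffix only selects the load/save implementation, Kedro strips it when wiring the pipeline DAG, so a node that writes one format can feed a node that reads another. A minimal sketch (both functions are placeholders):

from kedro.pipeline import node, pipeline

pipe = pipeline([
    node(write_with_pandas, inputs="raw@csv", outputs="my_data@pandas"),
    node(read_with_spark, inputs="my_data@spark", outputs="summary"),
])
# my_data@pandas and my_data@spark resolve to the same dataset node in the DAG
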
Transcoding is valuable in several scenarios:
# Load data as CSV, process, save as Parquet
node(
    process_data,
    inputs="raw_data@csv",
    outputs="processed_data@parquet"
)

# Team member A prefers pandas
node(analyze_with_pandas, "data@pandas", "analysis_a")
# Team member B prefers Spark
node(analyze_with_spark, "data@spark", "analysis_b")
# Both access the same underlying data

# Use pandas for small-scale exploration
node(explore, "data@pandas", "insights")
# Use Spark for large-scale processing
node(process_large_scale, "data@spark", "processed")
# Use Dask for distributed computation
node(distributed_compute, "data@dask", "computed")

When you use transcoding in a pipeline, Kedro parses the dataset name at the @ separator and looks up the appropriate configuration in the catalog.
# Input: "my_data@pandas"
# Kedro parses this as:
# - Base dataset name: "my_data"
# - Transcoding format: "pandas"
# - Looks up: catalog configuration for "my_data@pandas"

In your catalog, you define each transcoded format separately, all pointing to the same underlying data location:
# conf/base/catalog.yml
# Base dataset (optional, can be just a reference)
my_data:
  type: pandas.ParquetDataset
  filepath: data/my_data.parquet

# Transcoded version: pandas DataFrame from Parquet file
my_data@pandas:
  type: pandas.ParquetDataset
  filepath: data/my_data.parquet

# Transcoded version: Spark DataFrame from same Parquet file
my_data@spark:
  type: spark.SparkDataset
  filepath: data/my_data.parquet
  file_format: parquet

# Transcoded version: CSV format
my_data@csv:
  type: pandas.CSVDataset
  filepath: data/my_data.csv

Important: All transcoded versions should reference the same physical data location (same filepath). The difference is in the dataset type used to read/write the data.
from kedro.pipeline import node, pipeline
def process_with_pandas(data_pandas):
    """Process data using pandas."""
    # data_pandas is a pandas DataFrame
    return data_pandas.groupby("category").sum()

def process_with_spark(data_spark):
    """Process data using Spark."""
    # data_spark is a Spark DataFrame
    return data_spark.groupBy("category").sum()

def aggregate_results(pandas_result, spark_result):
    """Combine results from different processing methods."""
    return {"pandas": pandas_result, "spark": spark_result}

# Create pipeline with transcoding
processing_pipeline = pipeline([
    node(
        process_with_pandas,
        inputs="raw_data@pandas",
        outputs="pandas_processed@pandas",
        name="pandas_processing"
    ),
    node(
        process_with_spark,
        inputs="raw_data@spark",
        outputs="spark_processed@spark",
        name="spark_processing"
    ),
    node(
        aggregate_results,
        inputs=["pandas_processed@pandas", "spark_processed@spark"],
        outputs="final_results",
        name="aggregate"
    )
])

# conf/base/catalog.yml
# Raw data in multiple formats
raw_data@pandas:
  type: pandas.ParquetDataset
  filepath: data/01_raw/input.parquet

raw_data@spark:
  type: spark.SparkDataset
  filepath: data/01_raw/input.parquet
  file_format: parquet

raw_data@csv:
  type: pandas.CSVDataset
  filepath: data/01_raw/input.csv

# Processed data outputs
pandas_processed@pandas:
  type: pandas.ParquetDataset
  filepath: data/02_processed/pandas_output.parquet

spark_processed@spark:
  type: spark.SparkDataset
  filepath: data/02_processed/spark_output.parquet
  file_format: parquet

# Final results (no transcoding needed)
final_results:
  type: pickle.PickleDataset
  filepath: data/03_primary/results.pkl

from kedro.pipeline import node, pipeline
# Step 1: Load raw data
def load_csv(csv_data):
    """Load and validate CSV data."""
    return csv_data[csv_data["valid"] == True]

# Step 2: Process with pandas for feature engineering
def create_features_pandas(data_pandas):
    """Create features using pandas."""
    data_pandas["new_feature"] = data_pandas["col_a"] * data_pandas["col_b"]
    return data_pandas

# Step 3: Process large-scale with Spark
def process_large_scale_spark(data_spark):
    """Process large dataset with Spark."""
    from pyspark.sql import functions as F
    return data_spark.withColumn("processed", F.col("value") * 2)

# Step 4: Train model on pandas data
def train_model(features_pandas, params):
    """Train model on pandas DataFrame."""
    from sklearn.ensemble import RandomForestClassifier
    model = RandomForestClassifier(**params)
    return model.fit(features_pandas.drop("target", axis=1), features_pandas["target"])

# Create pipeline
def create_transcoding_pipeline():
    return pipeline([
        node(
            load_csv,
            inputs="raw_input@csv",
            outputs="validated_data@csv",
            name="load_and_validate"
        ),
        node(
            create_features_pandas,
            inputs="validated_data@pandas",
            outputs="features@pandas",
            name="feature_engineering"
        ),
        node(
            process_large_scale_spark,
            inputs="validated_data@spark",
            outputs="processed_large@spark",
            name="large_scale_processing"
        ),
        node(
            train_model,
            inputs=["features@pandas", "params:model"],
            outputs="trained_model",
            name="train_model"
        )
    ])

Corresponding catalog configuration:
# conf/base/catalog.yml
# Raw input data
raw_input@csv:
  type: pandas.CSVDataset
  filepath: data/01_raw/input.csv
  load_args:
    sep: ","

raw_input@pandas:
  type: pandas.ParquetDataset
  filepath: data/01_raw/input.parquet

# Validated data - accessible in multiple formats
validated_data@csv:
  type: pandas.CSVDataset
  filepath: data/02_intermediate/validated.csv

validated_data@pandas:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/validated.parquet

validated_data@spark:
  type: spark.SparkDataset
  filepath: data/02_intermediate/validated.parquet
  file_format: parquet

# Features - pandas format
features@pandas:
  type: pandas.ParquetDataset
  filepath: data/03_primary/features.parquet

# Large scale processed - Spark format
processed_large@spark:
  type: spark.SparkDataset
  filepath: data/04_feature/processed_large.parquet
  file_format: parquet
  save_args:
    mode: overwrite

# Model output
trained_model:
  type: pickle.PickleDataset
  filepath: data/06_models/model.pkl
  versioned: true

# conf/base/parameters.yml
model:
  n_estimators: 100
  max_depth: 10
  random_state: 42

def process_mixed(
    data_pandas,  # Transcoded to pandas
    config,       # Regular dataset (no transcoding)
    params        # Parameters (no transcoding)
):
    """Process with mix of transcoded and regular datasets."""
    processed = data_pandas * config["multiplier"]
    processed = processed ** params["power"]
    return processed

node(
    process_mixed,
    inputs=[
        "input_data@pandas",  # Transcoded
        "config",             # Not transcoded
        "params:processing"   # Not transcoded (parameters)
    ],
    outputs="output_data@parquet"
)

# Good: Same filepath for different formats
my_data@pandas:
  type: pandas.ParquetDataset
  filepath: data/my_data.parquet

my_data@spark:
  type: spark.SparkDataset
  filepath: data/my_data.parquet  # Same file
  file_format: parquet

# Avoid: Different file locations
my_data@pandas:
  filepath: data/pandas/my_data.parquet

my_data@spark:
  filepath: data/spark/my_data.parquet  # Different file - not true transcoding

# Use pandas for small to medium data and complex operations
node(func, "data@pandas", "output") # Good for < 1GB data
# Use Spark for large-scale distributed processing
node(func, "data@spark", "output") # Good for > 10GB data
# Use CSV for human-readable interchange
node(func, "data@csv", "output") # Good for external sharing# conf/base/catalog.yml
# Customer data - available in multiple formats
# - Use @pandas for feature engineering and model training
# - Use @spark for large-scale aggregations
# - Use @csv for external reporting
customer_data@pandas:
  type: pandas.ParquetDataset
  filepath: data/customers.parquet

customer_data@spark:
  type: spark.SparkDataset
  filepath: data/customers.parquet
  file_format: parquet

customer_data@csv:
  type: pandas.CSVDataset
  filepath: data/customers.csv

# Good: Transcode only when necessary
node(process_pandas, "data@pandas", "intermediate")
node(process_spark, "intermediate@spark", "output")
# Avoid: Unnecessary back-and-forth transcoding
node(func1, "data@pandas", "temp1@spark")
node(func2, "temp1@spark", "temp2@pandas")
node(func3, "temp2@pandas", "temp3@spark")
# This causes excessive serialization overhead

Problem: Dataset not registered in catalog
data = catalog.load("missing_dataset") # ErrorSolution: Add dataset to catalog
catalog["missing_dataset"] = MemoryDataset(data)Problem: Dataset can't be serialized for multiprocessing
Solution: Ensure datasets are picklable or use ThreadRunner
# Use ThreadRunner for non-picklable datasets
from kedro.runner import ThreadRunner
runner = ThreadRunner()

Problem: Trying to save to existing version
Solution: Use unique version or allow overwrite
from kedro.io import generate_timestamp
catalog = DataCatalog(
    datasets={...},
    save_version=generate_timestamp()  # Unique version
)