```bash
tessl install tessl/pypi-kedro@1.1.0
```

Kedro helps you build production-ready data and analytics pipelines.
| Metric | Value | Description |
| --- | --- | --- |
| Agent success | 98% | Agent success rate when using this tile |
| Improvement | 1.32x | Agent success rate improvement when using this tile compared to baseline |
| Baseline | 74% | Agent success rate without this tile |
Strategies and patterns for testing Kedro pipelines and nodes.
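Kedro nodes wrap plain Python functions, so start by testing the function logic directly, with no Kedro machinery involved: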
```python
# src/my_project/pipelines/processing/nodes.py
def clean_data(raw_data):
    """Clean and preprocess raw data."""
    return [x.strip().lower() for x in raw_data]
```

```python
# tests/pipelines/processing/test_nodes.py
from my_project.pipelines.processing.nodes import clean_data


def test_clean_data():
    """Test clean_data function."""
    raw_data = [" Apple ", " Banana ", "CHERRY"]
    result = clean_data(raw_data)
    assert result == ["apple", "banana", "cherry"]
```
The same function can then be exercised through a node, which checks the input/output wiring as well as the logic:

```python
from kedro.pipeline import node

# Create node
clean_node = node(clean_data, "raw", "clean", name="clean")


# Test node execution
def test_clean_node():
    """Test node execution."""
    inputs = {"raw": [" Apple ", " Banana "]}
    outputs = clean_node.run(inputs)
    assert outputs == {"clean": ["apple", "banana"]}
```
"""Test pipeline has expected nodes."""
pipe = create_pipeline()
# Check number of nodes
assert len(pipe.nodes) == 3
# Check node names
node_names = [n.name for n in pipe.nodes]
assert "clean" in node_names
assert "transform" in node_names
assert "analyze" in node_namesfrom kedro.io import DataCatalog, MemoryDataset
An end-to-end test wires the pipeline to an in-memory catalog and runs it with `SequentialRunner`:

```python
from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner


def test_pipeline_execution():
    """Test full pipeline execution."""
    # Create pipeline
    pipe = create_pipeline()
    # Create catalog with test data
    catalog = DataCatalog({
        "raw_data": MemoryDataset([" Apple ", " Banana "]),
        "cleaned_data": MemoryDataset(),
        "results": MemoryDataset(),
    })
    # Run pipeline
    runner = SequentialRunner()
    outputs = runner.run(pipe, catalog)
    # Verify outputs. Note: what runner.run() returns (loaded data vs dataset
    # objects, free outputs only vs all outputs) varies across Kedro versions;
    # adjust this check to match yours.
    results = outputs["results"].load()
    assert results["count"] == 2
```
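If your Kedro version returns only unregistered (free) outputs from the runner, reading the registered dataset back through the catalog is a more robust check; a short sketch under that assumption:

```python
# Alternative: read the registered output back through the catalog
runner.run(pipe, catalog)
assert catalog.load("results")["count"] == 2
```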
pytest fixtures keep sample data and catalogs reusable across tests:

```python
import pytest
from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner


@pytest.fixture
def sample_data():
    """Fixture providing sample data."""
    return [" Apple ", " Banana ", "CHERRY"]


@pytest.fixture
def test_catalog(sample_data):
    """Fixture providing test catalog."""
    return DataCatalog({
        "raw_data": MemoryDataset(sample_data),
        "cleaned_data": MemoryDataset(),
        "results": MemoryDataset(),
    })


def test_with_fixtures(test_catalog):
    """Test using fixtures."""
    pipe = create_pipeline()
    runner = SequentialRunner()
    outputs = runner.run(pipe, test_catalog)
    assert "results" in outputs
```
Nodes that call external services should be tested against mocks so the tests stay fast and deterministic:

```python
from unittest.mock import patch


def test_with_mock_api():
    """Test node that calls external API."""
    # Mock the API call
    with patch("my_project.nodes.api_client") as mock_api:
        mock_api.fetch_data.return_value = {"data": [1, 2, 3]}
        # Test function
        result = fetch_and_process()
        assert result == [1, 2, 3]
        mock_api.fetch_data.assert_called_once()
```
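If the pytest-mock plugin is installed (an assumption; it is a separate dependency), its `mocker` fixture removes the nesting:

```python
def test_with_mock_api_mocker(mocker):
    """Same test using pytest-mock's mocker fixture (assumes pytest-mock)."""
    mock_api = mocker.patch("my_project.nodes.api_client")
    mock_api.fetch_data.return_value = {"data": [1, 2, 3]}
    assert fetch_and_process() == [1, 2, 3]
    mock_api.fetch_data.assert_called_once()
```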
File-based processing can be tested against pytest's built-in `tmp_path` fixture:

```python
import pytest


@pytest.fixture
def temp_data_dir(tmp_path):
    """Fixture providing temporary data directory."""
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    return data_dir


def test_file_processing(temp_data_dir):
    """Test processing files."""
    # Create test file
    test_file = temp_data_dir / "test.csv"
    test_file.write_text("col1,col2\n1,2\n3,4")
    # Process file
    result = process_csv(str(test_file))
    assert len(result) == 2
```
Integration tests can run a whole registered pipeline through a `KedroSession`:

```python
from kedro.framework.session import KedroSession


def test_full_pipeline_integration(tmp_path):
    """Test complete pipeline in session."""
    # Assumes a valid Kedro project scaffold exists at tmp_path
    with KedroSession.create(project_path=tmp_path, env="test") as session:
        # Run pipeline
        outputs = session.run(pipeline_name="data_processing")
        # Verify outputs
        assert "final_output" in outputs
```
Custom datasets deserve round-trip tests for `save`, `load`, and `exists`:

```python
import pytest
from my_project.datasets import JSONDataset


@pytest.fixture
def json_file(tmp_path):
    """Fixture providing a temporary JSON file path."""
    file_path = tmp_path / "test.json"
    return str(file_path)


def test_json_dataset_save_load(json_file):
    """Test JSONDataset save and load."""
    dataset = JSONDataset(json_file)
    # Save data
    test_data = {"key": "value", "number": 42}
    dataset.save(test_data)
    # Load data
    loaded_data = dataset.load()
    assert loaded_data == test_data


def test_json_dataset_exists(json_file):
    """Test JSONDataset exists check."""
    dataset = JSONDataset(json_file)
    # Should not exist initially
    assert not dataset.exists()
    # Should exist after saving
    dataset.save({"test": "data"})
    assert dataset.exists()
```
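The tests above assume a project-local `JSONDataset`; for context, here is a minimal sketch of such a dataset built on Kedro's `AbstractDataset` interface (illustrative only, using the Kedro 0.19+ class name):

```python
import json
from pathlib import Path

from kedro.io import AbstractDataset


class JSONDataset(AbstractDataset):
    """Minimal JSON dataset sketch; not the real implementation."""

    def __init__(self, filepath: str):
        self._filepath = Path(filepath)

    def _load(self):
        return json.loads(self._filepath.read_text())

    def _save(self, data) -> None:
        self._filepath.write_text(json.dumps(data))

    def _exists(self) -> bool:
        return self._filepath.exists()

    def _describe(self) -> dict:
        return {"filepath": str(self._filepath)}
```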
Hooks are plain classes, so they can be unit tested with mock nodes:

```python
from unittest.mock import Mock

from my_project.hooks import TimingHooks


def test_timing_hooks():
    """Test timing hooks."""
    hooks = TimingHooks()
    # Create mock node
    mock_node = Mock()
    mock_node.name = "test_node"
    # Call hooks
    hooks.before_node_run(node=mock_node)
    hooks.after_node_run(node=mock_node)
    # Verify timing was recorded
    assert mock_node.name in hooks.node_times
```
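For the hooks to fire in a real run, they must be registered in the project's `settings.py` (the standard Kedro registration point; `TimingHooks` is the project-specific class from above):

```python
# src/my_project/settings.py
from my_project.hooks import TimingHooks

HOOKS = (TimingHooks(),)
```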
```python
# ✅ Good: Test function logic separately
def test_clean_data():
    result = clean_data([" Test "])
    assert result == ["test"]


# Then test in pipeline context
def test_clean_node():
    node_result = clean_node.run({"raw": [" Test "]})
    assert node_result == {"clean": ["test"]}
```
```python
# ✅ Good: Realistic test data
test_data = [
    " John Doe ",
    "JANE SMITH",
    "bob jones",
]

# ❌ Less useful: Generic test data
test_data = ["a", "b", "c"]
```
"""Test handling of empty input."""
result = clean_data([])
assert result == []
def test_none_input():
"""Test handling of None input."""
with pytest.raises(TypeError):
clean_data(None)@pytest.mark.parametrize("input_data,expected", [
([" Test "], ["test"]),
(["UPPER"], ["upper"]),
([" Mixed Case "], ["mixed case"]),
([], [])
])
def test_clean_data_parametrized(input_data, expected):
"""Test clean_data with various inputs."""
result = clean_data(input_data)
assert result == expecteddef test_pipeline_filtering():
"""Test pipeline filtering works correctly."""
full_pipe = create_pipeline()
# Test tag filtering
preprocessing = full_pipe.only_nodes_with_tags("preprocessing")
assert len(preprocessing.nodes) == 2
# Test range filtering
from_clean = full_pipe.from_nodes("clean")
assert "clean" in [n.name for n in from_clean.nodes]# Run tests with coverage
Run the suite with coverage enabled (requires the pytest-cov plugin):

```bash
# Run tests with coverage
pytest --cov=my_project tests/

# Generate coverage report
pytest --cov=my_project --cov-report=html tests/
```
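To avoid retyping the flags, coverage options can live in the pytest configuration; a sketch in `pyproject.toml`, assuming pytest-cov is installed:

```toml
# pyproject.toml
[tool.pytest.ini_options]
addopts = "--cov=my_project --cov-report=term-missing"
testpaths = ["tests"]
```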
"""Factory for creating test catalogs."""
if data is None:
data = ["default", "data"]
return DataCatalog({
"input": MemoryDataset(data),
"output": MemoryDataset()
})
def test_with_factory():
catalog = create_test_catalog(["custom", "data"])
# Use catalog for testing# conftest.py
Fixtures shared across test modules belong in `conftest.py`:

```python
# conftest.py
import pytest

from kedro.io import DataCatalog, MemoryDataset


@pytest.fixture
def base_catalog():
    """Shared catalog fixture."""
    return DataCatalog({
        "raw": MemoryDataset(),
        "processed": MemoryDataset(),
    })
```

```python
# test_pipeline.py
def test_pipeline_a(base_catalog):
    """Test using shared fixture."""
    pass
```

```python
# test_nodes.py
def test_node_a(base_catalog):
    """Another test using same fixture."""
    pass
```