or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/kedro@1.1.x

docs

index.md
tile.json

tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

Agent Success

Agent success rate when using this tile

98%

Improvement

Agent success rate improvement when using this tile compared to baseline

1.32x

Baseline

Agent success rate without this tile

74%

testing-pipelines.mddocs/guides/

Testing Pipelines Guide

Strategies and patterns for testing Kedro pipelines and nodes.

Testing Individual Functions

# src/my_project/pipelines/processing/nodes.py
def clean_data(raw_data):
    """Clean and preprocess raw data."""
    return [x.strip().lower() for x in raw_data]

# tests/pipelines/processing/test_nodes.py
def test_clean_data():
    """Test clean_data function."""
    raw_data = ["  Apple  ", "  Banana  ", "CHERRY"]
    result = clean_data(raw_data)

    assert result == ["apple", "banana", "cherry"]

Testing Nodes

from kedro.pipeline import node

# Create node
clean_node = node(clean_data, "raw", "clean", name="clean")

# Test node execution
def test_clean_node():
    """Test node execution."""
    inputs = {"raw": ["  Apple  ", "  Banana  "]}
    outputs = clean_node.run(inputs)

    assert outputs == {"clean": ["apple", "banana"]}

Testing Pipelines

Test Pipeline Structure

def test_pipeline_structure():
    """Test pipeline has expected nodes."""
    pipe = create_pipeline()

    # Check number of nodes
    assert len(pipe.nodes) == 3

    # Check node names
    node_names = [n.name for n in pipe.nodes]
    assert "clean" in node_names
    assert "transform" in node_names
    assert "analyze" in node_names

Test Pipeline Execution

from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner

def test_pipeline_execution():
    """Test full pipeline execution."""
    # Create pipeline
    pipe = create_pipeline()

    # Create catalog with test data
    catalog = DataCatalog({
        "raw_data": MemoryDataset(["  Apple  ", "  Banana  "]),
        "cleaned_data": MemoryDataset(),
        "results": MemoryDataset()
    })

    # Run pipeline
    runner = SequentialRunner()
    outputs = runner.run(pipe, catalog)

    # Verify outputs
    results = outputs["results"].load()
    assert results["count"] == 2

Using Pytest Fixtures

import pytest
from kedro.io import DataCatalog, MemoryDataset

@pytest.fixture
def sample_data():
    """Fixture providing sample data."""
    return ["  Apple  ", "  Banana  ", "CHERRY"]

@pytest.fixture
def test_catalog(sample_data):
    """Fixture providing test catalog."""
    return DataCatalog({
        "raw_data": MemoryDataset(sample_data),
        "cleaned_data": MemoryDataset(),
        "results": MemoryDataset()
    })

def test_with_fixtures(test_catalog):
    """Test using fixtures."""
    pipe = create_pipeline()
    runner = SequentialRunner()
    outputs = runner.run(pipe, test_catalog)

    assert "results" in outputs

Mocking Dependencies

from unittest.mock import Mock, patch

def test_with_mock_api():
    """Test node that calls external API."""
    # Mock the API call
    with patch('my_project.nodes.api_client') as mock_api:
        mock_api.fetch_data.return_value = {"data": [1, 2, 3]}

        # Test function
        result = fetch_and_process()

        assert result == [1, 2, 3]
        mock_api.fetch_data.assert_called_once()

Testing with Temporary Files

import pytest
from pathlib import Path

@pytest.fixture
def temp_data_dir(tmp_path):
    """Fixture providing temporary data directory."""
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    return data_dir

def test_file_processing(temp_data_dir):
    """Test processing files."""
    # Create test file
    test_file = temp_data_dir / "test.csv"
    test_file.write_text("col1,col2\n1,2\n3,4")

    # Process file
    result = process_csv(str(test_file))

    assert len(result) == 2

Integration Testing

from kedro.framework.session import KedroSession

def test_full_pipeline_integration(tmp_path):
    """Test complete pipeline in session."""
    with KedroSession.create(
        project_path=tmp_path,
        env="test"
    ) as session:
        # Run pipeline
        outputs = session.run(pipeline_name="data_processing")

        # Verify outputs
        assert "final_output" in outputs

Testing Custom Datasets

import pytest
from my_project.datasets import JSONDataset

@pytest.fixture
def json_file(tmp_path):
    """Fixture providing temporary JSON file."""
    file_path = tmp_path / "test.json"
    return str(file_path)

def test_json_dataset_save_load(json_file):
    """Test JSONDataset save and load."""
    dataset = JSONDataset(json_file)

    # Save data
    test_data = {"key": "value", "number": 42}
    dataset.save(test_data)

    # Load data
    loaded_data = dataset.load()

    assert loaded_data == test_data

def test_json_dataset_exists(json_file):
    """Test JSONDataset exists check."""
    dataset = JSONDataset(json_file)

    # Should not exist initially
    assert not dataset.exists()

    # Should exist after saving
    dataset.save({"test": "data"})
    assert dataset.exists()

Testing Hooks

from my_project.hooks import TimingHooks

def test_timing_hooks():
    """Test timing hooks."""
    hooks = TimingHooks()

    # Create mock node
    mock_node = Mock()
    mock_node.name = "test_node"

    # Call hooks
    hooks.before_node_run(node=mock_node)
    hooks.after_node_run(node=mock_node)

    # Verify timing was recorded
    assert mock_node.name in hooks.node_times

Best Practices

1. Test Functions Independently

# ✅ Good: Test function logic separately
def test_clean_data():
    result = clean_data(["  Test  "])
    assert result == ["test"]

# Then test in pipeline context
def test_clean_node():
    node_result = clean_node.run({"raw": ["  Test  "]})
    assert node_result == {"clean": ["test"]}

2. Use Meaningful Test Data

# ✅ Good: Realistic test data
test_data = [
    "  John Doe  ",
    "JANE SMITH",
    "bob jones"
]

# ❌ Less useful: Generic test data
test_data = ["a", "b", "c"]

3. Test Edge Cases

def test_empty_input():
    """Test handling of empty input."""
    result = clean_data([])
    assert result == []

def test_none_input():
    """Test handling of None input."""
    with pytest.raises(TypeError):
        clean_data(None)

4. Use Parametrized Tests

@pytest.mark.parametrize("input_data,expected", [
    (["  Test  "], ["test"]),
    (["UPPER"], ["upper"]),
    (["  Mixed Case  "], ["mixed case"]),
    ([], [])
])
def test_clean_data_parametrized(input_data, expected):
    """Test clean_data with various inputs."""
    result = clean_data(input_data)
    assert result == expected

5. Test Pipeline Filters

def test_pipeline_filtering():
    """Test pipeline filtering works correctly."""
    full_pipe = create_pipeline()

    # Test tag filtering
    preprocessing = full_pipe.only_nodes_with_tags("preprocessing")
    assert len(preprocessing.nodes) == 2

    # Test range filtering
    from_clean = full_pipe.from_nodes("clean")
    assert "clean" in [n.name for n in from_clean.nodes]

Coverage and Quality

# Run tests with coverage
pytest --cov=my_project tests/

# Generate coverage report
pytest --cov=my_project --cov-report=html tests/

Common Testing Patterns

Pattern: Test Factory

def create_test_catalog(data=None):
    """Factory for creating test catalogs."""
    if data is None:
        data = ["default", "data"]

    return DataCatalog({
        "input": MemoryDataset(data),
        "output": MemoryDataset()
    })

def test_with_factory():
    catalog = create_test_catalog(["custom", "data"])
    # Use catalog for testing

Pattern: Shared Fixtures

# conftest.py
import pytest

@pytest.fixture
def base_catalog():
    """Shared catalog fixture."""
    return DataCatalog({
        "raw": MemoryDataset(),
        "processed": MemoryDataset()
    })

# test_pipeline.py
def test_pipeline_a(base_catalog):
    """Test using shared fixture."""
    pass

# test_nodes.py
def test_node_a(base_catalog):
    """Another test using same fixture."""
    pass

See also:

  • Pipeline API Reference - Pipeline testing targets
  • DataCatalog API - Catalog for testing