Describes: pkg:pypi/kedro@1.1.x

docs/
  api/
    datasets/
      abstract.md, cached.md, memory.md, shared-memory.md, versioned.md
    configuration.md, data-catalog-advanced.md, data-catalog.md, hooks.md,
    pipeline.md, runners-advanced.md, runners.md
  index.md
tile.json

tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

Agent Success: 98% (agent success rate when using this tile)

Improvement: 1.32x (success rate improvement over the baseline)

Baseline: 74% (agent success rate without this tile)

docs/api/datasets/shared-memory.md

Shared Memory API Reference

Datasets and catalog for multiprocessing support with ParallelRunner.

SharedMemoryDataset

class SharedMemoryDataset:
    """
    Wrapper around a MemoryDataset held in a SyncManager for multiprocessing.
    Enables data sharing across parallel processes.

    Note:
    This is typically used internally by ParallelRunner.
    Direct usage is rare.
    """

    def __init__(self, manager: SyncManager):
        """
        Initialize SharedMemoryDataset.

        Parameters:
        - manager: Multiprocessing SyncManager instance
        """

    def load(self) -> Any:
        """
        Load data from shared memory.

        Returns:
        Data stored in shared memory
        """

    def save(self, data: Any) -> None:
        """
        Save data to shared memory.

        Parameters:
        - data: Data to store in shared memory

        Note:
        Data must be picklable for multiprocessing
        """

SharedMemoryDataCatalog

class SharedMemoryDataCatalog(DataCatalog):
    """
    DataCatalog variant using shared memory for parallel processing.
    Inherits from DataCatalog with shared memory manager integration.
    """

    def set_manager_datasets(self, manager: SyncManager) -> None:
        """
        Associate multiprocessing manager with all shared memory datasets.

        Parameters:
        - manager: Multiprocessing manager instance

        Note:
        Sets the manager for all SharedMemoryDataset instances in catalog.
        Called automatically by ParallelRunner.
        """

    def validate_catalog(self) -> None:
        """
        Validate catalog for multiprocessing compatibility.

        Checks that all datasets are:
        - Picklable (can be serialized)
        - Compatible with multiprocessing

        Raises:
        DatasetError: If datasets are not serializable or incompatible

        Note:
        Datasets marked with _SINGLE_PROCESS = True will raise errors.
        """

Usage Examples

With ParallelRunner (Automatic)

from kedro.runner import ParallelRunner
from kedro.io import DataCatalog, MemoryDataset

# Create a normal catalog
catalog = DataCatalog({
    "input": MemoryDataset([1, 2, 3]),
    "output": MemoryDataset()
})

# `pipeline` is a kedro Pipeline built elsewhere (e.g. with
# kedro.pipeline.pipeline); its node functions must be picklable.
# ParallelRunner automatically uses SharedMemoryDataCatalog
runner = ParallelRunner()
runner.run(pipeline, catalog)  # Handles shared memory internally

Manual SharedMemoryDataCatalog

from multiprocessing.managers import SyncManager
from kedro.io import SharedMemoryDataCatalog, MemoryDataset

# Create and start a multiprocessing manager
manager = SyncManager()
manager.start()

# Create a shared memory catalog
catalog = SharedMemoryDataCatalog({
    "data": MemoryDataset([1, 2, 3])
})

# Associate the manager with all shared memory datasets
catalog.set_manager_datasets(manager)

# Validate multiprocessing compatibility
catalog.validate_catalog()

# Shut the manager down when finished
manager.shutdown()

Validation Example

from kedro.io import SharedMemoryDataCatalog, MemoryDataset, DatasetError

# This will raise an error if non-picklable datasets exist
catalog = SharedMemoryDataCatalog({
    "good_dataset": MemoryDataset([1, 2, 3]),  # Picklable ✓
    # "bad_dataset": DatabaseConnection()      # Not picklable ✗
})

try:
    catalog.validate_catalog()
    print("All datasets are compatible with ParallelRunner")
except DatasetError as e:
    print(f"Incompatible dataset: {e}")

Custom Dataset with Multiprocessing

from kedro.io import AbstractDataset

class MultiprocessingCompatibleDataset(AbstractDataset):
    """Dataset that works with ParallelRunner."""

    _SINGLE_PROCESS = False  # Set to False for multiprocessing support

    def __init__(self, filepath):
        self._filepath = filepath  # Store config, not connections

    def _load(self):
        # Recreate connection in each process
        conn = create_connection(self._filepath)
        return conn.read()

    def _save(self, data):
        # Recreate connection in each process
        conn = create_connection(self._filepath)
        conn.write(data)

Marking Datasets as Single-Process Only

class SingleProcessDataset(AbstractDataset):
    """Dataset that cannot be used with ParallelRunner."""

    _SINGLE_PROCESS = True  # Marks dataset as incompatible

    def __init__(self, connection):
        self._connection = connection  # Non-picklable connection

    def _load(self):
        return self._connection.read()

Multiprocessing Requirements

For datasets to work with ParallelRunner and SharedMemoryDataCatalog:

  1. Picklable: All dataset attributes must be serializable
  2. Stateless: Avoid storing connection objects or file handles
  3. Recreatable: Connections should be created in _load() and _save()

Good Pattern:

class GoodDataset(AbstractDataset):
    def __init__(self, config):
        self._config = config  # Store config (picklable)

    def _load(self):
        conn = create_connection(self._config)  # Create in each process
        return conn.read()

Bad Pattern:

class BadDataset(AbstractDataset):
    def __init__(self, connection):
        self._conn = connection  # Store connection (not picklable)

    def _load(self):
        return self._conn.read()  # Will fail in multiprocessing
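
A quick way to test the picklability requirement is to try serializing a dataset instance with the standard pickle module before running under ParallelRunner. The helper below is a minimal sketch, not part of Kedro; is_picklable is a hypothetical name.

import pickle


def is_picklable(obj) -> bool:
    """Return True if obj can be serialized for use across processes."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


# A config-only object pickles fine; an open file handle does not
print(is_picklable({"filepath": "data.csv"}))  # True
print(is_picklable(open(__file__)))            # False: file objects cannot be pickled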

See also:

  • ParallelRunner API - Parallel execution
  • AbstractDataset API - Creating custom datasets
  • Parallel Execution Guide - Usage patterns