```
tessl install tessl/pypi-kedro@1.1.0
```

Kedro helps you build production-ready data and analytics pipelines.
- Agent Success: 98% (agent success rate when using this tile)
- Improvement: 1.32x (success-rate improvement when using this tile compared to the baseline)
- Baseline: 74% (agent success rate without this tile)
Datasets and catalog for multiprocessing support with ParallelRunner.
```python
class SharedMemoryDataset:
    """
    Wrapper that shares a MemoryDataset through a SyncManager for
    multiprocessing. Enables data sharing across parallel processes.

    Note:
        This is typically used internally by ParallelRunner.
        Direct usage is rare.
    """

    def __init__(self, manager: SyncManager):
        """
        Initialize SharedMemoryDataset.

        Parameters:
        - manager: Multiprocessing SyncManager instance
        """

    def load(self) -> Any:
        """
        Load data from shared memory.

        Returns:
            Data stored in shared memory
        """

    def save(self, data: Any) -> None:
        """
        Save data to shared memory.

        Parameters:
        - data: Data to store in shared memory

        Note:
            Data must be picklable for multiprocessing
        """
```
"""
DataCatalog variant using shared memory for parallel processing.
Inherits from DataCatalog with shared memory manager integration.
"""
def set_manager_datasets(self, manager: SyncManager) -> None:
"""
Associate multiprocessing manager with all shared memory datasets.
Parameters:
- manager: Multiprocessing manager instance
Note:
Sets the manager for all SharedMemoryDataset instances in catalog.
Called automatically by ParallelRunner.
"""
def validate_catalog(self) -> None:
"""
Validate catalog for multiprocessing compatibility.
Checks that all datasets are:
- Picklable (can be serialized)
- Compatible with multiprocessing
Raises:
DatasetError: If datasets are not serializable or incompatible
Note:
Datasets marked with _SINGLE_PROCESS = True will raise errors.
"""from kedro.runner import ParallelRunner
from kedro.io import DataCatalog, MemoryDataset

# Create a normal catalog
catalog = DataCatalog({
    "input": MemoryDataset([1, 2, 3]),
    "output": MemoryDataset(),
})

# ParallelRunner automatically swaps in a SharedMemoryDataCatalog
runner = ParallelRunner()
runner.run(pipeline, catalog)  # "pipeline" is your Pipeline object; shared memory is handled internally
```
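The pipeline object in the snippet above is assumed to already exist. For a fully self-contained run, a minimal sketch (the node function and pipeline here are illustrative, not part of the documented API):

```python
from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline
from kedro.runner import ParallelRunner

def double(values):
    return [v * 2 for v in values]

if __name__ == "__main__":  # guard required for multiprocessing on spawn platforms
    catalog = DataCatalog({
        "input": MemoryDataset([1, 2, 3]),
        "output": MemoryDataset(),
    })
    demo = pipeline([node(double, inputs="input", outputs="output")])
    ParallelRunner().run(demo, catalog)
```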
To wire things up manually, create the manager yourself:

```python
from multiprocessing.managers import SyncManager
from kedro.io import SharedMemoryDataCatalog, MemoryDataset

# Create and start the manager
manager = SyncManager()
manager.start()

# Create a shared-memory catalog
catalog = SharedMemoryDataCatalog({
    "data": MemoryDataset([1, 2, 3]),
})

# Associate the manager with all shared datasets
catalog.set_manager_datasets(manager)

# Validate multiprocessing compatibility
catalog.validate_catalog()
```
Validation fails fast when the catalog contains datasets that cannot be shared:

```python
from kedro.io import DatasetError, MemoryDataset, SharedMemoryDataCatalog

# Raises an error if non-picklable datasets exist
catalog = SharedMemoryDataCatalog({
    "good_dataset": MemoryDataset([1, 2, 3]),  # picklable ✓
    # "bad_dataset": DatabaseConnection()      # not picklable ✗
})

try:
    catalog.validate_catalog()
    print("All datasets are compatible with ParallelRunner")
except DatasetError as e:
    print(f"Incompatible dataset: {e}")
```
Custom datasets opt in to multiprocessing by storing only picklable configuration and recreating connections per process:

```python
from kedro.io import AbstractDataset

class MultiprocessingCompatibleDataset(AbstractDataset):
    """Dataset that works with ParallelRunner."""

    _SINGLE_PROCESS = False  # False means multiprocessing is supported

    def __init__(self, filepath):
        self._filepath = filepath  # store config, not connections

    def _load(self):
        # Recreate the connection in each process
        conn = create_connection(self._filepath)
        return conn.read()

    def _save(self, data):
        # Recreate the connection in each process
        conn = create_connection(self._filepath)
        conn.write(data)

    def _describe(self):
        # Required when subclassing AbstractDataset
        return {"filepath": self._filepath}
```
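To check such a dataset before a parallel run, it can be registered in a SharedMemoryDataCatalog and validated; the file path and the create_connection helper above are placeholders:

```python
from kedro.io import SharedMemoryDataCatalog

catalog = SharedMemoryDataCatalog({
    "records": MultiprocessingCompatibleDataset("data/records.csv"),
})
catalog.validate_catalog()  # passes: the dataset stores only picklable config
```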
"""Dataset that cannot be used with ParallelRunner."""
_SINGLE_PROCESS = True # Marks dataset as incompatible
def __init__(self, connection):
self._connection = connection # Non-picklable connection
def _load(self):
return self._connection.read()For datasets to work with ParallelRunner and SharedMemoryDataCatalog:
- Store picklable configuration (paths, connection strings), not live connections.
- Recreate connections inside _load() and _save() so each process opens its own.

Good Pattern:
```python
class GoodDataset(AbstractDataset):
    def __init__(self, config):
        self._config = config  # store config (picklable)

    def _load(self):
        conn = create_connection(self._config)  # create in each process
        return conn.read()
```

Bad Pattern:
```python
class BadDataset(AbstractDataset):
    def __init__(self, connection):
        self._conn = connection  # store connection (not picklable)

    def _load(self):
        return self._conn.read()  # will fail in multiprocessing
```
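Why the bad pattern breaks: anything a dataset stores must survive pickling when it crosses a process boundary, and live handles generally do not. A quick illustration with a plain file handle standing in for a connection:

```python
import pickle

handle = open("example.txt", "w")  # stands in for any live connection
try:
    pickle.dumps(handle)
except TypeError as err:
    print(f"Not picklable: {err}")  # cannot pickle '_io.TextIOWrapper' object
```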