# Data Catalog and Dataset Management

Kedro's data abstraction layer provides a consistent interface for working with diverse data sources through the `DataCatalog` and dataset implementations. It supports versioning, lazy loading, caching, and multiprocessing-safe operations.

## Capabilities

### Data Catalog

A central registry for all datasets in a Kedro project, providing consistent load/save operations and dataset lifecycle management.

```python { .api }
class DataCatalog:
    """Central registry for datasets with a consistent load/save interface."""

    def __init__(self, datasets=None, config_resolver=None, load_versions=None, save_version=None):
        """
        Initialize the data catalog.

        Args:
            datasets (dict, optional): Mapping of dataset names to AbstractDataset instances
            config_resolver (CatalogConfigResolver, optional): Config resolver for dynamic datasets
            load_versions (dict, optional): Mapping of dataset names to version strings for loading
            save_version (str, optional): Version string used when saving datasets
        """

    def load(self, name):
        """
        Load data from a dataset.

        Args:
            name (str): Dataset name

        Returns:
            Any: Loaded data

        Raises:
            DatasetNotFoundError: If the dataset is not found in the catalog
        """

    def save(self, name, data):
        """
        Save data to a dataset.

        Args:
            name (str): Dataset name
            data: Data to save

        Raises:
            DatasetNotFoundError: If the dataset is not found in the catalog
        """

    def keys(self):
        """
        List all dataset names in the catalog.

        Returns:
            list: List of dataset names
        """

    def exists(self, name):
        """
        Check whether a dataset exists in the catalog.

        Args:
            name (str): Dataset name

        Returns:
            bool: True if the dataset exists
        """

    def confirm(self, name):
        """
        Confirm that a dataset operation was successful.

        Args:
            name (str): Dataset name
        """

    def release(self, name):
        """
        Release resources held by a dataset.

        Args:
            name (str): Dataset name
        """

class SharedMemoryDataCatalog(DataCatalog):
    """Data catalog optimized for multiprocessing with shared memory."""
```

### Abstract Dataset

Base class for all dataset implementations, providing a consistent interface for loading and saving data.

```python { .api }
class AbstractDataset:
    """Abstract base class for all dataset implementations."""

    def load(self):
        """
        Load data from the dataset.

        Returns:
            Any: Loaded data

        Raises:
            DatasetError: If loading fails
        """

    def save(self, data):
        """
        Save data to the dataset.

        Args:
            data: Data to save

        Raises:
            DatasetError: If saving fails
        """

    def exists(self):
        """
        Check whether the dataset exists.

        Returns:
            bool: True if the dataset exists
        """

    def release(self):
        """Release resources held by the dataset."""

    def describe(self):
        """
        Describe the dataset configuration.

        Returns:
            dict: Dataset description
        """

class AbstractVersionedDataset(AbstractDataset):
    """Abstract base class for versioned datasets."""

    def _get_save_path(self):
        """Get the path for saving versioned data."""

    def _get_load_path(self):
        """Get the path for loading versioned data."""
```

### Dataset Implementations

Concrete dataset implementations for common storage scenarios.

```python { .api }
class MemoryDataset(AbstractDataset):
    """In-memory dataset for temporary data storage."""

    def __init__(self, data=None, copy_mode="copy"):
        """
        Initialize a memory dataset.

        Args:
            data: Initial data to store
            copy_mode (str): Copy mode - "copy", "deepcopy", or "assign"
        """

class CachedDataset(AbstractDataset):
    """Wrapper that adds caching to any dataset."""

    def __init__(self, dataset, cache_size=None, cache_ttl=None):
        """
        Initialize a cached dataset wrapper.

        Args:
            dataset (AbstractDataset): Dataset to wrap with caching
            cache_size (int, optional): Maximum cache size
            cache_ttl (int, optional): Cache time-to-live in seconds
        """

    def invalidate_cache(self):
        """Invalidate the dataset cache."""

class SharedMemoryDataset(AbstractDataset):
    """Dataset optimized for shared-memory access in multiprocessing."""

    def __init__(self, data=None, shared_memory_id=None):
        """
        Initialize a shared-memory dataset.

        Args:
            data: Initial data to store
            shared_memory_id (str, optional): Shared memory identifier
        """
```
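
The three `copy_mode` options differ in how loaded data is protected from downstream mutation: `"assign"` hands back the stored object itself, `"copy"` duplicates only the outer container, and `"deepcopy"` duplicates nested containers too. A minimal pure-Python sketch of these semantics (not Kedro's actual implementation):

```python
import copy

def copy_with_mode(data, copy_mode):
    # Mirror the three copy_mode behaviours described above.
    if copy_mode == "assign":
        return data                  # same object, no protection
    if copy_mode == "copy":
        return copy.copy(data)       # new outer container, shared contents
    if copy_mode == "deepcopy":
        return copy.deepcopy(data)   # fully independent structure
    raise ValueError(f"Unknown copy_mode: {copy_mode!r}")

nested = [[1, 2], [3, 4]]

assigned = copy_with_mode(nested, "assign")
shallow = copy_with_mode(nested, "copy")
deep = copy_with_mode(nested, "deepcopy")

assert assigned is nested        # same object
assert shallow is not nested     # new outer list...
assert shallow[0] is nested[0]   # ...but inner lists are shared
assert deep[0] is not nested[0]  # nothing shared
```

`"assign"` is the fastest but unsafe if a downstream node mutates the data; `"deepcopy"` is the safest but can be expensive for large objects.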

### Dataset Utilities

Utility classes for dataset management and configuration.

```python { .api }
class CatalogConfigResolver:
    """Resolves dataset configurations from catalog YAML files."""

    def list_datasets(self):
        """
        List all datasets defined in the configuration.

        Returns:
            list: Dataset names
        """

    def resolve_credentials(self, config):
        """
        Resolve credentials referenced in a dataset configuration.

        Args:
            config (dict): Dataset configuration

        Returns:
            dict: Configuration with resolved credentials
        """

    def match_catalog_entry(self, dataset_name, config):
        """
        Match a dataset name to a catalog entry.

        Args:
            dataset_name (str): Dataset name
            config (dict): Catalog configuration

        Returns:
            dict: Matched catalog entry
        """

class Version:
    """Represents the load/save versions of a versioned dataset."""

    def __init__(self, load, save):
        """
        Initialize a version object.

        Args:
            load (str): Load version identifier
            save (str): Save version identifier
        """

    @property
    def load(self):
        """Load version identifier."""

    @property
    def save(self):
        """Save version identifier."""
```
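
Credential resolution typically replaces a string-valued `credentials` key (naming an entry in a credentials file) with the actual secret values, so that secrets never appear in the catalog YAML itself. A hypothetical sketch of that substitution step (the names `catalog_config` and `credentials_store` are illustrative, not Kedro API):

```python
def resolve_credentials(config, credentials_store):
    """Replace a credentials *name* with the matching credentials *values*."""
    resolved = dict(config)  # leave the original configuration untouched
    cred_name = resolved.get("credentials")
    if isinstance(cred_name, str):
        resolved["credentials"] = credentials_store[cred_name]
    return resolved

# Catalog entry referencing credentials by name only
catalog_config = {
    "type": "pandas.CSVDataset",
    "filepath": "s3://bucket/data.csv",
    "credentials": "dev_s3",
}
# Secrets live in a separate store (e.g. credentials.yml)
credentials_store = {"dev_s3": {"key": "KEY", "secret": "SECRET"}}

resolved = resolve_credentials(catalog_config, credentials_store)
assert resolved["credentials"] == {"key": "KEY", "secret": "SECRET"}
assert catalog_config["credentials"] == "dev_s3"  # original unchanged
```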

### Protocols and Interfaces

Protocol definitions for catalog implementations and shared-memory operations.

```python { .api }
from typing import Protocol

class CatalogProtocol(Protocol):
    """Protocol defining the contract for catalog implementations."""

    def load(self, name: str): ...
    def save(self, name: str, data): ...
    def exists(self, name: str) -> bool: ...
    def list(self) -> list: ...

class SharedMemoryCatalogProtocol(Protocol):
    """Protocol for shared-memory catalog implementations."""

    def load(self, name: str): ...
    def save(self, name: str, data): ...
    def exists(self, name: str) -> bool: ...
```
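
Because these are `typing.Protocol` classes, any object with matching methods satisfies them structurally, with no inheritance required; decorating a protocol with `@runtime_checkable` also allows method-presence checks via `isinstance`. A self-contained sketch (the `DictCatalog` class is illustrative, not part of Kedro):

```python
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class CatalogProtocol(Protocol):
    def load(self, name: str) -> Any: ...
    def save(self, name: str, data: Any) -> None: ...
    def exists(self, name: str) -> bool: ...
    def list(self) -> list: ...

class DictCatalog:
    """Toy catalog backed by a plain dict - note: no inheritance."""

    def __init__(self):
        self._data = {}

    def load(self, name: str) -> Any:
        return self._data[name]

    def save(self, name: str, data: Any) -> None:
        self._data[name] = data

    def exists(self, name: str) -> bool:
        return name in self._data

    def list(self) -> list:
        return sorted(self._data)

catalog = DictCatalog()
catalog.save("raw", [1, 2, 3])
assert isinstance(catalog, CatalogProtocol)  # structural check (methods only)
assert catalog.load("raw") == [1, 2, 3]
```

Note that `isinstance` against a runtime-checkable protocol verifies only that the methods exist, not their signatures; static type checkers perform the full signature check.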

### Dataset Exceptions

Exception classes for dataset-related errors and error handling.

```python { .api }
class DatasetError(Exception):
    """Base exception for dataset-related errors."""

class DatasetNotFoundError(DatasetError):
    """Raised when a dataset is not found in the catalog."""

class DatasetAlreadyExistsError(DatasetError):
    """Raised when adding a dataset that already exists."""
```
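
A common pattern is to catch `DatasetNotFoundError` and fall back to a default rather than letting a missing dataset abort the run. A self-contained sketch using stand-in exception classes that mirror the hierarchy above:

```python
class DatasetError(Exception):
    """Base exception for dataset-related errors."""

class DatasetNotFoundError(DatasetError):
    """Raised when a dataset is not found in the catalog."""

def load_or_default(datasets, name, default=None):
    """Load a dataset by name, returning a default if it is missing."""
    try:
        if name not in datasets:
            # Translate the missing key into the domain-specific error
            raise DatasetNotFoundError(f"Dataset '{name}' not found in catalog")
        return datasets[name]
    except DatasetNotFoundError:
        return default

datasets = {"raw_data": [1, 2, 3]}
assert load_or_default(datasets, "raw_data") == [1, 2, 3]
assert load_or_default(datasets, "missing", default=[]) == []
```

Catching the base `DatasetError` instead would also cover load/save failures, since `DatasetNotFoundError` subclasses it.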

## Usage Examples

### Basic Catalog Usage

```python
from kedro.io import DataCatalog, MemoryDataset

# Create datasets
raw_data = MemoryDataset([1, 2, 3, 4, 5])
processed_data = MemoryDataset()

# Create the catalog
catalog = DataCatalog({
    "raw_data": raw_data,
    "processed_data": processed_data,
})

# Use the catalog
data = catalog.load("raw_data")
catalog.save("processed_data", [x * 2 for x in data])
results = catalog.load("processed_data")
print(results)  # [2, 4, 6, 8, 10]
```

### Working with Versioned Datasets

```python
from kedro.io import AbstractVersionedDataset, Version

class MyVersionedDataset(AbstractVersionedDataset):
    def __init__(self, filepath, version=None):
        super().__init__(filepath, version)
        self._filepath = filepath

    def _load(self):
        # Load from the resolved versioned path
        return self._load_from_path(self._get_load_path())

    def _save(self, data):
        # Save to the resolved versioned path
        self._save_to_path(data, self._get_save_path())

# Usage with an explicit load version
version = Version(load="2023-01-01T00:00:00.000Z", save=None)
dataset = MyVersionedDataset("data/my_data", version=version)
```
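
Kedro's versioned datasets conventionally store each version under `filepath/<version>/<basename>`, with version strings formatted as ISO-8601 timestamps so that they sort lexicographically. A simplified sketch of that path scheme (the real resolution happens inside `_get_load_path`/`_get_save_path`):

```python
from pathlib import PurePosixPath

def versioned_path(filepath, version):
    # data/my_data + 2023-01-01T00:00:00.000Z
    #   -> data/my_data/2023-01-01T00:00:00.000Z/my_data
    base = PurePosixPath(filepath)
    return base / version / base.name

path = versioned_path("data/my_data", "2023-01-01T00:00:00.000Z")
assert str(path) == "data/my_data/2023-01-01T00:00:00.000Z/my_data"

# Because ISO-8601 timestamps sort lexicographically, the latest
# version is simply the max() of the version directory names.
versions = ["2023-01-01T00:00:00.000Z", "2023-06-15T12:30:00.000Z"]
assert max(versions) == "2023-06-15T12:30:00.000Z"
```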

### Cached Dataset Wrapper

```python
from kedro.io import CachedDataset, MemoryDataset

# Wrap any dataset with caching
base_dataset = MemoryDataset([1, 2, 3])
cached_dataset = CachedDataset(
    dataset=base_dataset,
    cache_size=100,
    cache_ttl=3600,  # 1 hour TTL
)

# First load: data is read from the base dataset and cached
data1 = cached_dataset.load()

# Second load: data is served from the cache
data2 = cached_dataset.load()

# Manually invalidate the cache if needed
cached_dataset.invalidate_cache()
```
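
The wrapper pattern behind this kind of caching is straightforward: delegate the first `load()` to the inner dataset and serve subsequent calls from memory until the cache is invalidated. A minimal sketch (counting loads to make the effect visible; not Kedro's implementation):

```python
class CountingDataset:
    """Toy dataset that records how many times it is loaded."""

    def __init__(self, data):
        self._data = data
        self.load_count = 0

    def load(self):
        self.load_count += 1
        return self._data

class SimpleCachedDataset:
    """Cache-on-first-load wrapper around any dataset-like object."""

    _MISSING = object()  # sentinel so None can be a valid cached value

    def __init__(self, dataset):
        self._dataset = dataset
        self._cache = self._MISSING

    def load(self):
        if self._cache is self._MISSING:
            self._cache = self._dataset.load()  # first load hits the dataset
        return self._cache

    def invalidate_cache(self):
        self._cache = self._MISSING

inner = CountingDataset([1, 2, 3])
cached = SimpleCachedDataset(inner)
cached.load()
cached.load()
assert inner.load_count == 1  # second call served from the cache
cached.invalidate_cache()
cached.load()
assert inner.load_count == 2  # reloaded after invalidation
```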

### Custom Dataset Implementation

```python
import json
from pathlib import Path

from kedro.io import AbstractDataset, DataCatalog

class JSONDataset(AbstractDataset):
    """Custom dataset for JSON files."""

    def __init__(self, filepath):
        self._filepath = filepath

    def _load(self):
        with open(self._filepath, "r") as f:
            return json.load(f)

    def _save(self, data):
        with open(self._filepath, "w") as f:
            json.dump(data, f, indent=2)

    def _exists(self):
        return Path(self._filepath).exists()

    def _describe(self):
        return {"filepath": self._filepath, "type": "JSONDataset"}

# Usage
json_dataset = JSONDataset("data/config.json")
catalog = DataCatalog({"config": json_dataset})
```

## Types

```python { .api }
from typing import Any, Dict, List

from kedro.io import AbstractDataset

DatasetName = str
DatasetDict = Dict[str, AbstractDataset]
DataFeedDict = Dict[str, Any]
LayerConfig = Dict[str, List[str]]
CopyMode = str  # "copy", "deepcopy", or "assign"
VersionId = str
```