or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-project.mdconfiguration.mdcontext-session.mddata-catalog.mdhooks.mdindex.mdipython-integration.mdpipeline-construction.mdpipeline-execution.md

data-catalog.mddocs/

# Data Catalog and Dataset Management

Kedro's data abstraction layer provides consistent interfaces for working with various data sources through the DataCatalog and dataset implementations. It supports versioning, lazy loading, caching, and multiprocessing-safe operations.

## Capabilities

### Data Catalog

Central registry for managing all datasets in a Kedro project with consistent load/save operations and dataset lifecycle management.

```python { .api }
class DataCatalog:
    """Central registry for datasets with consistent load/save interface."""

    def __init__(self, datasets=None, config_resolver=None, load_versions=None, save_version=None):
        """
        Initialize data catalog.

        Args:
            datasets (dict, optional): Dictionary of dataset name to AbstractDataset mappings
            config_resolver (CatalogConfigResolver, optional): Config resolver for dynamic datasets
            load_versions (dict, optional): Dictionary of dataset names to version strings for loading
            save_version (str, optional): Version string for saving datasets
        """

    def load(self, name):
        """
        Load data from dataset.

        Args:
            name (str): Dataset name

        Returns:
            Any: Loaded data

        Raises:
            DatasetNotFoundError: If dataset not found in catalog
        """

    def save(self, name, data):
        """
        Save data to dataset.

        Args:
            name (str): Dataset name
            data: Data to save

        Raises:
            DatasetNotFoundError: If dataset not found in catalog
        """

    def keys(self):
        """
        List all dataset names in catalog.

        Returns:
            list: List of dataset names
        """

    def exists(self, name):
        """
        Check if dataset exists in catalog.

        Args:
            name (str): Dataset name

        Returns:
            bool: True if dataset exists
        """

    def confirm(self, name):
        """
        Confirm dataset operation was successful.

        Args:
            name (str): Dataset name
        """

    def release(self, name):
        """
        Release dataset resources.

        Args:
            name (str): Dataset name
        """

class SharedMemoryDataCatalog(DataCatalog):
    """Data catalog optimized for multiprocessing with shared memory."""
```

### Abstract Dataset

Base class for all dataset implementations providing consistent interface for data loading and saving.

```python { .api }
class AbstractDataset:
    """Abstract base class for all dataset implementations."""

    def load(self):
        """
        Load data from dataset.

        Returns:
            Any: Loaded data

        Raises:
            DatasetError: If loading fails
        """

    def save(self, data):
        """
        Save data to dataset.

        Args:
            data: Data to save

        Raises:
            DatasetError: If saving fails
        """

    def exists(self):
        """
        Check if dataset exists.

        Returns:
            bool: True if dataset exists
        """

    def release(self):
        """
        Release dataset resources.
        """

    def describe(self):
        """
        Describe dataset configuration.

        Returns:
            dict: Dataset description
        """

class AbstractVersionedDataset(AbstractDataset):
    """Abstract base class for versioned datasets."""

    def _get_save_path(self):
        """Get path for saving versioned data."""

    def _get_load_path(self):
        """Get path for loading versioned data."""
```

### Dataset Implementations

Concrete dataset implementations for various data storage scenarios.

```python { .api }
class MemoryDataset(AbstractDataset):
    """In-memory dataset for temporary data storage."""

    def __init__(self, data=None, copy_mode="copy"):
        """
        Initialize memory dataset.

        Args:
            data: Initial data to store
            copy_mode (str): Copy mode - "copy", "deepcopy", or "assign"
        """

class CachedDataset(AbstractDataset):
    """Wrapper to add caching functionality to any dataset."""

    def __init__(self, dataset, cache_size=None, cache_ttl=None):
        """
        Initialize cached dataset wrapper.

        Args:
            dataset (AbstractDataset): Dataset to wrap with caching
            cache_size (int, optional): Maximum cache size
            cache_ttl (int, optional): Cache time-to-live in seconds
        """

    def invalidate_cache(self):
        """Invalidate dataset cache."""

class SharedMemoryDataset(AbstractDataset):
    """Dataset optimized for shared memory access in multiprocessing."""

    def __init__(self, data=None, shared_memory_id=None):
        """
        Initialize shared memory dataset.

        Args:
            data: Initial data to store
            shared_memory_id (str, optional): Shared memory identifier
        """
```

### Dataset Utilities

Utility classes and functions for dataset management and configuration.

```python { .api }
class CatalogConfigResolver:
    """Resolves dataset configurations from catalog YAML files."""

    def list_datasets(self):
        """
        List all datasets from configuration.

        Returns:
            list: Dataset names
        """

    def resolve_credentials(self, config):
        """
        Resolve credentials in dataset configuration.

        Args:
            config (dict): Dataset configuration

        Returns:
            dict: Configuration with resolved credentials
        """

    def match_catalog_entry(self, dataset_name, config):
        """
        Match dataset name to catalog entry.

        Args:
            dataset_name (str): Dataset name
            config (dict): Catalog configuration

        Returns:
            dict: Matched catalog entry
        """

class Version:
    """Represents dataset version for versioned datasets."""

    def __init__(self, load, save):
        """
        Initialize version object.

        Args:
            load (str): Load version identifier
            save (str): Save version identifier
        """

    @property
    def load(self):
        """Load version identifier."""

    @property
    def save(self):
        """Save version identifier."""
```

### Protocols and Interfaces

Protocol definitions for catalog implementations and shared memory operations.

```python { .api }
from typing import Protocol

class CatalogProtocol(Protocol):
    """Protocol defining the contract for catalog implementations."""

    def load(self, name: str): ...
    def save(self, name: str, data): ...
    def exists(self, name: str) -> bool: ...
    def list(self) -> list: ...

class SharedMemoryCatalogProtocol(Protocol):
    """Protocol for shared memory catalog implementations."""

    def load(self, name: str): ...
    def save(self, name: str, data): ...
    def exists(self, name: str) -> bool: ...
```

### Dataset Exceptions

Exception classes for dataset-related errors and error handling.

```python { .api }
class DatasetError(Exception):
    """Base exception for dataset-related errors."""

class DatasetNotFoundError(DatasetError):
    """Raised when a dataset is not found in the catalog."""

class DatasetAlreadyExistsError(DatasetError):
    """Raised when trying to add a dataset that already exists."""
```

## Usage Examples

### Basic Catalog Usage

```python
from kedro.io import DataCatalog, MemoryDataset

# Create datasets
raw_data = MemoryDataset([1, 2, 3, 4, 5])
processed_data = MemoryDataset()

# Create catalog
catalog = DataCatalog({
    "raw_data": raw_data,
    "processed_data": processed_data
})

# Use catalog
data = catalog.load("raw_data")
catalog.save("processed_data", [x * 2 for x in data])
results = catalog.load("processed_data")
print(results)  # [2, 4, 6, 8, 10]
```

### Working with Versioned Datasets

```python
from kedro.io import AbstractVersionedDataset, Version

class MyVersionedDataset(AbstractVersionedDataset):
    def __init__(self, filepath, version=None):
        super().__init__(filepath, version)
        self._filepath = filepath

    def _load(self):
        # Load from versioned path
        return self._load_from_path(self._get_load_path())

    def _save(self, data):
        # Save to versioned path
        self._save_to_path(data, self._get_save_path())

# Usage with explicit version
version = Version(load="2023-01-01T00:00:00.000Z", save=None)
dataset = MyVersionedDataset("data/my_data", version=version)
```

### Cached Dataset Wrapper

```python
from kedro.io import CachedDataset, MemoryDataset

# Wrap any dataset with caching
base_dataset = MemoryDataset([1, 2, 3])
cached_dataset = CachedDataset(
    dataset=base_dataset,
    cache_size=100,
    cache_ttl=3600  # 1 hour TTL
)

# First load - data loaded from base dataset and cached
data1 = cached_dataset.load()

# Second load - data loaded from cache
data2 = cached_dataset.load()

# Manually invalidate cache if needed
cached_dataset.invalidate_cache()
```

### Custom Dataset Implementation

```python
from kedro.io import AbstractDataset
from pathlib import Path
import json

class JSONDataset(AbstractDataset):
    """Custom dataset for JSON files."""

    def __init__(self, filepath):
        self._filepath = filepath

    def _load(self):
        with open(self._filepath, 'r') as f:
            return json.load(f)

    def _save(self, data):
        with open(self._filepath, 'w') as f:
            json.dump(data, f, indent=2)

    def _exists(self):
        return Path(self._filepath).exists()

    def _describe(self):
        return {"filepath": self._filepath, "type": "JSONDataset"}

# Usage
json_dataset = JSONDataset("data/config.json")
catalog = DataCatalog({"config": json_dataset})
```

## Types

```python { .api }
from typing import Dict, Any, List, Optional, Union, Protocol

DatasetName = str
DatasetDict = Dict[str, AbstractDataset]
DataFeedDict = Dict[str, Any]
LayerConfig = Dict[str, List[str]]
CopyMode = str  # "copy", "deepcopy", or "assign"
VersionId = str
```