# Context and Session Management

Kedro's project context and session management provide the foundation for project lifecycle, configuration access, and execution environment setup. The context encapsulates project settings while sessions manage execution state.

## Capabilities

### Kedro Context

Main context class providing project configuration, catalog access, and pipeline management.

```python { .api }
class KedroContext:
    """Main context class for Kedro project management."""

    def __init__(self, package_name, project_path=None, env=None, extra_params=None):
        """
        Initialize Kedro context.

        Args:
            package_name (str): Name of the Python package
            project_path (str, optional): Path to project root
            env (str, optional): Environment name for configuration
            extra_params (dict, optional): Additional parameters
        """

    def run(self, pipeline_name=None, tags=None, runner=None, node_names=None, from_nodes=None, to_nodes=None, from_inputs=None, to_outputs=None, load_versions=None, **kwargs):
        """
        Run pipeline with specified parameters.

        Args:
            pipeline_name (str, optional): Name of pipeline to run
            tags (str or list, optional): Tags to filter nodes
            runner (AbstractRunner, optional): Runner instance to use
            node_names (str or list, optional): Specific nodes to run
            from_nodes (str or list, optional): Run from specified nodes
            to_nodes (str or list, optional): Run to specified nodes
            from_inputs (str or list, optional): Run from specified inputs
            to_outputs (str or list, optional): Run to specified outputs
            load_versions (dict, optional): Dataset versions to load
            **kwargs: Additional runner arguments

        Returns:
            dict: Execution results
        """

    def validate_settings(self):
        """
        Validate project settings and configuration.

        Raises:
            KedroContextError: If validation fails
        """

    @property
    def catalog(self):
        """
        Data catalog instance.

        Returns:
            DataCatalog: Project data catalog
        """

    @property
    def config_loader(self):
        """
        Configuration loader instance.

        Returns:
            AbstractConfigLoader: Configuration loader
        """

    @property
    def pipelines(self):
        """
        Project pipelines registry.

        Returns:
            dict: Dictionary of pipeline name to Pipeline mappings
        """

    @property
    def project_path(self):
        """
        Project root path.

        Returns:
            Path: Project root directory path
        """

class CatalogCommandsMixin:
    """Mixin providing catalog-related command functionality."""

    def catalog_list(self, env=None):
        """List all datasets in catalog."""

    def catalog_describe(self, env=None):
        """Describe catalog configuration."""

    def catalog_create(self, env=None):
        """Create missing datasets in catalog."""
```

### Kedro Session

Session management for project lifecycle and execution environment.

```python { .api }
class KedroSession:
    """Manages the lifecycle of a Kedro project run."""

    @classmethod
    def create(cls, project_path=None, save_on_close=True, env=None, extra_params=None, conf_source=None):
        """
        Create new Kedro session.

        Args:
            project_path (str, optional): Path to Kedro project
            save_on_close (bool): Whether to save session on close
            env (str, optional): Environment name
            extra_params (dict, optional): Additional parameters
            conf_source (str, optional): Configuration source path

        Returns:
            KedroSession: Session instance
        """

    def load_context(self):
        """
        Load project context within session.

        Returns:
            KedroContext: Project context instance
        """

    def run(self, pipeline_name=None, tags=None, runner=None, node_names=None, from_nodes=None, to_nodes=None, from_inputs=None, to_outputs=None, load_versions=None, **kwargs):
        """
        Run pipeline within session.

        Args:
            pipeline_name (str, optional): Pipeline name to run
            tags (str or list, optional): Node tags to filter
            runner (AbstractRunner, optional): Runner to use
            node_names (str or list, optional): Specific nodes to run
            from_nodes (str or list, optional): Start from specified nodes
            to_nodes (str or list, optional): End at specified nodes
            from_inputs (str or list, optional): Start from specified inputs
            to_outputs (str or list, optional): End at specified outputs
            load_versions (dict, optional): Dataset versions to load
            **kwargs: Additional arguments

        Returns:
            dict: Execution results
        """

    def close(self):
        """
        Close session and clean up resources.
        """

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with cleanup."""
        self.close()

    @property
    def session_id(self):
        """Session identifier."""

    @property
    def store(self):
        """Session store for persisting session data."""
```

### Context Utilities

Utility functions for context composition and management.

```python { .api }
def compose_classes(*base_classes):
    """
    Dynamically compose multiple base classes into a new class.

    Args:
        *base_classes: Base classes to compose

    Returns:
        type: Composed class type
    """
```

### Context Exceptions

Exception classes for context-related errors.

```python { .api }
class KedroContextError(Exception):
    """Raised for context-related errors."""
```

## Usage Examples

### Basic Context and Session Usage

```python
from kedro.framework.session import KedroSession
from kedro.framework.context import KedroContext

# Create session and load context
with KedroSession.create(project_path="/path/to/project") as session:
    context = session.load_context()

    # Access catalog
    catalog = context.catalog
    print(f"Available datasets: {catalog.keys()}")

    # Access pipelines
    pipelines = context.pipelines
    print(f"Available pipelines: {list(pipelines.keys())}")

    # Run pipeline
    result = session.run(pipeline_name="data_processing")
```

### Environment-Specific Context

```python
from kedro.framework.session import KedroSession

# Create session for specific environment
with KedroSession.create(
    project_path="/path/to/project",
    env="production"
) as session:
    context = session.load_context()

    # Configuration automatically loaded for production environment
    config = context.config_loader.get("database")

    # Run with production settings
    result = session.run(
        pipeline_name="ml_pipeline",
        tags=["training"]
    )
```

### Custom Context Implementation

```python
from kedro.framework.context import KedroContext
from kedro.io import DataCatalog
from kedro.config import OmegaConfigLoader

class CustomKedroContext(KedroContext):
    """Custom context with additional functionality."""

    def __init__(self, package_name, project_path=None, env=None, extra_params=None):
        super().__init__(package_name, project_path, env, extra_params)
        self._custom_config = self._load_custom_config()

    def _load_custom_config(self):
        """Load additional custom configuration."""
        return {"custom_setting": "value"}

    @property
    def custom_config(self):
        """Access custom configuration."""
        return self._custom_config

    def validate_settings(self):
        """Custom validation logic."""
        super().validate_settings()
        # Add custom validation
        if not self._custom_config:
            raise ValueError("Custom configuration is required")

# Use custom context
from kedro.framework.session import KedroSession

class CustomSession(KedroSession):
    def load_context(self):
        # Return custom context instead of default
        return CustomKedroContext(
            package_name=self._package_name,
            project_path=self._project_path,
            env=self._env,
            extra_params=self._extra_params
        )

# Usage
with CustomSession.create(project_path="/path/to/project") as session:
    context = session.load_context()
    custom_setting = context.custom_config["custom_setting"]
```

### Session with Custom Parameters

```python
from kedro.framework.session import KedroSession
from kedro.runner import ThreadRunner

# Create session with runtime parameters
extra_params = {
    "model_type": "random_forest",
    "n_estimators": 100,
    "max_depth": 10
}

with KedroSession.create(
    project_path="/path/to/project",
    env="local",
    extra_params=extra_params
) as session:
    # Parameters available in pipeline as "parameters:model_type", etc.
    result = session.run(
        pipeline_name="training_pipeline",
        runner=ThreadRunner()
    )
```

### Context Composition

```python
from kedro.framework.context import KedroContext, CatalogCommandsMixin, compose_classes

# Custom mixins
class MetricsMixin:
    """Mixin for metrics collection."""

    def collect_metrics(self):
        """Collect execution metrics."""
        return {"nodes_executed": len(self.pipelines)}

class ValidationMixin:
    """Mixin for data validation."""

    def validate_data(self, dataset_name):
        """Validate dataset."""
        data = self.catalog.load(dataset_name)
        return len(data) > 0

# Compose context with multiple mixins
EnhancedContext = compose_classes(
    KedroContext,
    CatalogCommandsMixin,
    MetricsMixin,
    ValidationMixin
)

# Create session with enhanced context
class EnhancedSession(KedroSession):
    def load_context(self):
        return EnhancedContext(
            package_name=self._package_name,
            project_path=self._project_path,
            env=self._env
        )

with EnhancedSession.create(project_path="/path/to/project") as session:
    context = session.load_context()

    # Use catalog commands
    datasets = context.catalog_list()

    # Use metrics collection
    metrics = context.collect_metrics()

    # Use data validation
    is_valid = context.validate_data("input_data")
```

### Programmatic Pipeline Execution

```python
from kedro.framework.session import KedroSession
from kedro.runner import ParallelRunner

def run_pipeline_programmatically(project_path, pipeline_name, env="local"):
    """Run pipeline programmatically with custom configuration."""

    with KedroSession.create(
        project_path=project_path,
        env=env,
        save_on_close=True
    ) as session:
        context = session.load_context()

        # Get pipeline
        pipeline = context.pipelines[pipeline_name]

        # Filter pipeline if needed
        filtered_pipeline = pipeline.filter(tags=["preprocessing"])

        # Run with specific runner
        runner = ParallelRunner(max_workers=2)
        result = session.run(
            pipeline_name=None,  # Use pipeline object directly
            runner=runner
        )

        return result

# Usage
result = run_pipeline_programmatically(
    project_path="/path/to/project",
    pipeline_name="data_pipeline",
    env="production"
)
```

### Session Store Usage

```python
from datetime import datetime

from kedro.framework.session import KedroSession

# Create session with custom store settings
with KedroSession.create(
    project_path="/path/to/project",
    save_on_close=True
) as session:
    # Access session store
    store = session.store

    # Store custom session data
    store["execution_start"] = datetime.now()
    store["user_id"] = "data_scientist_123"

    # Run pipeline
    result = session.run("processing_pipeline")

    # Store execution results
    store["execution_result"] = result
    store["execution_end"] = datetime.now()

# Session data automatically saved on close
```

## Types

```python { .api }
from typing import Dict, Any, Optional, List, Union
from pathlib import Path

ProjectPath = Union[str, Path]
EnvironmentName = Optional[str]
PipelineName = Optional[str]
NodeTags = Union[str, List[str]]
NodeNames = Union[str, List[str]]
ExtraParams = Dict[str, Any]
LoadVersions = Dict[str, str]
RunResult = Dict[str, Any]
SessionId = str
```