0
# Configuration Management
1
2
This section covers system configuration, variables, parameters, and connection management for workflow orchestration. Airflow provides multiple ways to manage configuration, secrets, and runtime parameters.
3
4
## Capabilities
5
6
### Variables
7
8
Store and retrieve configuration values that can be accessed across DAGs and tasks.
9
10
```python { .api }
11
class Variable:
12
@classmethod
13
def get(
14
cls,
15
key: str,
16
default_var: Any = None,
17
deserialize_json: bool = False
18
) -> Any:
19
"""
20
Get a variable value.
21
22
Args:
23
key: Variable key
24
default_var: Default value if key not found
25
deserialize_json: Whether to deserialize JSON values
26
27
Returns:
28
Variable value
29
"""
30
31
@classmethod
32
def set(
33
cls,
34
key: str,
35
value: Any,
36
description: Optional[str] = None,
37
serialize_json: bool = False
38
) -> None:
39
"""
40
Set a variable value.
41
42
Args:
43
key: Variable key
44
value: Variable value
45
description: Variable description
46
serialize_json: Whether to serialize value as JSON
47
"""
48
49
@classmethod
50
def delete(cls, key: str) -> None:
51
"""
52
Delete a variable.
53
54
Args:
55
key: Variable key to delete
56
"""
57
58
@classmethod
59
def setdefault(cls, key: str, default: Any) -> Any:
60
"""
61
Set variable if it doesn't exist, otherwise return existing value.
62
63
Args:
64
key: Variable key
65
default: Default value to set
66
67
Returns:
68
Variable value
69
"""
70
```
71
72
Usage example:
73
74
```python
75
from datetime import datetime

from airflow.models import Variable
76
from airflow.decorators import dag, task
77
78
@dag(dag_id='variable_example', start_date=datetime(2024, 1, 1))
79
def variable_example():
80
@task
81
def use_variables():
82
# Get variables
83
api_key = Variable.get("api_key")
84
config = Variable.get("app_config", deserialize_json=True)
85
86
# Set variables
87
Variable.set("last_run", datetime.now().isoformat())
88
89
return {"api_key_length": len(api_key), "config": config}
90
91
use_variables()
92
93
dag_instance = variable_example()
94
```
95
96
### Parameters
97
98
Define typed DAG parameters, each with a default value and an optional JSON schema for validation, whose values can be set at runtime.
99
100
```python { .api }
101
class Param:
102
def __init__(
103
self,
104
default: Any = None,
105
description: Optional[str] = None,
106
schema: Optional[Dict[str, Any]] = None,
107
**kwargs
108
):
109
"""
110
Define a DAG parameter.
111
112
Args:
113
default: Default parameter value
114
description: Parameter description
115
schema: JSON schema for validation
116
**kwargs: Additional parameter options
117
"""
118
119
def resolve(self, value: Any = None) -> Any:
120
"""
121
Resolve parameter value with validation.
122
123
Args:
124
value: Input value to resolve
125
126
Returns:
127
Resolved and validated value
128
"""
129
```
130
131
Usage example:
132
133
```python
134
from datetime import datetime

from airflow.models import Param
135
from airflow.decorators import dag, task
136
137
@dag(
138
dag_id='parameterized_dag',
139
start_date=datetime(2024, 1, 1),
140
params={
141
'environment': Param(
142
default='dev',
143
description='Environment to deploy to',
144
schema={'type': 'string', 'enum': ['dev', 'staging', 'prod']}
145
),
146
'batch_size': Param(
147
default=100,
148
description='Number of records to process',
149
schema={'type': 'integer', 'minimum': 1, 'maximum': 1000}
150
),
151
'debug_mode': Param(
152
default=False,
153
description='Enable debug logging',
154
schema={'type': 'boolean'}
155
)
156
}
157
)
158
def parameterized_dag():
159
@task
160
def process_with_params(**context):
161
params = context['params']
162
environment = params['environment']
163
batch_size = params['batch_size']
164
debug_mode = params['debug_mode']
165
166
return f"Processing {batch_size} records in {environment} with debug={debug_mode}"
167
168
process_with_params()
169
170
dag_instance = parameterized_dag()
171
```
172
173
### Connections
174
175
Manage external system connections with credentials and configuration.
176
177
```python { .api }
178
class Connection:
179
def __init__(
180
self,
181
conn_id: str,
182
conn_type: str,
183
description: Optional[str] = None,
184
host: Optional[str] = None,
185
login: Optional[str] = None,
186
password: Optional[str] = None,
187
schema: Optional[str] = None,
188
port: Optional[int] = None,
189
extra: Optional[Union[str, Dict]] = None,
190
uri: Optional[str] = None
191
):
192
"""
193
Create a connection configuration.
194
195
Args:
196
conn_id: Unique connection identifier
197
conn_type: Connection type (postgres, mysql, http, etc.)
198
description: Connection description
199
host: Host address
200
login: Username
201
password: Password
202
schema: Database schema
203
port: Port number
204
extra: Additional connection parameters (JSON string or dict)
205
uri: Complete connection URI
206
"""
207
208
@property
209
def conn_id(self) -> str:
210
"""Connection ID."""
211
212
@property
213
def conn_type(self) -> str:
214
"""Connection type."""
215
216
def get_uri(self) -> str:
217
"""Get complete connection URI."""
218
219
def get_password(self) -> Optional[str]:
220
"""Get connection password."""
221
222
def get_extra(self) -> Dict[str, Any]:
223
"""Get extra connection parameters as dictionary."""
224
225
# Connection retrieval
226
def get_connection(conn_id: str) -> Connection:
227
"""
228
Get connection by ID.
229
230
Args:
231
conn_id: Connection identifier
232
233
Returns:
234
Connection instance
235
"""
236
```
237
238
### Configuration System
239
240
Access and manage Airflow configuration settings.
241
242
```python { .api }
243
from airflow import configuration
244
245
class AirflowConfigParser:
246
def get(
247
self,
248
section: str,
249
key: str,
250
fallback: Any = None
251
) -> str:
252
"""
253
Get configuration value.
254
255
Args:
256
section: Configuration section
257
key: Configuration key
258
fallback: Fallback value
259
260
Returns:
261
Configuration value
262
"""
263
264
def getboolean(
265
self,
266
section: str,
267
key: str,
268
fallback: bool = False
269
) -> bool:
270
"""Get boolean configuration value."""
271
272
def getint(
273
self,
274
section: str,
275
key: str,
276
fallback: int = 0
277
) -> int:
278
"""Get integer configuration value."""
279
280
def getfloat(
281
self,
282
section: str,
283
key: str,
284
fallback: float = 0.0
285
) -> float:
286
"""Get float configuration value."""
287
288
def set(self, section: str, key: str, value: str) -> None:
289
"""Set configuration value."""
290
291
# Global configuration instance
292
conf: AirflowConfigParser
293
```
294
295
Usage example:
296
297
```python
298
from datetime import datetime

from airflow import configuration
299
from airflow.decorators import dag, task
300
301
@dag(dag_id='config_example', start_date=datetime(2024, 1, 1))
302
def config_example():
303
@task
304
def use_config():
305
# Get configuration values
306
webserver_port = configuration.conf.getint('webserver', 'web_server_port')
307
sql_alchemy_conn = configuration.conf.get('database', 'sql_alchemy_conn')
308
parallelism = configuration.conf.getint('core', 'parallelism')
309
310
return {
311
'webserver_port': webserver_port,
312
'db_conn': sql_alchemy_conn[:20] + '...', # Truncate: never expose the full connection string (it may embed credentials)
313
'parallelism': parallelism
314
}
315
316
use_config()
317
318
dag_instance = config_example()
319
```
320
321
### Settings and Environment
322
323
Global settings and environment configuration.
324
325
```python { .api }
326
from airflow import settings
327
328
# Database session
329
Session = settings.Session
330
331
# Configuration paths
332
AIRFLOW_HOME: str
333
DAGS_FOLDER: str
334
PLUGINS_FOLDER: str
335
SQL_ALCHEMY_CONN: str
336
337
def configure_logging() -> None:
338
"""Configure Airflow logging."""
339
340
def initialize_airflow() -> None:
341
"""Initialize Airflow configuration and database."""
342
```
343
344
### Secret Backends
345
346
Integrate with external secret management systems.
347
348
```python { .api }
349
class BaseSecretsBackend:
350
def __init__(self, **kwargs):
351
"""Base class for secret backends."""
352
353
def get_connection(self, conn_id: str) -> Optional[Connection]:
354
"""
355
Get connection from secret backend.
356
357
Args:
358
conn_id: Connection identifier
359
360
Returns:
361
Connection instance or None
362
"""
363
364
def get_variable(self, key: str) -> Optional[str]:
365
"""
366
Get variable from secret backend.
367
368
Args:
369
key: Variable key
370
371
Returns:
372
Variable value or None
373
"""
374
375
# Common secret backends
376
class EnvironmentVariablesBackend(BaseSecretsBackend):
377
"""Use environment variables as secret backend."""
378
379
class LocalFilesystemBackend(BaseSecretsBackend):
380
"""Use local files as secret backend."""
381
```
382
383
### Resource Pools
384
385
Manage resource allocation for task execution.
386
387
```python { .api }
388
class Pool:
389
def __init__(
390
self,
391
pool: str,
392
slots: int,
393
description: str = ""
394
):
395
"""
396
Define a resource pool.
397
398
Args:
399
pool: Pool name
400
slots: Number of available slots
401
description: Pool description
402
"""
403
404
@property
405
def pool(self) -> str:
406
"""Pool name."""
407
408
@property
409
def slots(self) -> int:
410
"""Available slots."""
411
412
@property
413
def description(self) -> str:
414
"""Pool description."""
415
416
@classmethod
417
def get_pool(cls, pool_name: str) -> Optional['Pool']:
418
"""Get pool by name."""
419
420
@classmethod
421
def create_pool(
422
cls,
423
pool_name: str,
424
slots: int,
425
description: str = ""
426
) -> 'Pool':
427
"""Create a new pool."""
428
```
429
430
## Types
431
432
```python { .api }
433
from typing import Union, Optional, Dict, Any
434
from datetime import datetime
435
436
ConfigValue = Union[str, int, float, bool]
437
ParamValue = Union[str, int, float, bool, list, dict]
438
```