or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assets-scheduling.mdcli-utilities.mdconfiguration.mddag-management.mddatabase-models.mdexceptions.mdexecutors.mdextensions.mdindex.mdtask-operators.mdxcom.md

configuration.mddocs/

0

# Configuration Management

1

2

System configuration, variables, parameters, and connection management for workflow orchestration. Airflow provides multiple ways to manage configuration, secrets, and runtime parameters.

3

4

## Capabilities

5

6

### Variables

7

8

Store and retrieve configuration values that can be accessed across DAGs and tasks.

9

10

```python { .api }

11

class Variable:

12

@classmethod

13

def get(

14

cls,

15

key: str,

16

default_var: Any = None,

17

deserialize_json: bool = False

18

) -> Any:

19

"""

20

Get a variable value.

21

22

Args:

23

key: Variable key

24

default_var: Default value if key not found

25

deserialize_json: Whether to deserialize JSON values

26

27

Returns:

28

Variable value

29

"""

30

31

@classmethod

32

def set(

33

cls,

34

key: str,

35

value: Any,

36

description: Optional[str] = None,

37

serialize_json: bool = False

38

) -> None:

39

"""

40

Set a variable value.

41

42

Args:

43

key: Variable key

44

value: Variable value

45

description: Variable description

46

serialize_json: Whether to serialize value as JSON

47

"""

48

49

@classmethod

50

def delete(cls, key: str) -> None:

51

"""

52

Delete a variable.

53

54

Args:

55

key: Variable key to delete

56

"""

57

58

@classmethod

59

def setdefault(cls, key: str, default: Any) -> Any:

60

"""

61

Set variable if it doesn't exist, otherwise return existing value.

62

63

Args:

64

key: Variable key

65

default: Default value to set

66

67

Returns:

68

Variable value

69

"""

70

```

71

72

Usage example:

73

74

```python

75

from airflow.models import Variable

76

from airflow.decorators import dag, task

77

78

@dag(dag_id='variable_example', start_date=datetime(2024, 1, 1))

79

def variable_example():

80

@task

81

def use_variables():

82

# Get variables

83

api_key = Variable.get("api_key")

84

config = Variable.get("app_config", deserialize_json=True)

85

86

# Set variables

87

Variable.set("last_run", datetime.now().isoformat())

88

89

return {"api_key_length": len(api_key), "config": config}

90

91

use_variables()

92

93

dag_instance = variable_example()

94

```

95

96

### Parameters

97

98

Define typed parameters for DAGs that can be set at runtime.

99

100

```python { .api }

101

class Param:

102

def __init__(

103

self,

104

default: Any = None,

105

description: Optional[str] = None,

106

schema: Optional[Dict[str, Any]] = None,

107

**kwargs

108

):

109

"""

110

Define a DAG parameter.

111

112

Args:

113

default: Default parameter value

114

description: Parameter description

115

schema: JSON schema for validation

116

**kwargs: Additional parameter options

117

"""

118

119

def resolve(self, value: Any = None) -> Any:

120

"""

121

Resolve parameter value with validation.

122

123

Args:

124

value: Input value to resolve

125

126

Returns:

127

Resolved and validated value

128

"""

129

```

130

131

Usage example:

132

133

```python

134

from airflow.models import Param

135

from airflow.decorators import dag, task

136

137

@dag(

138

dag_id='parameterized_dag',

139

start_date=datetime(2024, 1, 1),

140

params={

141

'environment': Param(

142

default='dev',

143

description='Environment to deploy to',

144

schema={'type': 'string', 'enum': ['dev', 'staging', 'prod']}

145

),

146

'batch_size': Param(

147

default=100,

148

description='Number of records to process',

149

schema={'type': 'integer', 'minimum': 1, 'maximum': 1000}

150

),

151

'debug_mode': Param(

152

default=False,

153

description='Enable debug logging',

154

schema={'type': 'boolean'}

155

)

156

}

157

)

158

def parameterized_dag():

159

@task

160

def process_with_params(**context):

161

params = context['params']

162

environment = params['environment']

163

batch_size = params['batch_size']

164

debug_mode = params['debug_mode']

165

166

return f"Processing {batch_size} records in {environment} with debug={debug_mode}"

167

168

process_with_params()

169

170

dag_instance = parameterized_dag()

171

```

172

173

### Connections

174

175

Manage external system connections with credentials and configuration.

176

177

```python { .api }

178

class Connection:

179

def __init__(

180

self,

181

conn_id: str,

182

conn_type: str,

183

description: Optional[str] = None,

184

host: Optional[str] = None,

185

login: Optional[str] = None,

186

password: Optional[str] = None,

187

schema: Optional[str] = None,

188

port: Optional[int] = None,

189

extra: Optional[Union[str, Dict]] = None,

190

uri: Optional[str] = None

191

):

192

"""

193

Create a connection configuration.

194

195

Args:

196

conn_id: Unique connection identifier

197

conn_type: Connection type (postgres, mysql, http, etc.)

198

description: Connection description

199

host: Host address

200

login: Username

201

password: Password

202

schema: Database schema

203

port: Port number

204

extra: Additional connection parameters (JSON string or dict)

205

uri: Complete connection URI

206

"""

207

208

@property

209

def conn_id(self) -> str:

210

"""Connection ID."""

211

212

@property

213

def conn_type(self) -> str:

214

"""Connection type."""

215

216

def get_uri(self) -> str:

217

"""Get complete connection URI."""

218

219

def get_password(self) -> Optional[str]:

220

"""Get connection password."""

221

222

def get_extra(self) -> Dict[str, Any]:

223

"""Get extra connection parameters as dictionary."""

224

225

# Connection retrieval

226

def get_connection(conn_id: str) -> Connection:

227

"""

228

Get connection by ID.

229

230

Args:

231

conn_id: Connection identifier

232

233

Returns:

234

Connection instance

235

"""

236

```

237

238

### Configuration System

239

240

Access and manage Airflow configuration settings.

241

242

```python { .api }

243

from airflow import configuration

244

245

class AirflowConfigParser:

246

def get(

247

self,

248

section: str,

249

key: str,

250

fallback: Any = None

251

) -> str:

252

"""

253

Get configuration value.

254

255

Args:

256

section: Configuration section

257

key: Configuration key

258

fallback: Fallback value

259

260

Returns:

261

Configuration value

262

"""

263

264

def getboolean(

265

self,

266

section: str,

267

key: str,

268

fallback: bool = False

269

) -> bool:

270

"""Get boolean configuration value."""

271

272

def getint(

273

self,

274

section: str,

275

key: str,

276

fallback: int = 0

277

) -> int:

278

"""Get integer configuration value."""

279

280

def getfloat(

281

self,

282

section: str,

283

key: str,

284

fallback: float = 0.0

285

) -> float:

286

"""Get float configuration value."""

287

288

def set(self, section: str, key: str, value: str) -> None:

289

"""Set configuration value."""

290

291

# Global configuration instance

292

conf: AirflowConfigParser

293

```

294

295

Usage example:

296

297

```python

298

from airflow import configuration

299

from airflow.decorators import dag, task

300

301

@dag(dag_id='config_example', start_date=datetime(2024, 1, 1))

302

def config_example():

303

@task

304

def use_config():

305

# Get configuration values

306

webserver_port = configuration.conf.getint('webserver', 'web_server_port')

307

sql_alchemy_conn = configuration.conf.get('database', 'sql_alchemy_conn')

308

parallelism = configuration.conf.getint('core', 'parallelism')

309

310

return {

311

'webserver_port': webserver_port,

312

'db_conn': sql_alchemy_conn[:20] + '...', # Don't log full connection

313

'parallelism': parallelism

314

}

315

316

use_config()

317

318

dag_instance = config_example()

319

```

320

321

### Settings and Environment

322

323

Global settings and environment configuration.

324

325

```python { .api }

326

from airflow import settings

327

328

# Database session

329

Session = settings.Session

330

331

# Configuration paths

332

AIRFLOW_HOME: str

333

DAGS_FOLDER: str

334

PLUGINS_FOLDER: str

335

SQL_ALCHEMY_CONN: str

336

337

def configure_logging() -> None:

338

"""Configure Airflow logging."""

339

340

def initialize_airflow() -> None:

341

"""Initialize Airflow configuration and database."""

342

```

343

344

### Secret Backends

345

346

Integrate with external secret management systems.

347

348

```python { .api }

349

class BaseSecretsBackend:

350

def __init__(self, **kwargs):

351

"""Base class for secret backends."""

352

353

def get_connection(self, conn_id: str) -> Optional[Connection]:

354

"""

355

Get connection from secret backend.

356

357

Args:

358

conn_id: Connection identifier

359

360

Returns:

361

Connection instance or None

362

"""

363

364

def get_variable(self, key: str) -> Optional[str]:

365

"""

366

Get variable from secret backend.

367

368

Args:

369

key: Variable key

370

371

Returns:

372

Variable value or None

373

"""

374

375

# Common secret backends

376

class EnvironmentVariablesBackend(BaseSecretsBackend):

377

"""Use environment variables as secret backend."""

378

379

class LocalFilesystemBackend(BaseSecretsBackend):

380

"""Use local files as secret backend."""

381

```

382

383

### Resource Pools

384

385

Manage resource allocation for task execution.

386

387

```python { .api }

388

class Pool:

389

def __init__(

390

self,

391

pool: str,

392

slots: int,

393

description: str = ""

394

):

395

"""

396

Define a resource pool.

397

398

Args:

399

pool: Pool name

400

slots: Number of available slots

401

description: Pool description

402

"""

403

404

@property

405

def pool(self) -> str:

406

"""Pool name."""

407

408

@property

409

def slots(self) -> int:

410

"""Available slots."""

411

412

@property

413

def description(self) -> str:

414

"""Pool description."""

415

416

@classmethod

417

def get_pool(cls, pool_name: str) -> Optional['Pool']:

418

"""Get pool by name."""

419

420

@classmethod

421

def create_pool(

422

cls,

423

pool_name: str,

424

slots: int,

425

description: str = ""

426

) -> 'Pool':

427

"""Create a new pool."""

428

```

429

430

## Types

431

432

```python { .api }

433

from typing import Union, Optional, Dict, Any

434

from datetime import datetime

435

436

ConfigValue = Union[str, int, float, bool]

437

ParamValue = Union[str, int, float, bool, list, dict]

438

```