
# Migration Context

The `alembic.context` module provides environment context functions for accessing database connections, configuration, and migration state. These functions are primarily used in the `env.py` file and in migration scripts.

## Core Imports

```python
from alembic import context
```

## Capabilities

### Context Configuration

Configure the migration environment with database connections and settings.

```python { .api }
def configure(
    connection=None,
    url=None,
    dialect_name=None,
    dialect_opts=None,
    starting_rev=None,
    tag=None,
    template_args=None,
    render_as_batch=None,
    target_metadata=None,
    include_name=None,
    include_object=None,
    include_schemas=None,
    process_revision_directives=None,
    compare_type=None,
    compare_server_default=None,
    render_item=None,
    literal_binds=None,
    upgrade_token=None,
    downgrade_token=None,
    alembic_module_prefix=None,
    sqlalchemy_module_prefix=None,
    user_module_prefix=None,
    **kw,
):
    """
    Configure the migration context.

    Args:
        connection (Connection): Database connection to use for online migrations
        url (str): Database URL, used to derive the dialect if no connection is provided
        dialect_name (str): Name of the dialect to use if neither connection nor url is given
        dialect_opts (dict): Dialect-specific options passed to the dialect constructor
        starting_rev (str): Starting revision, overriding the command-line value in offline mode
        tag (str): Arbitrary string made available to env.py via get_tag_argument()
        template_args (dict): Extra variables for the revision file template
        render_as_batch (bool): Render autogenerated operations as batch operations
        target_metadata (MetaData): Metadata that autogenerate compares against the database
        include_name (callable): Filter applied to reflected object names during autogenerate
        include_object (callable): Filter applied to schema objects during autogenerate
        include_schemas (bool): Have autogenerate scan all schemas, not only the default one
        process_revision_directives (callable): Hook that can inspect or modify revision directives
        compare_type (bool): Enable column type comparison during autogenerate
        compare_server_default (bool): Enable server default comparison during autogenerate
        render_item (callable): Custom rendering function for autogenerated items
        literal_binds (bool): Render bound parameters inline in offline (SQL script) mode
        upgrade_token (str): Template variable name under which upgrade operations are rendered
        downgrade_token (str): Template variable name under which downgrade operations are rendered
        alembic_module_prefix (str): Prefix for Alembic operations in rendered code
        sqlalchemy_module_prefix (str): Prefix for SQLAlchemy constructs in rendered code
        user_module_prefix (str): Prefix for user-defined types in rendered code
        **kw: Additional configuration options
    """
```

**Usage Example**:

```python
# In env.py
from alembic import context
from sqlalchemy import engine_from_config, pool

from myapp import db  # application module that defines the models' MetaData

config = context.config
target_metadata = db.metadata

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            compare_type=True,
            compare_server_default=True
        )

        with context.begin_transaction():
            context.run_migrations()
```
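
The `include_name` option accepts a callable that decides which reflected names autogenerate should consider. The following is a minimal sketch, assuming the hook is called as `include_name(name, type_, parent_names)`; `ALLOWED_SCHEMAS` and the `tmp_` prefix are hypothetical project conventions, not Alembic settings.

```python
# Hypothetical project conventions, not part of Alembic.
ALLOWED_SCHEMAS = {None, "app"}  # None stands for the default schema
IGNORED_TABLE_PREFIX = "tmp_"


def include_name(name, type_, parent_names):
    """Return True when autogenerate should consider the reflected name."""
    if type_ == "schema":
        return name in ALLOWED_SCHEMAS
    if type_ == "table":
        return not name.startswith(IGNORED_TABLE_PREFIX)
    # Columns, indexes and constraints are left unfiltered in this sketch.
    return True
```

A hook like this would typically be passed as `context.configure(..., include_name=include_name, include_schemas=True)`.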

### Database Connection Access

Access the current database connection and related objects.

```python { .api }
def get_bind():
    """
    Get the current database connection.

    Returns:
        Connection: SQLAlchemy connection object bound to current context
    """

def get_context():
    """
    Get the current migration context.

    Returns:
        MigrationContext: Current MigrationContext instance
    """
```

**Usage Examples**:

```python
from sqlalchemy import text

# Get connection for custom operations
conn = context.get_bind()
# Wrap raw SQL in text(); SQLAlchemy 2.0 no longer accepts plain strings here
result = conn.execute(text("SELECT COUNT(*) FROM users"))

# Get migration context for advanced operations
ctx = context.get_context()
current_rev = ctx.get_current_revision()
```

### Transaction Management

Manage transactions during migration execution.

```python { .api }
def begin_transaction():
    """
    Begin a transaction context for migrations.

    Returns:
        Context manager for transaction scope
    """
```

**Usage Example**:

```python
# Explicit transaction management
with context.begin_transaction():
    context.run_migrations()
```

### Migration Execution

Execute migration operations and manage the migration process.

```python { .api }
def run_migrations(**kw):
    """
    Run the migration operations.

    Args:
        **kw: Additional keyword arguments passed to the upgrade() and
            downgrade() functions of each migration script
    """

def execute(sql, execution_options=None):
    """
    Execute SQL within the migration context.

    Args:
        sql (str|ClauseElement): SQL statement to execute
        execution_options (dict): Options for execution

    Returns:
        None. In offline mode the statement is written to the SQL output
        instead of being executed.
    """
```

**Usage Examples**:

```python
# Run migrations with custom arguments
context.run_migrations(custom_arg="value")

# Execute custom SQL (the hyphenated extension name must be double-quoted)
context.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"')
```
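
Keyword arguments given to `run_migrations()` are forwarded to each script's `upgrade()` and `downgrade()` functions, so those functions have to accept them. Below is a minimal sketch of a migration script that dispatches on such an argument, assuming `env.py` calls `context.run_migrations(engine_name=...)`. The `engine_name` parameter and the `upgrade_primary`/`upgrade_secondary` names are illustrative, and the `applied` list stands in for real `op` calls.

```python
applied = []  # stand-in for real schema changes, used only for illustration


def upgrade(engine_name):
    """Entry point; receives the keyword passed to run_migrations()."""
    globals()[f"upgrade_{engine_name}"]()


def upgrade_primary():
    # In a real script this would contain op.create_table(...) and similar.
    applied.append("primary")


def upgrade_secondary():
    applied.append("secondary")


# Roughly what happens for run_migrations(engine_name="primary"):
upgrade(engine_name="primary")
```

Dispatching by name keeps one revision file per change while letting each database receive its own set of operations.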

### Revision Information

Access information about revisions and migration state.

```python { .api }
def get_revision_argument():
    """
    Get the target revision argument.

    Symbolic names are resolved: "head" is returned as the actual
    revision identifier and "base" as None.

    Returns:
        Union[str, Tuple[str, ...], None]: Target revision identifier
    """

def get_starting_revision_argument():
    """
    Get the starting revision argument.

    Only meaningful in offline mode, when the revision was given as start:end.

    Returns:
        Union[str, Tuple[str, ...], None]: Starting revision identifier(s)
    """

def get_head_revision():
    """
    Get the head revision from the script directory.

    Raises CommandError if the script directory has multiple heads;
    prefer get_head_revisions() in that case.

    Returns:
        Union[str, Tuple[str, ...], None]: Head revision identifier(s)
    """

def get_head_revisions():
    """
    Get all head revisions from the script directory.

    Returns:
        Union[str, Tuple[str, ...], None]: All head revision identifiers
    """

def get_tag_argument():
    """
    Get the --tag argument value.

    Returns:
        Optional[str]: Tag argument value
    """

def get_x_argument(as_dictionary=False):
    """
    Get the -x argument values.

    Args:
        as_dictionary (bool): Return as dictionary instead of list

    Returns:
        Union[List[str], Dict[str, str]]: -x argument values
    """
```

**Usage Examples**:

```python
# Check target revision ("head" is resolved to the actual revision identifier)
target = context.get_revision_argument()
if target == context.get_head_revision():
    print("Migrating to latest revision")

# Get current heads
heads = context.get_head_revisions()
print(f"Current heads: {heads}")
```
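
Several of the functions above are annotated as returning a single string, a tuple of strings, or `None`. A small helper can normalize these shapes before further processing. This is only a sketch; `as_revision_tuple` is a hypothetical name, not an Alembic function.

```python
from typing import Optional, Tuple, Union

RevisionValue = Optional[Union[str, Tuple[str, ...]]]


def as_revision_tuple(value: RevisionValue) -> Tuple[str, ...]:
    """Normalize a revision value to a (possibly empty) tuple of identifiers."""
    if value is None:
        return ()
    if isinstance(value, str):
        return (value,)
    return tuple(value)
```

In `env.py` this could be used as `heads = as_revision_tuple(context.get_head_revisions())`, so later code can always iterate over the result.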

### Command Line Arguments

Access command-line arguments and custom parameters.

```python { .api }
def get_x_argument(as_dictionary=False):
    """
    Get -x arguments from command line.

    Args:
        as_dictionary (bool): Parse key=value pairs into a dictionary
            instead of returning the raw list of strings

    Returns:
        Union[List[str], Dict[str, str]]: -x argument values
    """
```

**Usage Example**:

```python
# Access custom command line arguments
x_args = context.get_x_argument(as_dictionary=True)
database_name = x_args.get('dbname', 'default')

# Usage: alembic -x dbname=testdb upgrade head
```
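
With `as_dictionary=False`, the `-x` values arrive as raw `key=value` strings. The sketch below approximates how such a list can be turned into a dictionary by hand. `parse_x_args` is a hypothetical helper, and treating a bare flag as an empty string is an assumption of this example rather than documented Alembic behavior.

```python
from typing import Dict, List


def parse_x_args(raw_args: List[str]) -> Dict[str, str]:
    """Split each 'key=value' string on the first '=' only."""
    parsed: Dict[str, str] = {}
    for arg in raw_args:
        key, _, value = arg.partition("=")
        parsed[key] = value  # a bare flag such as "verbose" maps to ""
    return parsed
```

Inside `env.py` it would be called as `parse_x_args(context.get_x_argument())`. Splitting on the first `=` only keeps values such as database URLs intact.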

### Mode Detection

Determine the current migration execution mode.

```python { .api }
def is_offline_mode():
    """
    Check if running in offline mode.

    Returns:
        bool: True if in offline/SQL generation mode
    """

def is_transactional_ddl():
    """
    Check if DDL operations are transactional.

    Returns:
        bool: True if DDL operations support transactions
    """
```

**Usage Examples**:

```python
# Different behavior for offline mode
if context.is_offline_mode():
    context.static_output("-- Offline mode migration")
else:
    context.execute("SELECT 1")

# Check DDL transaction support
if context.is_transactional_ddl():
    # Safe to use transactions around DDL
    pass
```

### Output Management

Manage output for offline/SQL generation mode.

```python { .api }
def static_output(text):
    """
    Emit text output in offline mode.

    Args:
        text (str): Text to output
    """
```

**Usage Example**:

```python
# Output comments in offline mode
if context.is_offline_mode():
    context.static_output("-- Custom migration step")
    context.static_output("CREATE INDEX CONCURRENTLY idx_user_email ON users(email);")
```

## Common Patterns

### Standard env.py Structure

```python
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig

# Alembic Config object
config = context.config

# Setup logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Target metadata from your application
from myapp import db
target_metadata = db.metadata

def run_migrations_offline():
    """Run migrations in 'offline' mode."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    """Run migrations in 'online' mode."""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
```

### Custom Migration Logic

```python
def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        # Custom configuration
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            compare_type=True,
            compare_server_default=True,
            include_object=include_object,
            include_schemas=True,
            process_revision_directives=process_revision_directives
        )

        with context.begin_transaction():
            # Custom pre-migration logic; "head" is resolved to the actual
            # revision identifier, so compare against the head revision
            if context.get_revision_argument() == context.get_head_revision():
                context.execute("SET lock_timeout = '2s'")

            context.run_migrations()

            # Custom post-migration logic
            context.execute("ANALYZE")

def include_object(object, name, type_, reflected, compare_to):
    """Filter objects during autogenerate."""
    if type_ == "table" and name.startswith("temp_"):
        return False
    return True

def process_revision_directives(context, revision, directives):
    """Custom revision processing."""
    if config.cmd_opts and config.cmd_opts.autogenerate:
        script = directives[0]
        if script.upgrade_ops.is_empty():
            directives[:] = []
```
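
The `process_revision_directives` hook above suppresses empty autogenerate revisions by clearing the `directives` list in place. The sketch below exercises the same logic with stand-in objects to show why slice assignment is used. `FakeUpgradeOps`, `FakeScript`, and `skip_empty_revision` are hypothetical names for illustration and do not reproduce Alembic's real classes.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakeUpgradeOps:
    """Stand-in for the upgrade operations container of a revision script."""
    ops: List[str] = field(default_factory=list)

    def is_empty(self) -> bool:
        return not self.ops


@dataclass
class FakeScript:
    """Stand-in for the single directive handed to the hook."""
    upgrade_ops: FakeUpgradeOps


def skip_empty_revision(context, revision, directives):
    """Drop the revision when no changes were detected."""
    script = directives[0]
    if script.upgrade_ops.is_empty():
        # Slice assignment empties the caller's list object; rebinding
        # with `directives = []` would only change the local name.
        directives[:] = []


empty = [FakeScript(FakeUpgradeOps())]
changed = [FakeScript(FakeUpgradeOps(["add_column"]))]
skip_empty_revision(None, None, empty)
skip_empty_revision(None, None, changed)
```

After these calls `empty` is an empty list, while `changed` still holds its one directive, which is the signal the caller relies on to decide whether a revision file is written.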

### Multi-Database Support

```python
def run_migrations_online():
    engines = {
        'primary': engine_from_config(
            config.get_section('primary'),
            prefix='sqlalchemy.'
        ),
        'secondary': engine_from_config(
            config.get_section('secondary'),
            prefix='sqlalchemy.'
        )
    }

    for name, engine in engines.items():
        with engine.connect() as connection:
            context.configure(
                connection=connection,
                # get_metadata_for_db is an application-defined helper, not an Alembic function
                target_metadata=get_metadata_for_db(name),
                upgrade_token=f"{name}_upgrade",
                downgrade_token=f"{name}_downgrade"
            )

            with context.begin_transaction():
                context.run_migrations()
```

## Error Handling

Context functions may raise:
- `EnvironmentError`: Configuration or setup errors
- `CommandError`: Migration command errors
- Database-specific exceptions during execution

## Types

```python { .api }
# Context objects
class MigrationContext:
    bind: Connection
    dialect: Dialect
    script: ScriptDirectory
    opts: Dict[str, Any]

    def get_current_revision(self): ...
    def stamp(self, script_directory, revision): ...

class EnvironmentContext:
    config: Config
    script: ScriptDirectory

    def configure(self, **kw): ...
    def run_migrations(self, **kw): ...
```