0
# RDS Database Operations
1
2
Amazon RDS (Relational Database Service) integration for managed database instance lifecycle management. Provides operations for creating, managing, and monitoring RDS instances including PostgreSQL, MySQL, MariaDB, Oracle, and SQL Server databases.
3
4
## Capabilities
5
6
### RDS Hook
7
8
Core RDS client providing database instance management and monitoring functionality.
9
10
```python { .api }
11
class RdsHook(AwsBaseHook):
12
def __init__(self, aws_conn_id: str = 'aws_default', region_name: str = None, **kwargs):
13
"""
14
Initialize RDS Hook.
15
16
Parameters:
17
- aws_conn_id: AWS connection ID
18
- region_name: AWS region name
19
"""
20
21
def create_db_instance(self, db_instance_identifier: str, db_instance_class: str, engine: str, master_username: str = None, master_user_password: str = None, allocated_storage: int = None, **kwargs) -> dict:
22
"""
23
Create RDS database instance.
24
25
Parameters:
26
- db_instance_identifier: Unique identifier for DB instance
27
- db_instance_class: Instance class (e.g., 'db.t3.micro')
28
- engine: Database engine identifier (e.g., 'postgres', 'mysql', 'mariadb', 'oracle-ee', 'sqlserver-ex')
29
- master_username: Master username for the database
30
- master_user_password: Master password for the database
31
- allocated_storage: Storage size in GB
32
33
Returns:
34
DB instance configuration
35
"""
36
37
def delete_db_instance(self, db_instance_identifier: str, skip_final_snapshot: bool = True, final_db_snapshot_identifier: str = None, delete_automated_backups: bool = True) -> dict:
38
"""
39
Delete RDS database instance.
40
41
Parameters:
42
- db_instance_identifier: DB instance identifier
43
- skip_final_snapshot: Skip final snapshot creation
44
- final_db_snapshot_identifier: Identifier for the final snapshot (needed when skip_final_snapshot is False)
45
- delete_automated_backups: Delete automated backups
46
47
Returns:
48
Deletion response
49
"""
50
51
def describe_db_instances(self, db_instance_identifier: str = None, filters: list = None) -> dict:
52
"""
53
Describe RDS database instances.
54
55
Parameters:
56
- db_instance_identifier: Specific DB instance identifier
57
- filters: List of filters to apply
58
59
Returns:
60
DB instances description
61
"""
62
63
def start_db_instance(self, db_instance_identifier: str) -> dict:
64
"""
65
Start stopped RDS database instance.
66
67
Parameters:
68
- db_instance_identifier: DB instance identifier
69
70
Returns:
71
Start instance response
72
"""
73
74
def stop_db_instance(self, db_instance_identifier: str, db_snapshot_identifier: str = None) -> dict:
75
"""
76
Stop running RDS database instance.
77
78
Parameters:
79
- db_instance_identifier: DB instance identifier
80
- db_snapshot_identifier: Snapshot identifier for backup
81
82
Returns:
83
Stop instance response
84
"""
85
86
def reboot_db_instance(self, db_instance_identifier: str, force_failover: bool = False) -> dict:
87
"""
88
Reboot RDS database instance.
89
90
Parameters:
91
- db_instance_identifier: DB instance identifier
92
- force_failover: Force failover during reboot
93
94
Returns:
95
Reboot response
96
"""
97
98
def create_db_snapshot(self, db_snapshot_identifier: str, db_instance_identifier: str, tags: list = None) -> dict:
99
"""
100
Create database snapshot.
101
102
Parameters:
103
- db_snapshot_identifier: Snapshot identifier
104
- db_instance_identifier: Source DB instance identifier
105
- tags: Snapshot tags
106
107
Returns:
108
Snapshot configuration
109
"""
110
111
def delete_db_snapshot(self, db_snapshot_identifier: str) -> dict:
112
"""
113
Delete database snapshot.
114
115
Parameters:
116
- db_snapshot_identifier: Snapshot identifier
117
118
Returns:
119
Deletion response
120
"""
121
122
def restore_db_instance_from_db_snapshot(self, db_instance_identifier: str, db_snapshot_identifier: str, db_instance_class: str = None, **kwargs) -> dict:
123
"""
124
Restore database instance from snapshot.
125
126
Parameters:
127
- db_instance_identifier: New DB instance identifier
128
- db_snapshot_identifier: Source snapshot identifier
129
- db_instance_class: Instance class for restored instance
130
131
Returns:
132
Restored instance configuration
133
"""
134
135
def get_db_instance_state(self, db_instance_identifier: str) -> str:
136
"""
137
Get RDS instance state.
138
139
Parameters:
140
- db_instance_identifier: DB instance identifier
141
142
Returns:
143
Current instance state
144
"""
145
```
146
147
### RDS Operators
148
149
Task implementations for RDS database operations.
150
151
```python { .api }
152
class RdsCreateDbInstanceOperator(BaseOperator):
153
def __init__(self, db_instance_identifier: str, db_instance_class: str, engine: str, master_username: str = None, master_user_password: str = None, allocated_storage: int = 20, aws_conn_id: str = 'aws_default', **kwargs):
154
"""
155
Create RDS database instance.
156
157
Parameters:
158
- db_instance_identifier: Unique identifier for DB instance
159
- db_instance_class: Instance class
160
- engine: Database engine
161
- master_username: Master username
162
- master_user_password: Master password
163
- allocated_storage: Storage size in GB
164
- aws_conn_id: AWS connection ID
165
"""
166
167
class RdsDeleteDbInstanceOperator(BaseOperator):
168
def __init__(self, db_instance_identifier: str, skip_final_snapshot: bool = True, final_db_snapshot_identifier: str = None, aws_conn_id: str = 'aws_default', **kwargs):
169
"""
170
Delete RDS database instance.
171
172
Parameters:
173
- db_instance_identifier: DB instance identifier
174
- skip_final_snapshot: Skip final snapshot creation
175
- final_db_snapshot_identifier: Final snapshot identifier
176
- aws_conn_id: AWS connection ID
177
"""
178
179
class RdsStartDbOperator(BaseOperator):
180
def __init__(self, db_instance_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
181
"""
182
Start RDS database instance.
183
184
Parameters:
185
- db_instance_identifier: DB instance identifier
186
- aws_conn_id: AWS connection ID
187
"""
188
189
class RdsStopDbOperator(BaseOperator):
190
def __init__(self, db_instance_identifier: str, db_snapshot_identifier: str = None, aws_conn_id: str = 'aws_default', **kwargs):
191
"""
192
Stop RDS database instance.
193
194
Parameters:
195
- db_instance_identifier: DB instance identifier
196
- db_snapshot_identifier: Snapshot identifier for backup
197
- aws_conn_id: AWS connection ID
198
"""
199
200
class RdsCreateDbSnapshotOperator(BaseOperator):
201
def __init__(self, db_snapshot_identifier: str, db_instance_identifier: str, tags: list = None, aws_conn_id: str = 'aws_default', **kwargs):
202
"""
203
Create RDS database snapshot.
204
205
Parameters:
206
- db_snapshot_identifier: Snapshot identifier
207
- db_instance_identifier: Source DB instance identifier
208
- tags: Snapshot tags
209
- aws_conn_id: AWS connection ID
210
"""
211
212
class RdsDeleteDbSnapshotOperator(BaseOperator):
213
def __init__(self, db_snapshot_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
214
"""
215
Delete RDS database snapshot.
216
217
Parameters:
218
- db_snapshot_identifier: Snapshot identifier
219
- aws_conn_id: AWS connection ID
220
"""
221
```
222
223
### RDS Sensors
224
225
Monitoring tasks for RDS instance states and snapshot availability.
226
227
```python { .api }
228
class RdsDbSensor(BaseSensorOperator):
229
def __init__(self, db_identifier: str, target_statuses: list = None, aws_conn_id: str = 'aws_default', **kwargs):
230
"""
231
Wait for RDS database instance to reach target state.
232
233
Parameters:
234
- db_identifier: DB instance identifier
235
- target_statuses: List of target statuses
236
- aws_conn_id: AWS connection ID
237
"""
238
239
class RdsSnapshotExistenceSensor(BaseSensorOperator):
240
def __init__(self, db_snapshot_identifier: str, target_statuses: list = None, aws_conn_id: str = 'aws_default', **kwargs):
241
"""
242
Wait for RDS snapshot to exist and be available.
243
244
Parameters:
245
- db_snapshot_identifier: Snapshot identifier
246
- target_statuses: List of acceptable statuses (defaults to ['available'] when omitted)
247
- aws_conn_id: AWS connection ID
248
"""
249
```
250
251
## Usage Examples
252
253
### Database Instance Lifecycle Management
254
255
```python
256
from datetime import datetime
from airflow import DAG
257
from airflow.providers.amazon.aws.operators.rds import (
258
RdsCreateDbInstanceOperator,
259
RdsCreateDbSnapshotOperator,
260
RdsStopDbOperator,
261
RdsStartDbOperator
262
)
263
from airflow.providers.amazon.aws.sensors.rds import RdsDbSensor
264
265
dag = DAG('rds_lifecycle', start_date=datetime(2023, 1, 1))
266
267
# Create database instance
268
create_db = RdsCreateDbInstanceOperator(
269
task_id='create_database',
270
db_instance_identifier='analytics-db-prod',
271
db_instance_class='db.r5.large',
272
engine='postgres',
273
master_username='admin',
274
master_user_password='{{ var.value.db_password }}',
275
allocated_storage=100,
276
storage_type='gp2',
277
vpc_security_group_ids=['sg-12345678'],
278
db_subnet_group_name='analytics-subnet-group',
279
backup_retention_period=7,
280
multi_az=True,
281
publicly_accessible=False,
282
tags=[
283
{'Key': 'Environment', 'Value': 'production'},
284
{'Key': 'Application', 'Value': 'analytics'}
285
],
286
dag=dag
287
)
288
289
# Wait for database to be available
290
wait_for_db = RdsDbSensor(
291
task_id='wait_for_database',
292
db_identifier='analytics-db-prod',
293
target_statuses=['available'],
294
timeout=1800, # 30 minutes
295
dag=dag
296
)
297
298
# Create backup snapshot
299
create_snapshot = RdsCreateDbSnapshotOperator(
300
task_id='create_backup',
301
db_snapshot_identifier='analytics-db-backup-{{ ds }}',
302
db_instance_identifier='analytics-db-prod',
303
tags=[
304
{'Key': 'Date', 'Value': '{{ ds }}'},
305
{'Key': 'Type', 'Value': 'scheduled-backup'}
306
],
307
dag=dag
308
)
309
310
create_db >> wait_for_db >> create_snapshot
311
```
312
313
### Database Maintenance Window
314
315
```python
316
from airflow.providers.amazon.aws.sensors.rds import RdsSnapshotExistenceSensor
317
318
# Stop database for maintenance
319
stop_db = RdsStopDbOperator(
320
task_id='stop_for_maintenance',
321
db_instance_identifier='analytics-db-prod',
322
db_snapshot_identifier='pre-maintenance-{{ ds }}',
323
dag=dag
324
)
325
326
# Wait for snapshot completion
327
wait_for_snapshot = RdsSnapshotExistenceSensor(
328
task_id='wait_for_snapshot',
329
db_snapshot_identifier='pre-maintenance-{{ ds }}',
330
timeout=3600, # 1 hour
331
dag=dag
332
)
333
334
# Restart database after maintenance
335
start_db = RdsStartDbOperator(
336
task_id='restart_after_maintenance',
337
db_instance_identifier='analytics-db-prod',
338
dag=dag
339
)
340
341
# Wait for database to be available again
342
wait_for_restart = RdsDbSensor(
343
task_id='wait_for_restart',
344
db_identifier='analytics-db-prod',
345
target_statuses=['available'],
346
dag=dag
347
)
348
349
stop_db >> wait_for_snapshot >> start_db >> wait_for_restart
350
```
351
352
## Types
353
354
```python { .api }
355
# RDS instance states
356
class RdsInstanceState:
357
AVAILABLE = 'available'
358
BACKING_UP = 'backing-up'
359
CREATING = 'creating'
360
DELETING = 'deleting'
361
FAILED = 'failed'
362
MAINTENANCE = 'maintenance'
363
MODIFYING = 'modifying'
364
REBOOTING = 'rebooting'
365
STARTING = 'starting'
366
STOPPED = 'stopped'
367
STOPPING = 'stopping'
368
STORAGE_FULL = 'storage-full'
369
UPGRADING = 'upgrading'
370
371
# RDS engines
372
class RdsEngine:
373
POSTGRES = 'postgres'
374
MYSQL = 'mysql'
375
MARIADB = 'mariadb'
376
ORACLE_EE = 'oracle-ee'
377
ORACLE_SE2 = 'oracle-se2'
378
ORACLE_SE1 = 'oracle-se1'
379
ORACLE_SE = 'oracle-se'
380
SQLSERVER_EE = 'sqlserver-ee'
381
SQLSERVER_SE = 'sqlserver-se'
382
SQLSERVER_EX = 'sqlserver-ex'
383
SQLSERVER_WEB = 'sqlserver-web'
384
385
# Instance classes
386
class RdsInstanceClass:
387
T3_MICRO = 'db.t3.micro'
388
T3_SMALL = 'db.t3.small'
389
T3_MEDIUM = 'db.t3.medium'
390
T3_LARGE = 'db.t3.large'
391
M5_LARGE = 'db.m5.large'
392
M5_XLARGE = 'db.m5.xlarge'
393
M5_2XLARGE = 'db.m5.2xlarge'
394
R5_LARGE = 'db.r5.large'
395
R5_XLARGE = 'db.r5.xlarge'
396
R5_2XLARGE = 'db.r5.2xlarge'
397
398
# Storage types
399
class StorageType:
400
STANDARD = 'standard'
401
GP2 = 'gp2'
402
GP3 = 'gp3'
403
IO1 = 'io1'
404
IO2 = 'io2'
405
406
# DB instance configuration
407
class DbInstanceConfig:
408
db_instance_identifier: str
409
db_instance_class: str
410
engine: str
411
master_username: str
412
allocated_storage: int
413
storage_type: str = 'gp2'
414
storage_encrypted: bool = False
415
kms_key_id: str = None
416
db_name: str = None
417
port: int = None
418
vpc_security_group_ids: list = None
419
db_subnet_group_name: str = None
420
publicly_accessible: bool = False
421
multi_az: bool = False
422
backup_retention_period: int = 1
423
preferred_backup_window: str = None
424
preferred_maintenance_window: str = None
425
auto_minor_version_upgrade: bool = True
426
license_model: str = None
427
option_group_name: str = None
428
character_set_name: str = None
429
tags: list = None
430
copy_tags_to_snapshot: bool = False
431
monitoring_interval: int = 0
432
monitoring_role_arn: str = None
433
domain_iam_role_name: str = None
434
promotion_tier: int = None
435
timezone: str = None
436
enable_iam_database_authentication: bool = False
437
enable_performance_insights: bool = False
438
performance_insights_kms_key_id: str = None
439
performance_insights_retention_period: int = None
440
enable_cloudwatch_logs_exports: list = None
441
processor_features: list = None
442
deletion_protection: bool = False
443
max_allocated_storage: int = None
444
```