Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Amazon RDS (Relational Database Service) integration for managed database instance lifecycle management. Provides operations for creating, managing, and monitoring RDS instances including PostgreSQL, MySQL, MariaDB, Oracle, and SQL Server databases.
Core RDS client providing database instance management and monitoring functionality.
class RdsHook(AwsBaseHook):
    """
    Core RDS client providing database instance management and monitoring
    functionality.

    Exposes lifecycle operations (create, delete, start, stop, reboot),
    snapshot management, and state inspection for Amazon RDS instances,
    covering the PostgreSQL, MySQL, MariaDB, Oracle, and SQL Server engines.
    """

    def __init__(self, aws_conn_id: str = 'aws_default', region_name: str | None = None, **kwargs):
        """
        Initialize RDS Hook.

        :param aws_conn_id: AWS connection ID.
        :param region_name: AWS region name; when ``None`` the connection's
            configured region is presumably used — confirm against AwsBaseHook.
        """

    def create_db_instance(
        self,
        db_instance_identifier: str,
        db_instance_class: str,
        engine: str,
        master_username: str | None = None,
        master_user_password: str | None = None,
        allocated_storage: int | None = None,
        **kwargs,
    ) -> dict:
        """
        Create RDS database instance.

        :param db_instance_identifier: Unique identifier for DB instance.
        :param db_instance_class: Instance class (e.g. ``'db.t3.micro'``).
        :param engine: Database engine (``'postgres'``, ``'mysql'``,
            ``'mariadb'``, ``'oracle-ee'``, ``'sqlserver-ex'``).
        :param master_username: Master username for the database.
        :param master_user_password: Master password for the database.
        :param allocated_storage: Storage size in GB.
        :return: DB instance configuration.
        """

    def delete_db_instance(
        self,
        db_instance_identifier: str,
        skip_final_snapshot: bool = True,
        final_db_snapshot_identifier: str | None = None,
        delete_automated_backups: bool = True,
    ) -> dict:
        """
        Delete RDS database instance.

        :param db_instance_identifier: DB instance identifier.
        :param skip_final_snapshot: Skip final snapshot creation.
        :param final_db_snapshot_identifier: Final snapshot identifier.
        :param delete_automated_backups: Delete automated backups.
        :return: Deletion response.
        """

    def describe_db_instances(
        self,
        db_instance_identifier: str | None = None,
        filters: list | None = None,
    ) -> dict:
        """
        Describe RDS database instances.

        :param db_instance_identifier: Specific DB instance identifier;
            ``None`` presumably describes all instances — confirm.
        :param filters: List of filters to apply.
        :return: DB instances description.
        """

    def start_db_instance(self, db_instance_identifier: str) -> dict:
        """
        Start stopped RDS database instance.

        :param db_instance_identifier: DB instance identifier.
        :return: Start instance response.
        """

    def stop_db_instance(
        self,
        db_instance_identifier: str,
        db_snapshot_identifier: str | None = None,
    ) -> dict:
        """
        Stop running RDS database instance.

        :param db_instance_identifier: DB instance identifier.
        :param db_snapshot_identifier: Snapshot identifier for backup.
        :return: Stop instance response.
        """

    def reboot_db_instance(self, db_instance_identifier: str, force_failover: bool = False) -> dict:
        """
        Reboot RDS database instance.

        :param db_instance_identifier: DB instance identifier.
        :param force_failover: Force failover during reboot.
        :return: Reboot response.
        """

    def create_db_snapshot(
        self,
        db_snapshot_identifier: str,
        db_instance_identifier: str,
        tags: list | None = None,
    ) -> dict:
        """
        Create database snapshot.

        :param db_snapshot_identifier: Snapshot identifier.
        :param db_instance_identifier: Source DB instance identifier.
        :param tags: Snapshot tags.
        :return: Snapshot configuration.
        """

    def delete_db_snapshot(self, db_snapshot_identifier: str) -> dict:
        """
        Delete database snapshot.

        :param db_snapshot_identifier: Snapshot identifier.
        :return: Deletion response.
        """

    def restore_db_instance_from_db_snapshot(
        self,
        db_instance_identifier: str,
        db_snapshot_identifier: str,
        db_instance_class: str | None = None,
        **kwargs,
    ) -> dict:
        """
        Restore database instance from snapshot.

        :param db_instance_identifier: New DB instance identifier.
        :param db_snapshot_identifier: Source snapshot identifier.
        :param db_instance_class: Instance class for restored instance.
        :return: Restored instance configuration.
        """

    def get_db_instance_state(self, db_instance_identifier: str) -> str:
        """
        Get RDS instance state.

        :param db_instance_identifier: DB instance identifier.
        :return: Current instance state.
        """

# Task implementations for RDS database operations.
class RdsCreateDbInstanceOperator(BaseOperator):
    """Operator that creates an RDS database instance."""

    def __init__(
        self,
        db_instance_identifier: str,
        db_instance_class: str,
        engine: str,
        master_username: str | None = None,
        master_user_password: str | None = None,
        allocated_storage: int = 20,
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Create RDS database instance.

        :param db_instance_identifier: Unique identifier for DB instance.
        :param db_instance_class: Instance class.
        :param engine: Database engine.
        :param master_username: Master username.
        :param master_user_password: Master password.
        :param allocated_storage: Storage size in GB (defaults to 20).
        :param aws_conn_id: AWS connection ID.
        """
class RdsDeleteDbInstanceOperator(BaseOperator):
    """Operator that deletes an RDS database instance."""

    def __init__(
        self,
        db_instance_identifier: str,
        skip_final_snapshot: bool = True,
        final_db_snapshot_identifier: str | None = None,
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Delete RDS database instance.

        :param db_instance_identifier: DB instance identifier.
        :param skip_final_snapshot: Skip final snapshot creation.
        :param final_db_snapshot_identifier: Final snapshot identifier.
        :param aws_conn_id: AWS connection ID.
        """
class RdsStartDbOperator(BaseOperator):
    """Operator that starts a stopped RDS database instance."""

    def __init__(self, db_instance_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Start RDS database instance.

        :param db_instance_identifier: DB instance identifier.
        :param aws_conn_id: AWS connection ID.
        """
class RdsStopDbOperator(BaseOperator):
    """Operator that stops a running RDS database instance."""

    def __init__(
        self,
        db_instance_identifier: str,
        db_snapshot_identifier: str | None = None,
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Stop RDS database instance.

        :param db_instance_identifier: DB instance identifier.
        :param db_snapshot_identifier: Snapshot identifier for backup.
        :param aws_conn_id: AWS connection ID.
        """
class RdsCreateDbSnapshotOperator(BaseOperator):
    """Operator that creates an RDS database snapshot."""

    def __init__(
        self,
        db_snapshot_identifier: str,
        db_instance_identifier: str,
        tags: list | None = None,
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Create RDS database snapshot.

        :param db_snapshot_identifier: Snapshot identifier.
        :param db_instance_identifier: Source DB instance identifier.
        :param tags: Snapshot tags.
        :param aws_conn_id: AWS connection ID.
        """
class RdsDeleteDbSnapshotOperator(BaseOperator):
    """Operator that deletes an RDS database snapshot."""

    def __init__(self, db_snapshot_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete RDS database snapshot.

        :param db_snapshot_identifier: Snapshot identifier.
        :param aws_conn_id: AWS connection ID.
        """

# Monitoring tasks for RDS instance states and snapshot availability.
class RdsDbSensor(BaseSensorOperator):
    """Sensor that waits for an RDS database instance to reach a target state."""

    def __init__(
        self,
        db_identifier: str,
        target_statuses: list | None = None,
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Wait for RDS database instance to reach target state.

        :param db_identifier: DB instance identifier.
        :param target_statuses: List of target statuses.
        :param aws_conn_id: AWS connection ID.
        """
class RdsSnapshotExistenceSensor(BaseSensorOperator):
    """Sensor that waits for an RDS snapshot to exist and be available."""

    def __init__(
        self,
        db_snapshot_identifier: str,
        # NOTE: the previous default was the mutable literal ['available'],
        # which is shared across calls; None is used as a sentinel instead.
        target_statuses: list | None = None,
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Wait for RDS snapshot to exist and be available.

        :param db_snapshot_identifier: Snapshot identifier.
        :param target_statuses: List of acceptable statuses; ``None`` means
            the default of ``['available']``.
        :param aws_conn_id: AWS connection ID.
        """

from airflow import DAG
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.rds import (
    RdsCreateDbInstanceOperator,
    RdsCreateDbSnapshotOperator,
    RdsStopDbOperator,
    RdsStartDbOperator,
)
from airflow.providers.amazon.aws.sensors.rds import RdsDbSensor

# Example DAG: provision a production Postgres instance, wait until it is
# available, then take a dated backup snapshot.
dag = DAG('rds_lifecycle', start_date=datetime(2023, 1, 1))

# Create database instance
create_db = RdsCreateDbInstanceOperator(
    task_id='create_database',
    db_instance_identifier='analytics-db-prod',
    db_instance_class='db.r5.large',
    engine='postgres',
    master_username='admin',
    # Password pulled from an Airflow Variable at render time, not hard-coded.
    master_user_password='{{ var.value.db_password }}',
    allocated_storage=100,
    storage_type='gp2',
    vpc_security_group_ids=['sg-12345678'],
    db_subnet_group_name='analytics-subnet-group',
    backup_retention_period=7,
    multi_az=True,
    publicly_accessible=False,
    tags=[
        {'Key': 'Environment', 'Value': 'production'},
        {'Key': 'Application', 'Value': 'analytics'},
    ],
    dag=dag,
)

# Wait for database to be available
wait_for_db = RdsDbSensor(
    task_id='wait_for_database',
    db_identifier='analytics-db-prod',
    target_statuses=['available'],
    timeout=1800,  # 30 minutes
    dag=dag,
)

# Create backup snapshot, named after the logical date ({{ ds }})
create_snapshot = RdsCreateDbSnapshotOperator(
    task_id='create_backup',
    db_snapshot_identifier='analytics-db-backup-{{ ds }}',
    db_instance_identifier='analytics-db-prod',
    tags=[
        {'Key': 'Date', 'Value': '{{ ds }}'},
        {'Key': 'Type', 'Value': 'scheduled-backup'},
    ],
    dag=dag,
)

create_db >> wait_for_db >> create_snapshot

from airflow.providers.amazon.aws.sensors.rds import RdsSnapshotExistenceSensor
# Maintenance window: snapshot + stop the instance, then bring it back up
# and confirm it is serving again.

# Stop the database, taking a pre-maintenance snapshot as part of the stop.
stop_db = RdsStopDbOperator(
    task_id='stop_for_maintenance',
    db_instance_identifier='analytics-db-prod',
    db_snapshot_identifier='pre-maintenance-{{ ds }}',
    dag=dag,
)

# Block until the pre-maintenance snapshot is available.
wait_for_snapshot = RdsSnapshotExistenceSensor(
    task_id='wait_for_snapshot',
    db_snapshot_identifier='pre-maintenance-{{ ds }}',
    timeout=3600,  # 1 hour
    dag=dag,
)

# Bring the instance back up once maintenance is complete.
start_db = RdsStartDbOperator(
    task_id='restart_after_maintenance',
    db_instance_identifier='analytics-db-prod',
    dag=dag,
)

# Confirm the instance reports 'available' again before downstream work.
wait_for_restart = RdsDbSensor(
    task_id='wait_for_restart',
    db_identifier='analytics-db-prod',
    target_statuses=['available'],
    dag=dag,
)

stop_db >> wait_for_snapshot
wait_for_snapshot >> start_db
start_db >> wait_for_restart

# RDS instance states
class RdsInstanceState:
    """String constants for the lifecycle states an RDS instance reports."""

    # Steady states
    AVAILABLE = "available"
    STOPPED = "stopped"
    FAILED = "failed"
    STORAGE_FULL = "storage-full"
    # Transitional states
    CREATING = "creating"
    DELETING = "deleting"
    STARTING = "starting"
    STOPPING = "stopping"
    REBOOTING = "rebooting"
    BACKING_UP = "backing-up"
    MODIFYING = "modifying"
    MAINTENANCE = "maintenance"
    UPGRADING = "upgrading"
# RDS engines
class RdsEngine:
    """Engine identifier strings accepted by RDS instance creation."""

    # Open-source engines
    POSTGRES = "postgres"
    MYSQL = "mysql"
    MARIADB = "mariadb"
    # Oracle editions
    ORACLE_EE = "oracle-ee"
    ORACLE_SE2 = "oracle-se2"
    ORACLE_SE1 = "oracle-se1"
    ORACLE_SE = "oracle-se"
    # SQL Server editions
    SQLSERVER_EE = "sqlserver-ee"
    SQLSERVER_SE = "sqlserver-se"
    SQLSERVER_EX = "sqlserver-ex"
    SQLSERVER_WEB = "sqlserver-web"
# Instance classes
class RdsInstanceClass:
    """Common RDS instance-class identifiers (burstable, general, memory-optimized)."""

    # T3 — burstable
    T3_MICRO = "db.t3.micro"
    T3_SMALL = "db.t3.small"
    T3_MEDIUM = "db.t3.medium"
    T3_LARGE = "db.t3.large"
    # M5 — general purpose
    M5_LARGE = "db.m5.large"
    M5_XLARGE = "db.m5.xlarge"
    M5_2XLARGE = "db.m5.2xlarge"
    # R5 — memory optimized
    R5_LARGE = "db.r5.large"
    R5_XLARGE = "db.r5.xlarge"
    R5_2XLARGE = "db.r5.2xlarge"
# Storage types
class StorageType:
    """RDS storage-type identifiers (magnetic, general-purpose SSD, provisioned IOPS)."""

    STANDARD = "standard"  # magnetic
    GP2 = "gp2"            # general-purpose SSD
    GP3 = "gp3"
    IO1 = "io1"            # provisioned IOPS SSD
    IO2 = "io2"
# DB instance configuration
class DbInstanceConfig:
db_instance_identifier: str
db_instance_class: str
engine: str
master_username: str
allocated_storage: int
storage_type: str = 'gp2'
storage_encrypted: bool = False
kms_key_id: str = None
db_name: str = None
port: int = None
vpc_security_group_ids: list = None
db_subnet_group_name: str = None
publicly_accessible: bool = False
multi_az: bool = False
backup_retention_period: int = 1
preferred_backup_window: str = None
preferred_maintenance_window: str = None
auto_minor_version_upgrade: bool = True
license_model: str = None
option_group_name: str = None
character_set_name: str = None
tags: list = None
copy_tags_to_snapshot: bool = False
monitoring_interval: int = 0
monitoring_role_arn: str = None
domain_iam_role_name: str = None
promotion_tier: int = None
timezone: str = None
enable_iam_database_authentication: bool = False
enable_performance_insights: bool = False
performance_insights_kms_key_id: str = None
performance_insights_retention_period: int = None
enable_cloudwatch_logs_exports: list = None
processor_features: list = None
deletion_protection: bool = False
max_allocated_storage: int = NoneInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon