CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/rds-databases.md

RDS Database Operations

Amazon RDS (Relational Database Service) integration for managing the lifecycle of database instances. Provides operations for creating, managing, and monitoring RDS instances running PostgreSQL, MySQL, MariaDB, Oracle, or SQL Server.

Capabilities

RDS Hook

Core RDS client providing database instance management and monitoring functionality.

class RdsHook(AwsBaseHook):
    """
    AWS hook exposing RDS DB instance and DB snapshot operations.

    Every method takes RDS identifiers as plain strings and returns the
    response for the corresponding RDS action.
    """

    def __init__(self, aws_conn_id: str = 'aws_default', region_name: str = None, **kwargs):
        """
        Build an RDS hook bound to an Airflow AWS connection.

        Parameters:
        - aws_conn_id: Airflow connection ID holding AWS credentials
        - region_name: AWS region to target; None leaves region selection to the connection
        - kwargs: Extra keyword arguments for the hook base class
        """

    def create_db_instance(self, db_instance_identifier: str, db_instance_class: str, engine: str, master_username: str = None, master_user_password: str = None, allocated_storage: int = None, **kwargs) -> dict:
        """
        Provision a new RDS DB instance.

        Parameters:
        - db_instance_identifier: Name that uniquely identifies the new instance
        - db_instance_class: Compute/memory class, e.g. 'db.t3.micro'
        - engine: Engine name, e.g. 'postgres', 'mysql', 'mariadb', 'oracle-ee', 'sqlserver-ex'
        - master_username: Login name of the master database user
        - master_user_password: Password of the master database user
        - allocated_storage: Initial storage size in GB
        - kwargs: Additional instance settings (NOTE(review): presumably forwarded
          to the RDS CreateDBInstance call; confirm)

        Returns:
        dict describing the newly created DB instance
        """

    def delete_db_instance(self, db_instance_identifier: str, skip_final_snapshot: bool = True, final_db_snapshot_identifier: str = None, delete_automated_backups: bool = True) -> dict:
        """
        Remove an RDS DB instance.

        Parameters:
        - db_instance_identifier: Instance to delete
        - skip_final_snapshot: When True (the default), no final snapshot is taken
        - final_db_snapshot_identifier: Name of the final snapshot; AWS requires
          it when skip_final_snapshot is False
        - delete_automated_backups: Whether automated backups are removed with the instance

        Returns:
        dict with the deletion response
        """

    def describe_db_instances(self, db_instance_identifier: str = None, filters: list = None) -> dict:
        """
        Look up one or more RDS DB instances.

        Parameters:
        - db_instance_identifier: Restrict the result to this instance; None lists all
        - filters: Optional list of filters narrowing the result

        Returns:
        dict describing the matching DB instances
        """

    def start_db_instance(self, db_instance_identifier: str) -> dict:
        """
        Start a DB instance that is currently stopped.

        Parameters:
        - db_instance_identifier: Instance to start

        Returns:
        dict with the start response
        """

    def stop_db_instance(self, db_instance_identifier: str, db_snapshot_identifier: str = None) -> dict:
        """
        Stop a running DB instance.

        Parameters:
        - db_instance_identifier: Instance to stop
        - db_snapshot_identifier: If given, name of the snapshot RDS takes right
          before stopping the instance

        Returns:
        dict with the stop response
        """

    def reboot_db_instance(self, db_instance_identifier: str, force_failover: bool = False) -> dict:
        """
        Restart a DB instance.

        Parameters:
        - db_instance_identifier: Instance to reboot
        - force_failover: When True, reboot by failing over (Multi-AZ instances)

        Returns:
        dict with the reboot response
        """

    def create_db_snapshot(self, db_snapshot_identifier: str, db_instance_identifier: str, tags: list = None) -> dict:
        """
        Take a manual snapshot of a DB instance.

        Parameters:
        - db_snapshot_identifier: Name for the new snapshot
        - db_instance_identifier: Instance being snapshotted
        - tags: Optional list of {'Key': ..., 'Value': ...} tags for the snapshot

        Returns:
        dict describing the new snapshot
        """

    def delete_db_snapshot(self, db_snapshot_identifier: str) -> dict:
        """
        Remove a manual DB snapshot.

        Parameters:
        - db_snapshot_identifier: Snapshot to delete

        Returns:
        dict with the deletion response
        """

    def restore_db_instance_from_db_snapshot(self, db_instance_identifier: str, db_snapshot_identifier: str, db_instance_class: str = None, **kwargs) -> dict:
        """
        Create a new DB instance from an existing snapshot.

        Parameters:
        - db_instance_identifier: Name of the instance to create
        - db_snapshot_identifier: Snapshot to restore from
        - db_instance_class: Class of the restored instance; None keeps the service default
        - kwargs: Additional restore settings

        Returns:
        dict describing the restored DB instance
        """

    def get_db_instance_state(self, db_instance_identifier: str) -> str:
        """
        Read the current status of a DB instance.

        Parameters:
        - db_instance_identifier: Instance to inspect

        Returns:
        Status string such as 'available' or 'stopped'
        """

RDS Operators

Task implementations for RDS database operations.

class RdsCreateDbInstanceOperator(BaseOperator):
    """Airflow task that provisions an RDS DB instance."""

    def __init__(self, db_instance_identifier: str, db_instance_class: str, engine: str, master_username: str = None, master_user_password: str = None, allocated_storage: int = 20, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Configure the instance-creation task.

        Parameters:
        - db_instance_identifier: Name that uniquely identifies the new instance
        - db_instance_class: Compute/memory class, e.g. 'db.r5.large'
        - engine: Engine name, e.g. 'postgres'
        - master_username: Login name of the master database user
        - master_user_password: Password of the master database user
        - allocated_storage: Initial storage size in GB (20 when omitted)
        - aws_conn_id: Airflow connection ID holding AWS credentials
        - kwargs: Remaining task arguments such as task_id and dag
        """

class RdsDeleteDbInstanceOperator(BaseOperator):
    """Airflow task that removes an RDS DB instance."""

    def __init__(self, db_instance_identifier: str, skip_final_snapshot: bool = True, final_db_snapshot_identifier: str = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Configure the instance-deletion task.

        Parameters:
        - db_instance_identifier: Instance to delete
        - skip_final_snapshot: When True (the default), the instance is deleted
          without a final snapshot
        - final_db_snapshot_identifier: Name of the final snapshot; AWS requires
          it whenever skip_final_snapshot is False
        - aws_conn_id: Airflow connection ID holding AWS credentials
        - kwargs: Remaining task arguments such as task_id and dag
        """

class RdsStartDbOperator(BaseOperator):
    """Airflow task that starts a stopped RDS DB instance."""

    def __init__(self, db_instance_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Configure the start task.

        Parameters:
        - db_instance_identifier: Instance to start
        - aws_conn_id: Airflow connection ID holding AWS credentials
        - kwargs: Remaining task arguments such as task_id and dag
        """

class RdsStopDbOperator(BaseOperator):
    """Airflow task that stops a running RDS DB instance."""

    def __init__(self, db_instance_identifier: str, db_snapshot_identifier: str = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Configure the stop task.

        Parameters:
        - db_instance_identifier: Instance to stop
        - db_snapshot_identifier: If given, name of the snapshot taken right
          before the instance stops
        - aws_conn_id: Airflow connection ID holding AWS credentials
        - kwargs: Remaining task arguments such as task_id and dag
        """

class RdsCreateDbSnapshotOperator(BaseOperator):
    """Airflow task that takes a manual snapshot of an RDS DB instance."""

    def __init__(self, db_snapshot_identifier: str, db_instance_identifier: str, tags: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Configure the snapshot task.

        Parameters:
        - db_snapshot_identifier: Name for the new snapshot
        - db_instance_identifier: Instance being snapshotted
        - tags: Optional list of {'Key': ..., 'Value': ...} tags for the snapshot
        - aws_conn_id: Airflow connection ID holding AWS credentials
        - kwargs: Remaining task arguments such as task_id and dag
        """

class RdsDeleteDbSnapshotOperator(BaseOperator):
    """Airflow task that removes a manual RDS DB snapshot."""

    def __init__(self, db_snapshot_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Configure the snapshot-deletion task.

        Parameters:
        - db_snapshot_identifier: Snapshot to delete
        - aws_conn_id: Airflow connection ID holding AWS credentials
        - kwargs: Remaining task arguments such as task_id and dag
        """

RDS Sensors

Monitoring tasks for RDS instance states and snapshot availability.

class RdsDbSensor(BaseSensorOperator):
    """Sensor that waits for an RDS DB instance to reach one of several statuses."""

    def __init__(self, db_identifier: str, target_statuses: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Configure the instance-status sensor.

        Parameters:
        - db_identifier: Instance to watch
        - target_statuses: Statuses that end the wait, e.g. ['available'].
          NOTE(review): the meaning of None is not shown here; upstream Airflow
          treats it as ['available']. Confirm for the installed version.
        - aws_conn_id: Airflow connection ID holding AWS credentials
        - kwargs: Remaining sensor arguments such as task_id, timeout and dag
        """

class RdsSnapshotExistenceSensor(BaseSensorOperator):
    """Sensor that waits for an RDS DB snapshot to exist and reach an acceptable status."""

    def __init__(self, db_snapshot_identifier: str, target_statuses: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for an RDS snapshot to exist and be available.

        Parameters:
        - db_snapshot_identifier: Snapshot identifier to watch
        - target_statuses: Snapshot statuses that end the wait. None (the
          default) means ['available']. The default is None rather than a list
          literal so that every sensor gets its own list instead of sharing one
          mutable default object that a mutation would change for all callers.
        - aws_conn_id: AWS connection ID
        - kwargs: Remaining sensor arguments such as task_id, timeout and dag
        """

Usage Examples

Database Instance Lifecycle Management

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.rds import (
    RdsCreateDbInstanceOperator,
    RdsCreateDbSnapshotOperator,
    RdsStopDbOperator,
    RdsStartDbOperator,
)
from airflow.providers.amazon.aws.sensors.rds import RdsDbSensor

# catchup=False: on Airflow versions where catchup defaults to True, the
# scheduler would create one run per interval since start_date, and each run
# would try to create the same 'analytics-db-prod' instance again.
dag = DAG('rds_lifecycle', start_date=datetime(2023, 1, 1), catchup=False)

# Create database instance
# NOTE(review): upstream RdsCreateDbInstanceOperator accepts extra
# CreateDBInstance settings through rds_kwargs={...}; confirm the flat keyword
# style below against the installed provider version.
create_db = RdsCreateDbInstanceOperator(
    task_id='create_database',
    db_instance_identifier='analytics-db-prod',
    db_instance_class='db.r5.large',
    engine='postgres',
    master_username='admin',
    master_user_password='{{ var.value.db_password }}',
    allocated_storage=100,
    storage_type='gp2',
    vpc_security_group_ids=['sg-12345678'],
    db_subnet_group_name='analytics-subnet-group',
    backup_retention_period=7,
    multi_az=True,
    publicly_accessible=False,
    tags=[
        {'Key': 'Environment', 'Value': 'production'},
        {'Key': 'Application', 'Value': 'analytics'}
    ],
    dag=dag
)

# Wait for database to be available
wait_for_db = RdsDbSensor(
    task_id='wait_for_database',
    db_identifier='analytics-db-prod',
    target_statuses=['available'],
    timeout=1800,  # 30 minutes
    dag=dag
)

# Create backup snapshot
create_snapshot = RdsCreateDbSnapshotOperator(
    task_id='create_backup',
    db_snapshot_identifier='analytics-db-backup-{{ ds }}',
    db_instance_identifier='analytics-db-prod',
    tags=[
        {'Key': 'Date', 'Value': '{{ ds }}'},
        {'Key': 'Type', 'Value': 'scheduled-backup'}
    ],
    dag=dag
)

create_db >> wait_for_db >> create_snapshot

Database Maintenance Window

# Continues the example above: reuses `dag` and the operator/sensor imports.
from airflow.providers.amazon.aws.sensors.rds import RdsSnapshotExistenceSensor

# Stop database for maintenance (RDS takes the named snapshot before stopping)
stop_db = RdsStopDbOperator(
    task_id='stop_for_maintenance',
    db_instance_identifier='analytics-db-prod',
    db_snapshot_identifier='pre-maintenance-{{ ds }}',
    dag=dag
)

# Wait for snapshot completion
wait_for_snapshot = RdsSnapshotExistenceSensor(
    task_id='wait_for_snapshot',
    db_snapshot_identifier='pre-maintenance-{{ ds }}',
    timeout=3600,  # 1 hour
    dag=dag
)

# Wait until the instance has fully stopped. A completed snapshot does not
# mean the stop has finished, and RDS rejects StartDBInstance while the
# instance is still 'stopping'.
wait_for_stop = RdsDbSensor(
    task_id='wait_for_stop',
    db_identifier='analytics-db-prod',
    target_statuses=['stopped'],
    timeout=3600,  # 1 hour
    dag=dag
)

# Restart database after maintenance
start_db = RdsStartDbOperator(
    task_id='restart_after_maintenance',
    db_instance_identifier='analytics-db-prod',
    dag=dag
)

# Wait for database to be available again
wait_for_restart = RdsDbSensor(
    task_id='wait_for_restart',
    db_identifier='analytics-db-prod',
    target_statuses=['available'],
    timeout=1800,  # 30 minutes, instead of the sensor's long default timeout
    dag=dag
)

stop_db >> wait_for_snapshot >> wait_for_stop >> start_db >> wait_for_restart

Types

# RDS instance states
class RdsInstanceState:
    """Status strings reported for an RDS DB instance."""

    # Stable states: the instance stays here until something acts on it
    AVAILABLE = 'available'
    STOPPED = 'stopped'
    FAILED = 'failed'
    STORAGE_FULL = 'storage-full'

    # Transitional states: an operation is in progress
    CREATING = 'creating'
    STARTING = 'starting'
    STOPPING = 'stopping'
    REBOOTING = 'rebooting'
    MODIFYING = 'modifying'
    UPGRADING = 'upgrading'
    BACKING_UP = 'backing-up'
    MAINTENANCE = 'maintenance'
    DELETING = 'deleting'

# RDS engines
class RdsEngine:
    """Engine names accepted by the ``engine`` argument of RDS DB instance APIs."""

    POSTGRES = 'postgres'
    MYSQL = 'mysql'
    MARIADB = 'mariadb'
    ORACLE_EE = 'oracle-ee'
    # Container-database (multitenant) variants of the Oracle editions
    ORACLE_EE_CDB = 'oracle-ee-cdb'
    ORACLE_SE2 = 'oracle-se2'
    ORACLE_SE2_CDB = 'oracle-se2-cdb'
    # Legacy Oracle editions: AWS no longer offers them for new instances.
    # They are kept so existing references keep resolving.
    ORACLE_SE1 = 'oracle-se1'
    ORACLE_SE = 'oracle-se'
    SQLSERVER_EE = 'sqlserver-ee'
    SQLSERVER_SE = 'sqlserver-se'
    SQLSERVER_EX = 'sqlserver-ex'
    SQLSERVER_WEB = 'sqlserver-web'

# Instance classes
class RdsInstanceClass:
    """Common RDS DB instance class identifiers, written as 'db.<family>.<size>'."""

    # T3 family (burstable)
    T3_MICRO = 'db.t3.micro'
    T3_SMALL = 'db.t3.small'
    T3_MEDIUM = 'db.t3.medium'
    T3_LARGE = 'db.t3.large'

    # M5 family (general purpose)
    M5_LARGE = 'db.m5.large'
    M5_XLARGE = 'db.m5.xlarge'
    M5_2XLARGE = 'db.m5.2xlarge'

    # R5 family (memory optimized)
    R5_LARGE = 'db.r5.large'
    R5_XLARGE = 'db.r5.xlarge'
    R5_2XLARGE = 'db.r5.2xlarge'

# Storage types
class StorageType:
    """Values for the RDS storage type setting."""

    # Magnetic (previous generation)
    STANDARD = 'standard'

    # General purpose SSD
    GP2 = 'gp2'
    GP3 = 'gp3'

    # Provisioned IOPS SSD
    IO1 = 'io1'
    IO2 = 'io2'

# DB instance configuration
class DbInstanceConfig:
    """
    Settings describing an RDS DB instance to create.

    This is a plain class, not a dataclass. The annotated names are
    class-level defaults, and no ``__init__`` is generated from them. Names
    annotated without a default (identifier, class, engine, master username,
    allocated storage) are not class attributes until assigned. A ``None``
    default means the setting is left unspecified.
    """

    # Required identity and sizing (no defaults)
    db_instance_identifier: str
    db_instance_class: str
    engine: str
    master_username: str
    allocated_storage: int

    # Storage and encryption
    storage_type: str = 'gp2'
    storage_encrypted: bool = False
    kms_key_id: str = None

    # Database and networking
    db_name: str = None
    port: int = None
    vpc_security_group_ids: list = None
    db_subnet_group_name: str = None
    publicly_accessible: bool = False
    multi_az: bool = False

    # Backups and maintenance
    backup_retention_period: int = 1
    preferred_backup_window: str = None
    preferred_maintenance_window: str = None
    auto_minor_version_upgrade: bool = True

    # Engine options
    license_model: str = None
    option_group_name: str = None
    character_set_name: str = None

    # Tagging
    tags: list = None
    copy_tags_to_snapshot: bool = False

    # Monitoring, identity and placement
    monitoring_interval: int = 0
    monitoring_role_arn: str = None
    domain_iam_role_name: str = None
    promotion_tier: int = None
    timezone: str = None
    enable_iam_database_authentication: bool = False
    enable_performance_insights: bool = False
    performance_insights_kms_key_id: str = None
    performance_insights_retention_period: int = None
    enable_cloudwatch_logs_exports: list = None
    processor_features: list = None
    deletion_protection: bool = False
    max_allocated_storage: int = None

    # Provisioned IOPS. AWS expects a value when storage_type is 'io1' or
    # 'io2' (StorageType.IO1 / IO2); it is optional for 'gp3'.
    iops: int = None
    # Storage throughput in MiB/s. This setting applies only to 'gp3' storage.
    storage_throughput: int = None

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon

docs

athena-analytics.md

authentication.md

batch-processing.md

data-transfers.md

dms-migration.md

dynamodb-nosql.md

ecs-containers.md

eks-kubernetes.md

emr-clusters.md

glue-processing.md

index.md

lambda-functions.md

messaging-sns-sqs.md

rds-databases.md

redshift-warehouse.md

s3-storage.md

sagemaker-ml.md

tile.json