tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package offering operators, sensors, and hooks for AWS services, used to orchestrate cloud workflows and data pipelines


AWS Database Migration Service (DMS)

AWS Database Migration Service (DMS) migrates and replicates data between different database engines, supporting both one-time migrations and continuous change data capture (CDC) for data pipelines.

Capabilities

Replication Task Management

Create and manage DMS replication tasks for database migration and continuous data replication.

class DmsCreateTaskOperator(AwsBaseOperator):
    """
    Create a DMS replication task.
    
    Parameters:
    - replication_task_id: str - unique identifier for the replication task
    - source_endpoint_arn: str - ARN of the source endpoint
    - target_endpoint_arn: str - ARN of the target endpoint
    - replication_instance_arn: str - ARN of the replication instance
    - migration_type: str - migration type ('full-load', 'cdc', 'full-load-and-cdc')
    - table_mappings: str - JSON string defining table mapping rules
    - replication_task_settings: str - JSON string with task settings
    - create_task_kwargs: dict - additional task creation parameters
    - aws_conn_id: str - Airflow connection for AWS credentials
    
    Returns:
    str: Replication task ARN
    """
    def __init__(
        self,
        replication_task_id: str,
        source_endpoint_arn: str,
        target_endpoint_arn: str,
        replication_instance_arn: str,
        migration_type: str,
        table_mappings: str,
        replication_task_settings: str | None = None,
        create_task_kwargs: dict | None = None,
        **kwargs
    ): ...
class DmsStartTaskOperator(AwsBaseOperator):
    """
    Start a DMS replication task.
    
    Parameters:
    - replication_task_arn: str - ARN of the replication task
    - start_replication_task_type: str - start type ('start-replication', 'resume-processing', 'reload-target')
    - cdc_start_time: datetime - CDC start time for incremental replication
    - cdc_start_position: str - CDC start position
    - cdc_stop_position: str - CDC stop position
    - aws_conn_id: str - Airflow connection for AWS credentials
    
    Returns:
    dict: Task start response
    """
    def __init__(
        self,
        replication_task_arn: str,
        start_replication_task_type: str = 'start-replication',
        cdc_start_time: datetime | None = None,
        cdc_start_position: str | None = None,
        cdc_stop_position: str | None = None,
        **kwargs
    ): ...

Task Status Monitoring

Monitor DMS replication task status and completion.

class DmsTaskCompletedSensor(BaseSensorOperator):
    """
    Wait for a DMS replication task to complete.
    
    Parameters:
    - replication_task_arn: str - ARN of the replication task to monitor
    - target_statuses: list - list of target statuses to wait for
    - termination_statuses: list - statuses that indicate task failure
    - aws_conn_id: str - Airflow connection for AWS credentials
    - poke_interval: int - time between status checks, in seconds
    - timeout: int - maximum time to wait, in seconds
    
    Returns:
    bool: True when task reaches target status
    """
    def __init__(
        self,
        replication_task_arn: str,
        target_statuses: list | None = None,
        termination_statuses: list | None = None,
        aws_conn_id: str = 'aws_default',
        **kwargs
    ): ...
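
Under the hood, the sensor repeatedly calls the task-status lookup and compares the result against target_statuses and termination_statuses. A minimal sketch of that poke loop, using a stubbed status function in place of a real DMS call (the helper and the status sequence are illustrative, not the provider's actual implementation):

```python
import time

def wait_for_task(get_status, target_statuses, termination_statuses,
                  poke_interval=1, timeout=30):
    """Poll get_status() until a target status appears.

    Raises RuntimeError on a termination status, TimeoutError on timeout.
    Mirrors the behavior described for DmsTaskCompletedSensor.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in target_statuses:
            return True
        if status in termination_statuses:
            raise RuntimeError(f"task entered terminal status: {status}")
        time.sleep(poke_interval)
    raise TimeoutError("task did not reach a target status in time")

# Stubbed status sequence standing in for DmsHook.get_task_status
statuses = iter(["starting", "running", "stopped"])
wait_for_task(lambda: next(statuses),
              target_statuses=["stopped"],
              termination_statuses=["failed"],
              poke_interval=0)
```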

DMS Service Hook

Low-level DMS operations for endpoint and task management.

class DmsHook(AwsBaseHook):
    """
    Hook for AWS Database Migration Service operations.
    
    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        **kwargs
    ): ...
    
    def create_replication_task(
        self,
        replication_task_identifier: str,
        source_endpoint_arn: str,
        target_endpoint_arn: str,
        replication_instance_arn: str,
        migration_type: str,
        table_mappings: str,
        **kwargs
    ) -> dict:
        """Create a DMS replication task."""
        ...
    
    def start_replication_task(
        self,
        replication_task_arn: str,
        start_replication_task_type: str,
        **kwargs
    ) -> dict:
        """Start a DMS replication task."""
        ...
    
    def stop_replication_task(self, replication_task_arn: str) -> dict:
        """Stop a DMS replication task."""
        ...
    
    def delete_replication_task(self, replication_task_arn: str) -> dict:
        """Delete a DMS replication task."""
        ...
    
    def describe_replication_tasks(
        self,
        replication_task_arns: list | None = None,
        filters: list | None = None,
        **kwargs
    ) -> dict:
        """Describe DMS replication tasks."""
        ...
    
    def get_task_status(self, replication_task_arn: str) -> str:
        """Get the status of a replication task."""
        ...

Usage Examples

Database Migration Pipeline

from airflow.providers.amazon.aws.operators.dms import (
    DmsCreateTaskOperator,
    DmsStartTaskOperator
)
from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor

# Create replication task for database migration
create_migration_task = DmsCreateTaskOperator(
    task_id='create_migration_task',
    replication_task_id='postgres-to-redshift-migration',
    source_endpoint_arn='arn:aws:dms:us-west-2:123456789012:endpoint:ABCDEFGHIJ',
    target_endpoint_arn='arn:aws:dms:us-west-2:123456789012:endpoint:KLMNOPQRST',
    replication_instance_arn='arn:aws:dms:us-west-2:123456789012:rep:UVWXYZ1234',
    migration_type='full-load-and-cdc',
    table_mappings="""
    {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "1",
                "object-locator": {
                    "schema-name": "public",
                    "table-name": "%"
                },
                "rule-action": "include"
            }
        ]
    }
    """,
    aws_conn_id='aws_default'
)

# Start the migration task
start_migration = DmsStartTaskOperator(
    task_id='start_migration',
    replication_task_arn='{{ ti.xcom_pull(task_ids="create_migration_task") }}',
    start_replication_task_type='start-replication',
    aws_conn_id='aws_default'
)

# Monitor migration completion
monitor_migration = DmsTaskCompletedSensor(
    task_id='monitor_migration',
    replication_task_arn='{{ ti.xcom_pull(task_ids="create_migration_task") }}',
    target_statuses=['stopped'],
    poke_interval=60,
    timeout=7200,  # 2 hours
    aws_conn_id='aws_default'
)

create_migration_task >> start_migration >> monitor_migration
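
Embedding the table-mappings document as a triple-quoted string is easy to get wrong; it can also be built with json.dumps. A small helper along these lines (the function name is illustrative, not part of the provider):

```python
import json

def selection_rule_mappings(schema: str, table_pattern: str = "%") -> str:
    """Build a minimal DMS table-mappings document with one selection rule."""
    mappings = {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "1",
                "object-locator": {
                    "schema-name": schema,
                    "table-name": table_pattern,
                },
                "rule-action": "include",
            }
        ]
    }
    return json.dumps(mappings)

# Equivalent to the inline string used above
table_mappings = selection_rule_mappings("public")
```

The returned string can be passed directly as table_mappings to DmsCreateTaskOperator.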

Import Statements

from airflow.providers.amazon.aws.operators.dms import (
    DmsCreateTaskOperator,
    DmsStartTaskOperator,
    DmsStopTaskOperator,
    DmsDeleteTaskOperator
)
from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor
from airflow.providers.amazon.aws.hooks.dms import DmsHook

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon
