Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
AWS Database Migration Service provides comprehensive database migration and replication capabilities, enabling seamless data transfer between different database engines and continuous data replication for data pipelines.
Create and manage DMS replication tasks for database migration and continuous data replication.
class DmsCreateTaskOperator(AwsBaseOperator):
"""
Create a DMS replication task.
Parameters:
- replication_task_id: str - unique identifier for the replication task
- source_endpoint_arn: str - ARN of the source endpoint
- target_endpoint_arn: str - ARN of the target endpoint
- replication_instance_arn: str - ARN of the replication instance
- migration_type: str - migration type ('full-load', 'cdc', 'full-load-and-cdc')
- table_mappings: str - JSON string defining table mapping rules
- replication_task_settings: str - JSON string with task settings
- create_task_kwargs: dict - additional task creation parameters
- aws_conn_id: str - Airflow connection for AWS credentials
Returns:
str: Replication task ARN
"""
def __init__(
self,
replication_task_id: str,
source_endpoint_arn: str,
target_endpoint_arn: str,
replication_instance_arn: str,
migration_type: str,
table_mappings: str,
replication_task_settings: str = None,
create_task_kwargs: dict = None,
**kwargs
): ...

class DmsStartTaskOperator(AwsBaseOperator):
"""
Start a DMS replication task.
Parameters:
- replication_task_arn: str - ARN of the replication task
- start_replication_task_type: str - start type ('start-replication', 'resume-processing', 'reload-target')
- cdc_start_time: datetime - CDC start time for incremental replication
- cdc_start_position: str - CDC start position
- cdc_stop_position: str - CDC stop position
- aws_conn_id: str - Airflow connection for AWS credentials
Returns:
dict: Task start response
"""
def __init__(
self,
replication_task_arn: str,
start_replication_task_type: str = 'start-replication',
cdc_start_time: datetime = None,
cdc_start_position: str = None,
cdc_stop_position: str = None,
**kwargs
): ...

Monitor DMS replication task status and completion.
class DmsTaskCompletedSensor(BaseSensorOperator):
"""
Wait for a DMS replication task to complete.
Parameters:
- replication_task_arn: str - ARN of the replication task to monitor
- target_statuses: list - list of target statuses to wait for
- termination_statuses: list - statuses that indicate task failure
- aws_conn_id: str - Airflow connection for AWS credentials
- poke_interval: int - time between status checks
- timeout: int - maximum time to wait
Returns:
bool: True when task reaches target status
"""
def __init__(
self,
replication_task_arn: str,
target_statuses: list = None,
termination_statuses: list = None,
aws_conn_id: str = 'aws_default',
**kwargs
): ...

Low-level DMS operations for endpoint and task management.
class DmsHook(AwsBaseHook):
"""
Hook for AWS Database Migration Service operations.
Parameters:
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
"""
def __init__(
self,
aws_conn_id: str = 'aws_default',
region_name: str = None,
**kwargs
): ...
def create_replication_task(
self,
replication_task_identifier: str,
source_endpoint_arn: str,
target_endpoint_arn: str,
replication_instance_arn: str,
migration_type: str,
table_mappings: str,
**kwargs
) -> dict:
"""Create a DMS replication task."""
...
def start_replication_task(
self,
replication_task_arn: str,
start_replication_task_type: str,
**kwargs
) -> dict:
"""Start a DMS replication task."""
...
def stop_replication_task(self, replication_task_arn: str) -> dict:
"""Stop a DMS replication task."""
...
def delete_replication_task(self, replication_task_arn: str) -> dict:
"""Delete a DMS replication task."""
...
def describe_replication_tasks(
self,
replication_task_arns: list = None,
filters: list = None,
**kwargs
) -> dict:
"""Describe DMS replication tasks."""
...
def get_task_status(self, replication_task_arn: str) -> str:
"""Get the status of a replication task."""
...

from airflow.providers.amazon.aws.operators.dms import (
DmsCreateTaskOperator,
DmsStartTaskOperator
)
from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor
# Create replication task for database migration
create_migration_task = DmsCreateTaskOperator(
task_id='create_migration_task',
replication_task_id='postgres-to-redshift-migration',
source_endpoint_arn='arn:aws:dms:us-west-2:123456789012:endpoint:ABCDEFGHIJ',
target_endpoint_arn='arn:aws:dms:us-west-2:123456789012:endpoint:KLMNOPQRST',
replication_instance_arn='arn:aws:dms:us-west-2:123456789012:rep:UVWXYZ1234',
migration_type='full-load-and-cdc',
table_mappings="""
{
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "public",
"table-name": "%"
},
"rule-action": "include"
}
]
}
""",
aws_conn_id='aws_default'
)
# Start the migration task
start_migration = DmsStartTaskOperator(
task_id='start_migration',
replication_task_arn='{{ ti.xcom_pull(task_ids="create_migration_task") }}',
start_replication_task_type='start-replication',
aws_conn_id='aws_default'
)
# Monitor migration completion
monitor_migration = DmsTaskCompletedSensor(
task_id='monitor_migration',
replication_task_arn='{{ ti.xcom_pull(task_ids="create_migration_task") }}',
target_statuses=['stopped'],
poke_interval=60,
timeout=7200, # 2 hours
aws_conn_id='aws_default'
)
create_migration_task >> start_migration >> monitor_migration

from airflow.providers.amazon.aws.operators.dms import (
DmsCreateTaskOperator,
DmsStartTaskOperator,
DmsStopTaskOperator,
DmsDeleteTaskOperator
)
from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor
from airflow.providers.amazon.aws.hooks.dms import DmsHook

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon