# AWS Database Migration Service (DMS)

AWS Database Migration Service (DMS) migrates data between database engines and supports continuous replication, for example to feed data pipelines. The operators, sensor, and hook below manage DMS replication tasks from Airflow.

## Capabilities

### Replication Task Management

Create and manage DMS replication tasks for database migration and continuous data replication.

```python { .api }
class DmsCreateTaskOperator(AwsBaseOperator):
    """
    Create a DMS replication task.

    Parameters:
    - replication_task_id: str - unique identifier for the replication task
    - source_endpoint_arn: str - ARN of the source endpoint
    - target_endpoint_arn: str - ARN of the target endpoint
    - replication_instance_arn: str - ARN of the replication instance
    - migration_type: str - migration type ('full-load', 'cdc', 'full-load-and-cdc')
    - table_mappings: str - JSON string defining table mapping rules
    - replication_task_settings: str - JSON string with task settings
    - create_task_kwargs: dict - additional task creation parameters
    - aws_conn_id: str - Airflow connection for AWS credentials

    Returns:
    str: Replication task ARN
    """
    def __init__(
        self,
        replication_task_id: str,
        source_endpoint_arn: str,
        target_endpoint_arn: str,
        replication_instance_arn: str,
        migration_type: str,
        table_mappings: str,
        replication_task_settings: str = None,
        create_task_kwargs: dict = None,
        **kwargs
    ): ...
```

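The `table_mappings` parameter is described above as a JSON string. Rather than hand-writing that string, it can be built from a Python dict and serialized with the standard `json` module. The following is a minimal sketch under that assumption; the helper name `selection_rule` is hypothetical (it is not part of the provider), and the rule fields mirror the selection rule shown in the usage example in this document.

```python
import json


def selection_rule(rule_id: int, schema: str, table: str = "%", action: str = "include") -> dict:
    """Build one selection rule dict (hypothetical helper, not a provider API)."""
    return {
        "rule-type": "selection",
        "rule-id": str(rule_id),
        "rule-name": str(rule_id),
        "object-locator": {"schema-name": schema, "table-name": table},
        "rule-action": action,
    }


# Serialize the rules into the JSON string form described for `table_mappings`.
table_mappings = json.dumps({"rules": [selection_rule(1, "public")]}, indent=2)
print(table_mappings)
```

Building the mapping as a dict first makes it easy to generate several rules in a loop and avoids quoting mistakes in a long string literal.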
```python { .api }
class DmsStartTaskOperator(AwsBaseOperator):
    """
    Start a DMS replication task.

    Parameters:
    - replication_task_arn: str - ARN of the replication task
    - start_replication_task_type: str - start type ('start-replication', 'resume-processing', 'reload-target')
    - cdc_start_time: datetime - CDC start time for incremental replication
    - cdc_start_position: str - CDC start position
    - cdc_stop_position: str - CDC stop position
    - aws_conn_id: str - Airflow connection for AWS credentials

    Returns:
    dict: Task start response
    """
    def __init__(
        self,
        replication_task_arn: str,
        start_replication_task_type: str = 'start-replication',
        cdc_start_time: datetime = None,
        cdc_start_position: str = None,
        cdc_stop_position: str = None,
        **kwargs
    ): ...
```

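Because `start_replication_task_type` accepts only the three values listed in the docstring above, it can help to validate the value before the DAG is deployed. The sketch below is one possible way to do that; `build_start_kwargs` and `VALID_START_TYPES` are hypothetical names introduced here for illustration, and the ARN and timestamp are placeholders.

```python
from datetime import datetime, timezone

# Start types listed in the DmsStartTaskOperator docstring above.
VALID_START_TYPES = ("start-replication", "resume-processing", "reload-target")


def build_start_kwargs(replication_task_arn, start_type="start-replication", cdc_start_time=None):
    """Validate the start type and collect operator arguments (hypothetical helper)."""
    if start_type not in VALID_START_TYPES:
        raise ValueError(f"unsupported start type: {start_type!r}")
    kwargs = {
        "replication_task_arn": replication_task_arn,
        "start_replication_task_type": start_type,
    }
    # Only pass a CDC start time when the caller supplied one.
    if cdc_start_time is not None:
        kwargs["cdc_start_time"] = cdc_start_time
    return kwargs


# Placeholder ARN and timestamp, for illustration only.
start_kwargs = build_start_kwargs(
    "arn:aws:dms:us-west-2:123456789012:task:EXAMPLE",
    cdc_start_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
print(start_kwargs["start_replication_task_type"])
```

Assuming the signature documented above, the resulting dict could be unpacked into the operator, for example `DmsStartTaskOperator(task_id='start_migration', **start_kwargs)`.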
### Task Status Monitoring

Monitor DMS replication task status and completion.

```python { .api }
class DmsTaskCompletedSensor(BaseSensorOperator):
    """
    Wait for a DMS replication task to complete.

    Parameters:
    - replication_task_arn: str - ARN of the replication task to monitor
    - target_statuses: list - list of target statuses to wait for
    - termination_statuses: list - statuses that indicate task failure
    - aws_conn_id: str - Airflow connection for AWS credentials
    - poke_interval: int - time between status checks
    - timeout: int - maximum time to wait

    Returns:
    bool: True when task reaches target status
    """
    def __init__(
        self,
        replication_task_arn: str,
        target_statuses: list = None,
        termination_statuses: list = None,
        aws_conn_id: str = 'aws_default',
        **kwargs
    ): ...
```

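To make the roles of `target_statuses`, `termination_statuses`, `poke_interval`, and `timeout` concrete, the loop below sketches the general polling pattern such a sensor follows. It is an illustration only, not the sensor's actual implementation; `wait_for_status` is a hypothetical function, and the status sequence is simulated rather than fetched from DMS.

```python
import time
from typing import Callable, Iterable


def wait_for_status(
    get_status: Callable[[], str],
    target_statuses: Iterable[str],
    termination_statuses: Iterable[str],
    poke_interval: float = 60,
    timeout: float = 7200,
) -> bool:
    """Poll get_status until a target status is seen (illustrative only)."""
    targets = set(target_statuses)
    failures = set(termination_statuses)
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in targets:
            return True
        if status in failures:
            raise RuntimeError(f"task entered termination status {status!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still {status!r} after {timeout} seconds")
        time.sleep(poke_interval)


# Simulated status sequence standing in for real DMS responses.
statuses = iter(["starting", "running", "stopped"])
result = wait_for_status(lambda: next(statuses), ["stopped"], ["failed"], poke_interval=0)
print(result)
```

Checking termination statuses on every poll lets the wait fail fast instead of running until the timeout when the task has already failed.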
### DMS Service Hook

Low-level DMS operations for endpoint and task management.

```python { .api }
class DmsHook(AwsBaseHook):
    """
    Hook for AWS Database Migration Service operations.

    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str = None,
        **kwargs
    ): ...

    def create_replication_task(
        self,
        replication_task_identifier: str,
        source_endpoint_arn: str,
        target_endpoint_arn: str,
        replication_instance_arn: str,
        migration_type: str,
        table_mappings: str,
        **kwargs
    ) -> dict:
        """Create a DMS replication task."""
        ...

    def start_replication_task(
        self,
        replication_task_arn: str,
        start_replication_task_type: str,
        **kwargs
    ) -> dict:
        """Start a DMS replication task."""
        ...

    def stop_replication_task(self, replication_task_arn: str) -> dict:
        """Stop a DMS replication task."""
        ...

    def delete_replication_task(self, replication_task_arn: str) -> dict:
        """Delete a DMS replication task."""
        ...

    def describe_replication_tasks(
        self,
        replication_task_arns: list = None,
        filters: list = None,
        **kwargs
    ) -> dict:
        """Describe DMS replication tasks."""
        ...

    def get_task_status(self, replication_task_arn: str) -> str:
        """Get the status of a replication task."""
        ...
```

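The hook methods can be combined for cleanup, for instance stopping a running task, waiting for it to stop, and then deleting it. The sketch below shows that sequence against a stand-in object so it runs without AWS access. `FakeDmsHook` and `stop_and_delete_task` are hypothetical, the `'running'` status string and the ARN are assumptions, and only the method names are taken from the listing above.

```python
import time


class FakeDmsHook:
    """Stand-in exposing the DmsHook method names listed above (not the real hook)."""

    def __init__(self):
        self.status = "running"
        self.calls = []

    def get_task_status(self, replication_task_arn):
        return self.status

    def stop_replication_task(self, replication_task_arn):
        self.calls.append("stop")
        self.status = "stopped"  # the fake stops immediately
        return {}

    def delete_replication_task(self, replication_task_arn):
        self.calls.append("delete")
        return {}


def stop_and_delete_task(hook, replication_task_arn, poll_seconds=10.0, max_polls=30):
    """Stop the task if it is running, wait for 'stopped', then delete it."""
    if hook.get_task_status(replication_task_arn) == "running":
        hook.stop_replication_task(replication_task_arn)
        for _ in range(max_polls):
            if hook.get_task_status(replication_task_arn) == "stopped":
                break
            time.sleep(poll_seconds)
        else:
            raise TimeoutError("task did not reach 'stopped'")
    hook.delete_replication_task(replication_task_arn)


hook = FakeDmsHook()
# Placeholder ARN, for illustration only.
stop_and_delete_task(hook, "arn:aws:dms:us-west-2:123456789012:task:EXAMPLE", poll_seconds=0)
print(hook.calls)
```

With a real `DmsHook` the same function would be called with an actual task ARN; the wait between stop and delete is there because a task generally cannot be deleted while it is still running.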
## Usage Examples

### Database Migration Pipeline

```python
from airflow.providers.amazon.aws.operators.dms import (
    DmsCreateTaskOperator,
    DmsStartTaskOperator,
)
from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor

# Create replication task for database migration
create_migration_task = DmsCreateTaskOperator(
    task_id='create_migration_task',
    replication_task_id='postgres-to-redshift-migration',
    source_endpoint_arn='arn:aws:dms:us-west-2:123456789012:endpoint:ABCDEFGHIJ',
    target_endpoint_arn='arn:aws:dms:us-west-2:123456789012:endpoint:KLMNOPQRST',
    replication_instance_arn='arn:aws:dms:us-west-2:123456789012:rep:UVWXYZ1234',
    migration_type='full-load-and-cdc',
    # Selection rule: include every table in the "public" schema
    table_mappings="""
    {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "1",
                "object-locator": {
                    "schema-name": "public",
                    "table-name": "%"
                },
                "rule-action": "include"
            }
        ]
    }
    """,
    aws_conn_id='aws_default',
)

# Start the migration task, using the task ARN returned by the create step
start_migration = DmsStartTaskOperator(
    task_id='start_migration',
    replication_task_arn='{{ ti.xcom_pull(task_ids="create_migration_task") }}',
    start_replication_task_type='start-replication',
    aws_conn_id='aws_default',
)

# Monitor migration completion.
# Note: a 'full-load-and-cdc' task keeps replicating after the full load,
# so it reaches 'stopped' only once something stops it.
monitor_migration = DmsTaskCompletedSensor(
    task_id='monitor_migration',
    replication_task_arn='{{ ti.xcom_pull(task_ids="create_migration_task") }}',
    target_statuses=['stopped'],
    poke_interval=60,
    timeout=7200,  # 2 hours
    aws_conn_id='aws_default',
)

create_migration_task >> start_migration >> monitor_migration
```

## Import Statements

```python
from airflow.providers.amazon.aws.operators.dms import (
    DmsCreateTaskOperator,
    DmsStartTaskOperator,
    DmsStopTaskOperator,
    DmsDeleteTaskOperator,
)
from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor
from airflow.providers.amazon.aws.hooks.dms import DmsHook
```