or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

athena-analytics.md authentication.md batch-processing.md data-transfers.md dms-migration.md dynamodb-nosql.md ecs-containers.md eks-kubernetes.md emr-clusters.md glue-processing.md index.md lambda-functions.md messaging-sns-sqs.md rds-databases.md redshift-warehouse.md s3-storage.md sagemaker-ml.md

docs/dms-migration.md

0

# AWS Database Migration Service (DMS)

1

2

AWS Database Migration Service provides comprehensive database migration and replication capabilities, enabling seamless data transfer between different database engines and continuous data replication for data pipelines.

3

4

## Capabilities

5

6

### Replication Task Management

7

8

Create and manage DMS replication tasks for database migration and continuous data replication.

9

10

```python { .api }

11

class DmsCreateTaskOperator(AwsBaseOperator):

12

"""

13

Create a DMS replication task.

14

15

Parameters:

16

- replication_task_id: str - unique identifier for the replication task

17

- source_endpoint_arn: str - ARN of the source endpoint

18

- target_endpoint_arn: str - ARN of the target endpoint

19

- replication_instance_arn: str - ARN of the replication instance

20

- migration_type: str - migration type ('full-load', 'cdc', 'full-load-and-cdc')

21

- table_mappings: str - JSON string defining table mapping rules

22

- replication_task_settings: str - JSON string with task settings

23

- create_task_kwargs: dict - additional task creation parameters

24

- aws_conn_id: str - Airflow connection for AWS credentials

25

26

Returns:

27

str: Replication task ARN

28

"""

29

def __init__(

30

self,

31

replication_task_id: str,

32

source_endpoint_arn: str,

33

target_endpoint_arn: str,

34

replication_instance_arn: str,

35

migration_type: str,

36

table_mappings: str,

37

replication_task_settings: str = None,

38

create_task_kwargs: dict = None,

39

**kwargs

40

): ...

41

```

42

43

```python { .api }

44

class DmsStartTaskOperator(AwsBaseOperator):

45

"""

46

Start a DMS replication task.

47

48

Parameters:

49

- replication_task_arn: str - ARN of the replication task

50

- start_replication_task_type: str - start type ('start-replication', 'resume-processing', 'reload-target')

51

- cdc_start_time: datetime - CDC start time for incremental replication

52

- cdc_start_position: str - CDC start position

53

- cdc_stop_position: str - CDC stop position

54

- aws_conn_id: str - Airflow connection for AWS credentials

55

56

Returns:

57

dict: Task start response

58

"""

59

def __init__(

60

self,

61

replication_task_arn: str,

62

start_replication_task_type: str = 'start-replication',

63

cdc_start_time: datetime = None,

64

cdc_start_position: str = None,

65

cdc_stop_position: str = None,

66

**kwargs

67

): ...

68

```

69

70

### Task Status Monitoring

71

72

Monitor DMS replication task status and completion.

73

74

```python { .api }

75

class DmsTaskCompletedSensor(BaseSensorOperator):

76

"""

77

Wait for a DMS replication task to complete.

78

79

Parameters:

80

- replication_task_arn: str - ARN of the replication task to monitor

81

- target_statuses: list - list of target statuses to wait for

82

- termination_statuses: list - statuses that indicate task failure

83

- aws_conn_id: str - Airflow connection for AWS credentials

84

- poke_interval: int - time between status checks, in seconds

85

- timeout: int - maximum time to wait, in seconds, before the sensor fails

86

87

Returns:

88

bool: True when task reaches target status

89

"""

90

def __init__(

91

self,

92

replication_task_arn: str,

93

target_statuses: list = None,

94

termination_statuses: list = None,

95

aws_conn_id: str = 'aws_default',

96

**kwargs

97

): ...

98

```

99

100

### DMS Service Hook

101

102

Low-level DMS operations for endpoint and task management.

103

104

```python { .api }

105

class DmsHook(AwsBaseHook):

106

"""

107

Hook for AWS Database Migration Service operations.

108

109

Parameters:

110

- aws_conn_id: str - Airflow connection for AWS credentials

111

- region_name: str - AWS region name

112

"""

113

def __init__(

114

self,

115

aws_conn_id: str = 'aws_default',

116

region_name: str = None,

117

**kwargs

118

): ...

119

120

def create_replication_task(

121

self,

122

replication_task_identifier: str,

123

source_endpoint_arn: str,

124

target_endpoint_arn: str,

125

replication_instance_arn: str,

126

migration_type: str,

127

table_mappings: str,

128

**kwargs

129

) -> dict:

130

"""Create a DMS replication task."""

131

...

132

133

def start_replication_task(

134

self,

135

replication_task_arn: str,

136

start_replication_task_type: str,

137

**kwargs

138

) -> dict:

139

"""Start a DMS replication task."""

140

...

141

142

def stop_replication_task(self, replication_task_arn: str) -> dict:

143

"""Stop a DMS replication task."""

144

...

145

146

def delete_replication_task(self, replication_task_arn: str) -> dict:

147

"""Delete a DMS replication task."""

148

...

149

150

def describe_replication_tasks(

151

self,

152

replication_task_arns: list = None,

153

filters: list = None,

154

**kwargs

155

) -> dict:

156

"""Describe DMS replication tasks."""

157

...

158

159

def get_task_status(self, replication_task_arn: str) -> str:

160

"""Get the status of a replication task."""

161

...

162

```

163

164

## Usage Examples

165

166

### Database Migration Pipeline

167

168

```python

169

from airflow.providers.amazon.aws.operators.dms import (

170

DmsCreateTaskOperator,

171

DmsStartTaskOperator

172

)

173

from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor

174

175

# Create replication task for database migration

176

create_migration_task = DmsCreateTaskOperator(

177

task_id='create_migration_task',

178

replication_task_id='postgres-to-redshift-migration',

179

source_endpoint_arn='arn:aws:dms:us-west-2:123456789012:endpoint:ABCDEFGHIJ',

180

target_endpoint_arn='arn:aws:dms:us-west-2:123456789012:endpoint:KLMNOPQRST',

181

replication_instance_arn='arn:aws:dms:us-west-2:123456789012:rep:UVWXYZ1234',

182

migration_type='full-load-and-cdc',

183

table_mappings="""

184

{

185

"rules": [

186

{

187

"rule-type": "selection",

188

"rule-id": "1",

189

"rule-name": "1",

190

"object-locator": {

191

"schema-name": "public",

192

"table-name": "%"

193

},

194

"rule-action": "include"

195

}

196

]

197

}

198

""",

199

aws_conn_id='aws_default'

200

)

201

202

# Start the migration task

203

start_migration = DmsStartTaskOperator(

204

task_id='start_migration',

205

replication_task_arn='{{ ti.xcom_pull(task_ids="create_migration_task") }}',

206

start_replication_task_type='start-replication',

207

aws_conn_id='aws_default'

208

)

209

210

# Monitor migration completion

211

monitor_migration = DmsTaskCompletedSensor(

212

task_id='monitor_migration',

213

replication_task_arn='{{ ti.xcom_pull(task_ids="create_migration_task") }}',

214

target_statuses=['stopped'],

215

poke_interval=60,

216

timeout=7200, # 2 hours

217

aws_conn_id='aws_default'

218

)

219

220

create_migration_task >> start_migration >> monitor_migration

221

```

222

223

## Import Statements

224

225

```python

226

from airflow.providers.amazon.aws.operators.dms import (

227

DmsCreateTaskOperator,

228

DmsStartTaskOperator,

229

DmsStopTaskOperator,

230

DmsDeleteTaskOperator

231

)

232

from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor

233

from airflow.providers.amazon.aws.hooks.dms import DmsHook

234

```