Azure Data Migration Client Library for programmatically managing database migration services and operations.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Execute and manage various migration and validation tasks including connection testing, schema migration, data migration, and assessment operations. Tasks are the core execution units that perform specific migration operations within projects.
Creates a new migration task or updates an existing one within a project.
def create_or_update(
group_name: str,
service_name: str,
project_name: str,
task_name: str,
parameters: ProjectTask,
**kwargs
) -> ProjectTask:
"""
Creates or updates a migration task.
Parameters:
- group_name: Name of the resource group
- service_name: Name of the Data Migration Service
- project_name: Name of the project
- task_name: Name of the task
- parameters: Task configuration and properties
Returns:
ProjectTask object with current state and results
"""Retrieves details and status of an existing task.
def get(
group_name: str,
service_name: str,
project_name: str,
task_name: str,
expand: str = None,
**kwargs
) -> ProjectTask:
"""
Gets a migration task with current status and results.
Parameters:
- group_name: Name of the resource group
- service_name: Name of the Data Migration Service
- project_name: Name of the project
- task_name: Name of the task
- expand: Properties to expand (e.g., "output")
Returns:
ProjectTask with current state, progress, and results
"""Cancels a running migration task.
def cancel(
group_name: str,
service_name: str,
project_name: str,
task_name: str,
**kwargs
) -> ProjectTask:
"""
Cancels a running migration task.
Parameters:
- group_name: Name of the resource group
- service_name: Name of the Data Migration Service
- project_name: Name of the project
- task_name: Name of the task
Returns:
ProjectTask with updated state
"""Executes commands on running tasks (e.g., complete sync, restart migration).
def command(
group_name: str,
service_name: str,
project_name: str,
task_name: str,
parameters: CommandProperties,
**kwargs
) -> CommandProperties:
"""
Executes a command on a migration task.
Parameters:
- group_name: Name of the resource group
- service_name: Name of the Data Migration Service
- project_name: Name of the project
- task_name: Name of the task
- parameters: Command to execute
Returns:
Command execution results
"""Lists all tasks within a project.
def list(
group_name: str,
service_name: str,
project_name: str,
task_type: str = None,
**kwargs
) -> ItemPaged[ProjectTask]:
"""
Lists migration tasks in a project.
Parameters:
- group_name: Name of the resource group
- service_name: Name of the Data Migration Service
- project_name: Name of the project
- task_type: Filter by task type (optional)
Returns:
Paged collection of ProjectTask objects
"""from azure.mgmt.datamigration.models import (
ProjectTask,
ConnectToSourceSqlServerTaskProperties,
ConnectToSourceSqlServerTaskInput
)
# Test SQL Server source connection
connection_test_properties = ConnectToSourceSqlServerTaskProperties(
input=ConnectToSourceSqlServerTaskInput(
source_connection_info=source_connection,
check_permissions_group="Default"
)
)
connection_test_task = ProjectTask(
properties=connection_test_properties
)
task = client.tasks.create_or_update(
group_name="myResourceGroup",
service_name="myMigrationService",
project_name="myProject",
task_name="connectionTest",
parameters=connection_test_task
)
# Check task status
task_status = client.tasks.get(
group_name="myResourceGroup",
service_name="myMigrationService",
project_name="myProject",
task_name="connectionTest",
expand="output"
)
print(f"Task state: {task_status.properties.state}")from azure.mgmt.datamigration.models import (
MigrateSchemaSqlServerSqlDbTaskProperties,
MigrateSchemaSqlServerSqlDbTaskInput,
MigrateSchemaSqlServerSqlDbDatabaseInput
)
# Schema migration task
schema_migration_properties = MigrateSchemaSqlServerSqlDbTaskProperties(
input=MigrateSchemaSqlServerSqlDbTaskInput(
source_connection_info=source_connection,
target_connection_info=target_connection,
selected_databases=[
MigrateSchemaSqlServerSqlDbDatabaseInput(
name="SourceDB",
target_database_name="TargetDB"
)
]
)
)
schema_task = ProjectTask(
properties=schema_migration_properties
)
task = client.tasks.create_or_update(
group_name="myResourceGroup",
service_name="myMigrationService",
project_name="myProject",
task_name="schemaMigration",
parameters=schema_task
)Tasks progress through these states during execution:
import time
from azure.mgmt.datamigration.models import TaskState
def monitor_task(client, group_name, service_name, project_name, task_name):
"""Monitor task execution until completion."""
while True:
task = client.tasks.get(
group_name=group_name,
service_name=service_name,
project_name=project_name,
task_name=task_name,
expand="output"
)
state = task.properties.state
print(f"Task state: {state}")
if state in [TaskState.SUCCEEDED, TaskState.FAILED, TaskState.CANCELED]:
break
time.sleep(30) # Wait 30 seconds before checking again
return taskclass ProjectTask:
"""Migration or validation task."""
def __init__(self, properties: ProjectTaskProperties, **kwargs):
"""
Initialize migration task.
Parameters:
- properties: Task-specific properties and configuration
"""
# Properties
etag: str # Entity tag for concurrency control
properties: ProjectTaskProperties # Task properties and configuration
id: str # Resource ID
name: str # Resource name
type: str # Resource typeclass ProjectTaskProperties:
"""Base class for task properties."""
# Properties
task_type: str # Task type identifier
errors: List[ODataError] # Task execution errors
state: str # Current task state
commands: List[CommandProperties] # Available commands
client_data: Dict[str, Any] # Client-specific dataclass MigrateSyncCompleteCommandProperties(CommandProperties):
"""Complete a sync migration."""
def __init__(self, input: MigrateSyncCompleteCommandInput, **kwargs):
"""
Parameters:
- input: Command input with database name
"""
# Properties
input: MigrateSyncCompleteCommandInput # Command input
output: MigrateSyncCompleteCommandOutput # Command outputclass MongoDbCancelCommand(CommandProperties):
"""Cancel MongoDB migration."""
def __init__(self, input: MongoDbCommandInput, **kwargs): ...
class MongoDbFinishCommand(CommandProperties):
"""Finish MongoDB migration."""
def __init__(self, input: MongoDbFinishCommandInput, **kwargs): ...
class MongoDbRestartCommand(CommandProperties):
"""Restart MongoDB migration."""
def __init__(self, input: MongoDbCommandInput, **kwargs): ...Common task execution errors:
Monitor task errors through the errors property and implement appropriate retry logic for transient failures.
Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-datamigration