Microsoft Azure Machine Learning Client Library for Python, a comprehensive SDK for ML workflows including job execution, pipeline components, model deployment, and AutoML capabilities.
Comprehensive job execution capabilities for Azure Machine Learning, including command jobs, pipeline jobs, Spark jobs, sweep jobs for hyperparameter tuning, and various job configuration options.
Functions for creating different types of jobs with declarative configuration.
def command(
*,
code: str = None,
command: str,
inputs: dict = None,
outputs: dict = None,
environment: Environment = None,
environment_variables: dict = None,
compute: str = None,
resources: JobResources = None,
distribution: Union[MpiDistribution, PyTorchDistribution, TensorFlowDistribution, RayDistribution] = None,
limits: CommandJobLimits = None,
identity: IdentityConfiguration = None,
services: dict = None,
**kwargs
) -> CommandJob:
"""
Create a command job for executing code with specified parameters.
Parameters:
- code: Path to source code directory or file
- command: Command to execute (supports parameter substitution with ${{inputs.param_name}})
- inputs: Dictionary of input parameters
- outputs: Dictionary of output specifications
- environment: Environment for job execution
- environment_variables: Environment variables to set
- compute: Compute target name
- resources: Resource requirements (instance type, count)
- distribution: Distributed training configuration
- limits: Job execution limits (timeout, max trials, etc.)
- identity: Identity configuration for authentication
- services: Interactive services (SSH, Jupyter, TensorBoard, etc.)
Returns:
CommandJob object ready for submission
"""
def spark(
*,
code: str,
entry: SparkJobEntry,
py_files: list = None,
jars: list = None,
files: list = None,
archives: list = None,
conf: dict = None,
args: str = None,
compute: str,
inputs: dict = None,
outputs: dict = None,
**kwargs
) -> SparkJob:
"""
Create a Spark job for big data processing.
Parameters:
- code: Path to Spark application code
- entry: Spark job entry point (Python file, JAR, etc.)
- py_files: Additional Python files to include
- jars: JAR files to include in classpath
- files: Additional files to distribute
- archives: Archive files to distribute
- conf: Spark configuration properties
- args: Command line arguments
- compute: Spark compute target name
- inputs: Input data specifications
- outputs: Output data specifications
Returns:
SparkJob object ready for submission
"""Execute arbitrary commands with full control over environment, resources, and data flow.
Execute arbitrary commands with full control over environment, resources, and data flow.
class CommandJob:
def __init__(
self,
*,
command: str,
code: str = None,
inputs: dict = None,
outputs: dict = None,
environment: Environment = None,
environment_variables: dict = None,
compute: str = None,
resources: JobResources = None,
distribution: Union[MpiDistribution, PyTorchDistribution, TensorFlowDistribution, RayDistribution] = None,
limits: CommandJobLimits = None,
identity: IdentityConfiguration = None,
services: dict = None,
**kwargs
):
"""
Command job for executing arbitrary commands.
Parameters:
- command: Command string to execute
- code: Source code path
- inputs: Input parameters and data
- outputs: Output specifications
- environment: Execution environment
- environment_variables: Environment variables
- compute: Target compute resource
- resources: Resource requirements
- distribution: Distributed training setup
- limits: Execution limits and timeouts
- identity: Authentication identity
- services: Interactive services configuration
"""from azure.ai.ml import command, Input, Output
from azure.ai.ml.entities import Environment
# Create a command job
job = command(
code="./src",
command="python train.py --learning_rate ${{inputs.learning_rate}} --epochs ${{inputs.epochs}}",
inputs={
"learning_rate": 0.01,
"epochs": 10,
"training_data": Input(type="uri_folder", path="azureml://datastores/workspaceblobstore/paths/data/")
},
outputs={
"model": Output(type="uri_folder", path="azureml://datastores/workspaceblobstore/paths/models/")
},
environment=Environment(
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
conda_file="./environment.yml"
),
compute="cpu-cluster",
experiment_name="my-experiment"
)
# Submit the job
submitted_job = ml_client.jobs.create_or_update(job)
Orchestrate complex workflows with multiple steps, dependencies, and data flow.
class PipelineJob:
def __init__(
self,
*,
jobs: dict = None,
inputs: dict = None,
outputs: dict = None,
settings: PipelineJobSettings = None,
compute: str = None,
**kwargs
):
"""
Pipeline job for orchestrating multiple steps.
Parameters:
- jobs: Dictionary of job steps in the pipeline
- inputs: Pipeline-level inputs
- outputs: Pipeline-level outputs
- settings: Pipeline execution settings
- compute: Default compute target for pipeline steps
"""
class PipelineJobSettings:
def __init__(
self,
*,
default_datastore: str = None,
default_compute: str = None,
continue_on_step_failure: bool = False,
**kwargs
):
"""
Settings for pipeline job execution.
Parameters:
- default_datastore: Default datastore for pipeline
- default_compute: Default compute target
- continue_on_step_failure: Whether to continue on step failures
"""Execute big data processing workloads using Apache Spark.
Execute big data processing workloads using Apache Spark.
class SparkJob:
def __init__(
self,
*,
code: str,
entry: SparkJobEntry,
py_files: list = None,
jars: list = None,
files: list = None,
archives: list = None,
conf: dict = None,
args: str = None,
compute: str,
inputs: dict = None,
outputs: dict = None,
resources: SparkResourceConfiguration = None,
**kwargs
):
"""
Spark job for big data processing.
Parameters:
- code: Path to Spark application
- entry: Entry point specification
- py_files: Python dependencies
- jars: JAR dependencies
- files: Additional files
- archives: Archive files
- conf: Spark configuration
- args: Application arguments
- compute: Spark compute target
- inputs: Input data
- outputs: Output data
- resources: Resource configuration
"""
class SparkJobEntry:
def __init__(
self,
*,
entry_type: SparkJobEntryType,
reference: str
):
"""
Spark job entry point specification.
Parameters:
- entry_type: Type of entry (spark_job_python_entry, spark_job_scala_entry)
- reference: Path to entry file
"""
class SparkJobEntryType:
SPARK_JOB_PYTHON_ENTRY = "spark_job_python_entry"
SPARK_JOB_SCALA_ENTRY = "spark_job_scala_entry"
class SparkResourceConfiguration:
def __init__(
self,
*,
instance_type: str,
runtime_version: str = "3.2"
):
"""
Spark resource configuration.
Parameters:
- instance_type: Compute instance type
- runtime_version: Spark runtime version
"""class JobResources:
def __init__(
self,
*,
instance_type: str = None,
instance_count: int = 1,
shm_size: str = None,
**kwargs
):
"""
Job resource requirements.
Parameters:
- instance_type: VM size (Standard_DS3_v2, Standard_NC6, etc.)
- instance_count: Number of instances
- shm_size: Shared memory size
"""
class CommandJobLimits:
def __init__(
self,
*,
timeout: int = None,
**kwargs
):
"""
Limits for command job execution.
Parameters:
- timeout: Job timeout in seconds
"""
class QueueSettings:
def __init__(
self,
*,
job_tier: str = None,
priority: str = None,
**kwargs
):
"""
Queue settings for job scheduling.
Parameters:
- job_tier: Job tier (Spot, Basic, Standard, Premium)
- priority: Job priority (Low, Medium, High)
"""Configuration for distributed training across multiple nodes.
Configuration for distributed training across multiple nodes.
class MpiDistribution:
def __init__(
self,
*,
process_count_per_instance: int = 1
):
"""
MPI distribution for multi-node training.
Parameters:
- process_count_per_instance: Number of processes per instance
"""
class PyTorchDistribution:
def __init__(
self,
*,
process_count_per_instance: int = 1
):
"""
PyTorch distributed training configuration.
Parameters:
- process_count_per_instance: Number of processes per instance
"""
class TensorFlowDistribution:
def __init__(
self,
*,
worker_count: int = 1,
parameter_server_count: int = 0
):
"""
TensorFlow distribution strategy.
Parameters:
- worker_count: Number of worker nodes
- parameter_server_count: Number of parameter servers
"""
class RayDistribution:
def __init__(
self,
*,
port: int = 10001,
address: str = None,
include_dashboard: bool = False,
dashboard_port: int = 8265
):
"""
Ray distributed computing configuration.
Parameters:
- port: Ray head node port
- address: Ray cluster address
- include_dashboard: Whether to include Ray dashboard
- dashboard_port: Dashboard port number
"""Interactive services available during job execution.
Interactive services available during job execution.
class JobService:
"""Base class for job services."""
class SshJobService(JobService):
def __init__(
self,
*,
ssh_public_keys: str = None,
**kwargs
):
"""
SSH service for job access.
Parameters:
- ssh_public_keys: SSH public keys for authentication
"""
class JupyterLabJobService(JobService):
def __init__(self, **kwargs):
"""JupyterLab service for interactive development."""
class TensorBoardJobService(JobService):
def __init__(
self,
*,
log_dir: str = None,
**kwargs
):
"""
TensorBoard service for experiment tracking.
Parameters:
- log_dir: Directory containing TensorBoard logs
"""
class VsCodeJobService(JobService):
def __init__(self, **kwargs):
"""VS Code service for remote development."""class Input:
class Input:
def __init__(
self,
*,
type: str,
path: str = None,
mode: str = None,
**kwargs
):
"""
Job input specification.
Parameters:
- type: Input type (uri_file, uri_folder, mltable, mlflow_model)
- path: Path to input data
- mode: Access mode (ro_mount, rw_mount, download, direct)
"""
class Output:
def __init__(
self,
*,
type: str,
path: str = None,
mode: str = None,
**kwargs
):
"""
Job output specification.
Parameters:
- type: Output type (uri_file, uri_folder, mltable, mlflow_model)
- path: Output path
- mode: Access mode (rw_mount, upload)
"""from azure.ai.ml import Input, Output
# Define inputs and outputs
inputs = {
"training_data": Input(
type="uri_folder",
path="azureml://datastores/workspaceblobstore/paths/data/train/",
mode="ro_mount"
),
"learning_rate": 0.01,
"epochs": 100
}
outputs = {
"model": Output(
type="uri_folder",
path="azureml://datastores/workspaceblobstore/paths/models/my-model/",
mode="rw_mount"
),
"metrics": Output(
type="uri_file",
path="azureml://datastores/workspaceblobstore/paths/metrics/metrics.json"
)
}
Install with Tessl CLI
npx tessl i tessl/pypi-azure-ai-ml