# Job Management

Comprehensive job execution capabilities for Azure Machine Learning, including command jobs, pipeline jobs, Spark jobs, sweep jobs for hyperparameter tuning, and various job configuration options.

## Capabilities

### Job Builder Functions

Functions for creating different types of jobs with declarative configuration.

```python { .api }
def command(
    *,
    code: str = None,
    command: str,
    inputs: dict = None,
    outputs: dict = None,
    environment: Environment = None,
    environment_variables: dict = None,
    compute: str = None,
    resources: JobResources = None,
    distribution: Union[MpiDistribution, PyTorchDistribution, TensorFlowDistribution, RayDistribution] = None,
    limits: CommandJobLimits = None,
    identity: IdentityConfiguration = None,
    services: dict = None,
    **kwargs
) -> CommandJob:
    """
    Create a command job for executing code with specified parameters.

    Parameters:
    - code: Path to source code directory or file
    - command: Command to execute (supports parameter substitution with ${{inputs.param_name}})
    - inputs: Dictionary of input parameters
    - outputs: Dictionary of output specifications
    - environment: Environment for job execution
    - environment_variables: Environment variables to set
    - compute: Compute target name
    - resources: Resource requirements (instance type, count)
    - distribution: Distributed training configuration
    - limits: Job execution limits (timeout, max trials, etc.)
    - identity: Identity configuration for authentication
    - services: Interactive services (SSH, Jupyter, TensorBoard, etc.)

    Returns:
        CommandJob object ready for submission
    """

def spark(
    *,
    code: str,
    entry: SparkJobEntry,
    py_files: list = None,
    jars: list = None,
    files: list = None,
    archives: list = None,
    conf: dict = None,
    args: str = None,
    compute: str,
    inputs: dict = None,
    outputs: dict = None,
    **kwargs
) -> SparkJob:
    """
    Create a Spark job for big data processing.

    Parameters:
    - code: Path to Spark application code
    - entry: Spark job entry point (Python file, JAR, etc.)
    - py_files: Additional Python files to include
    - jars: JAR files to include in classpath
    - files: Additional files to distribute
    - archives: Archive files to distribute
    - conf: Spark configuration properties
    - args: Command line arguments
    - compute: Spark compute target name
    - inputs: Input data specifications
    - outputs: Output data specifications

    Returns:
        SparkJob object ready for submission
    """
```

### Command Jobs

Execute arbitrary commands with full control over environment, resources, and data flow.

```python { .api }
class CommandJob:
    def __init__(
        self,
        *,
        command: str,
        code: str = None,
        inputs: dict = None,
        outputs: dict = None,
        environment: Environment = None,
        environment_variables: dict = None,
        compute: str = None,
        resources: JobResources = None,
        distribution: Union[MpiDistribution, PyTorchDistribution, TensorFlowDistribution, RayDistribution] = None,
        limits: CommandJobLimits = None,
        identity: IdentityConfiguration = None,
        services: dict = None,
        **kwargs
    ):
        """
        Command job for executing arbitrary commands.

        Parameters:
        - command: Command string to execute
        - code: Source code path
        - inputs: Input parameters and data
        - outputs: Output specifications
        - environment: Execution environment
        - environment_variables: Environment variables
        - compute: Target compute resource
        - resources: Resource requirements
        - distribution: Distributed training setup
        - limits: Execution limits and timeouts
        - identity: Authentication identity
        - services: Interactive services configuration
        """
```

#### Usage Example

```python
from azure.ai.ml import command, Input, Output
from azure.ai.ml.entities import Environment

# Create a command job
job = command(
    code="./src",
    command="python train.py --learning_rate ${{inputs.learning_rate}} --epochs ${{inputs.epochs}}",
    inputs={
        "learning_rate": 0.01,
        "epochs": 10,
        "training_data": Input(type="uri_folder", path="azureml://datastores/workspaceblobstore/paths/data/")
    },
    outputs={
        "model": Output(type="uri_folder", path="azureml://datastores/workspaceblobstore/paths/models/")
    },
    environment=Environment(
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
        conda_file="./environment.yml"
    ),
    compute="cpu-cluster",
    experiment_name="my-experiment"
)

# Submit the job
submitted_job = ml_client.jobs.create_or_update(job)
```

### Pipeline Jobs

Orchestrate complex workflows with multiple steps, dependencies, and data flow.

```python { .api }
class PipelineJob:
    def __init__(
        self,
        *,
        jobs: dict = None,
        inputs: dict = None,
        outputs: dict = None,
        settings: PipelineJobSettings = None,
        compute: str = None,
        **kwargs
    ):
        """
        Pipeline job for orchestrating multiple steps.

        Parameters:
        - jobs: Dictionary of job steps in the pipeline
        - inputs: Pipeline-level inputs
        - outputs: Pipeline-level outputs
        - settings: Pipeline execution settings
        - compute: Default compute target for pipeline steps
        """

class PipelineJobSettings:
    def __init__(
        self,
        *,
        default_datastore: str = None,
        default_compute: str = None,
        continue_on_step_failure: bool = False,
        **kwargs
    ):
        """
        Settings for pipeline job execution.

        Parameters:
        - default_datastore: Default datastore for pipeline
        - default_compute: Default compute target
        - continue_on_step_failure: Whether to continue on step failures
        """
```
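
#### Usage Example

In practice a `PipelineJob` is usually produced by composing command steps with the `@pipeline` decorator from `azure.ai.ml.dsl` rather than by instantiating the class directly. The sketch below assumes that decorator; the scripts (`prep.py`, `train.py`), the `azureml:my-env:1` environment, the compute name, the data path, and the `ml_client` from the earlier example are placeholders.

```python
from azure.ai.ml import command, Input, Output
from azure.ai.ml.dsl import pipeline

# Reusable command steps (placeholder scripts and environment)
prep_step = command(
    code="./src",
    command="python prep.py --raw ${{inputs.raw}} --prepped ${{outputs.prepped}}",
    inputs={"raw": Input(type="uri_folder")},
    outputs={"prepped": Output(type="uri_folder")},
    environment="azureml:my-env:1",
    compute="cpu-cluster",
)
train_step = command(
    code="./src",
    command="python train.py --data ${{inputs.data}} --model ${{outputs.model}}",
    inputs={"data": Input(type="uri_folder")},
    outputs={"model": Output(type="uri_folder")},
    environment="azureml:my-env:1",
    compute="cpu-cluster",
)

@pipeline(description="Prep then train")
def prep_and_train(raw_data):
    # Wire the prep output into the training step
    prep = prep_step(raw=raw_data)
    train = train_step(data=prep.outputs.prepped)
    return {"model": train.outputs.model}

# Build the PipelineJob and set pipeline-level settings
pipeline_job = prep_and_train(
    raw_data=Input(type="uri_folder", path="azureml://datastores/workspaceblobstore/paths/data/")
)
pipeline_job.settings.default_compute = "cpu-cluster"
pipeline_job.settings.continue_on_step_failure = False

submitted = ml_client.jobs.create_or_update(pipeline_job, experiment_name="pipeline-demo")
```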

### Spark Jobs

Execute big data processing workloads using Apache Spark.

```python { .api }
class SparkJob:
    def __init__(
        self,
        *,
        code: str,
        entry: SparkJobEntry,
        py_files: list = None,
        jars: list = None,
        files: list = None,
        archives: list = None,
        conf: dict = None,
        args: str = None,
        compute: str,
        inputs: dict = None,
        outputs: dict = None,
        resources: SparkResourceConfiguration = None,
        **kwargs
    ):
        """
        Spark job for big data processing.

        Parameters:
        - code: Path to Spark application
        - entry: Entry point specification
        - py_files: Python dependencies
        - jars: JAR dependencies
        - files: Additional files
        - archives: Archive files
        - conf: Spark configuration
        - args: Application arguments
        - compute: Spark compute target
        - inputs: Input data
        - outputs: Output data
        - resources: Resource configuration
        """

class SparkJobEntry:
    def __init__(
        self,
        *,
        entry_type: SparkJobEntryType,
        reference: str
    ):
        """
        Spark job entry point specification.

        Parameters:
        - entry_type: Type of entry (spark_job_python_entry, spark_job_scala_entry)
        - reference: Path to entry file
        """

class SparkJobEntryType:
    SPARK_JOB_PYTHON_ENTRY = "spark_job_python_entry"
    SPARK_JOB_SCALA_ENTRY = "spark_job_scala_entry"

class SparkResourceConfiguration:
    def __init__(
        self,
        *,
        instance_type: str,
        runtime_version: str = "3.2"
    ):
        """
        Spark resource configuration.

        Parameters:
        - instance_type: Compute instance type
        - runtime_version: Spark runtime version
        """
```
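
#### Usage Example

A sketch of submitting a PySpark application with the `spark()` builder described above. The entry file, Spark pool name, configuration values, and data paths are placeholders, and the import location of `SparkJobEntry`/`SparkJobEntryType` is assumed to be `azure.ai.ml.entities`.

```python
from azure.ai.ml import spark, Input, Output
from azure.ai.ml.entities import SparkJobEntry, SparkJobEntryType

# PySpark entry point, following the SparkJobEntry shape shown above
entry = SparkJobEntry(
    entry_type=SparkJobEntryType.SPARK_JOB_PYTHON_ENTRY,
    reference="process.py",
)

spark_job = spark(
    code="./spark_src",
    entry=entry,
    compute="my-spark-pool",  # attached Spark compute (placeholder)
    conf={
        "spark.driver.cores": 2,
        "spark.driver.memory": "2g",
        "spark.executor.cores": 2,
        "spark.executor.memory": "2g",
        "spark.executor.instances": 2,
    },
    inputs={
        "raw_events": Input(type="uri_folder", path="azureml://datastores/workspaceblobstore/paths/events/"),
    },
    outputs={
        "aggregated": Output(type="uri_folder", path="azureml://datastores/workspaceblobstore/paths/aggregated/"),
    },
    args="--input ${{inputs.raw_events}} --output ${{outputs.aggregated}}",
)

submitted = ml_client.jobs.create_or_update(spark_job)
```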

### Job Configuration Classes

```python { .api }
class JobResources:
    def __init__(
        self,
        *,
        instance_type: str = None,
        instance_count: int = 1,
        shm_size: str = None,
        **kwargs
    ):
        """
        Job resource requirements.

        Parameters:
        - instance_type: VM size (Standard_DS3_v2, Standard_NC6, etc.)
        - instance_count: Number of instances
        - shm_size: Shared memory size
        """

class CommandJobLimits:
    def __init__(
        self,
        *,
        timeout: int = None,
        **kwargs
    ):
        """
        Limits for command job execution.

        Parameters:
        - timeout: Job timeout in seconds
        """

class QueueSettings:
    def __init__(
        self,
        *,
        job_tier: str = None,
        priority: str = None,
        **kwargs
    ):
        """
        Queue settings for job scheduling.

        Parameters:
        - job_tier: Job tier (Spot, Basic, Standard, Premium)
        - priority: Job priority (Low, Medium, High)
        """
```
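
#### Usage Example

A sketch of attaching these configuration objects to a command job. The import path, the environment name, and the timeout value are placeholders; since queue settings are not part of the `command()` signature shown earlier, the sketch assumes they can be assigned to the job after construction.

```python
from azure.ai.ml import command
from azure.ai.ml.entities import JobResources, CommandJobLimits, QueueSettings

job = command(
    code="./src",
    command="python train.py",
    environment="azureml:my-env:1",        # placeholder environment
    resources=JobResources(
        instance_type="Standard_NC6",       # GPU VM size
        instance_count=2,                   # two nodes
        shm_size="16g",                     # larger shared memory for data loaders
    ),
    limits=CommandJobLimits(timeout=7200),  # stop the job after 2 hours
)

# Assumed attachment point for queue settings (not in the command() signature above)
job.queue_settings = QueueSettings(job_tier="Standard", priority="Medium")

submitted = ml_client.jobs.create_or_update(job)
```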

### Distribution Classes

Configuration for distributed training across multiple nodes.

```python { .api }
class MpiDistribution:
    def __init__(
        self,
        *,
        process_count_per_instance: int = 1
    ):
        """
        MPI distribution for multi-node training.

        Parameters:
        - process_count_per_instance: Number of processes per instance
        """

class PyTorchDistribution:
    def __init__(
        self,
        *,
        process_count_per_instance: int = 1
    ):
        """
        PyTorch distributed training configuration.

        Parameters:
        - process_count_per_instance: Number of processes per instance
        """

class TensorFlowDistribution:
    def __init__(
        self,
        *,
        worker_count: int = 1,
        parameter_server_count: int = 0
    ):
        """
        TensorFlow distribution strategy.

        Parameters:
        - worker_count: Number of worker nodes
        - parameter_server_count: Number of parameter servers
        """

class RayDistribution:
    def __init__(
        self,
        *,
        port: int = 10001,
        address: str = None,
        include_dashboard: bool = False,
        dashboard_port: int = 8265
    ):
        """
        Ray distributed computing configuration.

        Parameters:
        - port: Ray head node port
        - address: Ray cluster address
        - include_dashboard: Whether to include Ray dashboard
        - dashboard_port: Dashboard port number
        """
```
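
#### Usage Example

A sketch of a multi-node PyTorch run that wires a distribution object into the `command()` builder from earlier in this document. The training script, GPU environment, cluster name, and import path are placeholders.

```python
from azure.ai.ml import command
from azure.ai.ml.entities import PyTorchDistribution, JobResources

# Two nodes x four processes per node = eight training processes
job = command(
    code="./src",
    command="python train_ddp.py --epochs 10",
    environment="azureml:my-gpu-env:1",  # placeholder GPU environment
    compute="gpu-cluster",               # placeholder GPU cluster
    resources=JobResources(instance_count=2),
    distribution=PyTorchDistribution(process_count_per_instance=4),
)

submitted = ml_client.jobs.create_or_update(job)
```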

### Job Services

Interactive services available during job execution.

```python { .api }
class JobService:
    """Base class for job services."""

class SshJobService(JobService):
    def __init__(
        self,
        *,
        ssh_public_keys: str = None,
        **kwargs
    ):
        """
        SSH service for job access.

        Parameters:
        - ssh_public_keys: SSH public keys for authentication
        """

class JupyterLabJobService(JobService):
    def __init__(self, **kwargs):
        """JupyterLab service for interactive development."""

class TensorBoardJobService(JobService):
    def __init__(
        self,
        *,
        log_dir: str = None,
        **kwargs
    ):
        """
        TensorBoard service for experiment tracking.

        Parameters:
        - log_dir: Directory containing TensorBoard logs
        """

class VsCodeJobService(JobService):
    def __init__(self, **kwargs):
        """VS Code service for remote development."""
```
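
#### Usage Example

A sketch of attaching interactive services to a command job through the `services` dictionary described above. The dictionary keys, SSH public key, log directory, environment, and compute names are placeholders, and the import path is assumed to be `azure.ai.ml.entities`.

```python
from azure.ai.ml import command
from azure.ai.ml.entities import SshJobService, TensorBoardJobService, JupyterLabJobService

job = command(
    code="./src",
    command="python train.py",
    environment="azureml:my-env:1",  # placeholder environment
    compute="gpu-cluster",           # placeholder compute target
    services={
        "ssh": SshJobService(ssh_public_keys="ssh-rsa AAAA... user@host"),  # placeholder key
        "tensorboard": TensorBoardJobService(log_dir="outputs/tblogs"),
        "jupyterlab": JupyterLabJobService(),
    },
)

submitted = ml_client.jobs.create_or_update(job)
```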

### Input and Output Types

```python { .api }
class Input:
    def __init__(
        self,
        *,
        type: str,
        path: str = None,
        mode: str = None,
        **kwargs
    ):
        """
        Job input specification.

        Parameters:
        - type: Input type (uri_file, uri_folder, mltable, mlflow_model)
        - path: Path to input data
        - mode: Access mode (ro_mount, rw_mount, download, direct)
        """

class Output:
    def __init__(
        self,
        *,
        type: str,
        path: str = None,
        mode: str = None,
        **kwargs
    ):
        """
        Job output specification.

        Parameters:
        - type: Output type (uri_file, uri_folder, mltable, mlflow_model)
        - path: Output path
        - mode: Access mode (rw_mount, upload)
        """
```

#### Usage Example

```python
from azure.ai.ml import Input, Output

# Define inputs and outputs
inputs = {
    "training_data": Input(
        type="uri_folder",
        path="azureml://datastores/workspaceblobstore/paths/data/train/",
        mode="ro_mount"
    ),
    "learning_rate": 0.01,
    "epochs": 100
}

outputs = {
    "model": Output(
        type="uri_folder",
        path="azureml://datastores/workspaceblobstore/paths/models/my-model/",
        mode="rw_mount"
    ),
    "metrics": Output(
        type="uri_file",
        path="azureml://datastores/workspaceblobstore/paths/metrics/metrics.json"
    )
}
```
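
These dictionaries can then be passed straight to a job builder. A minimal sketch (the script, environment, and compute names are placeholders):

```python
from azure.ai.ml import command

job = command(
    code="./src",
    command="python train.py --lr ${{inputs.learning_rate}} --epochs ${{inputs.epochs}}",
    inputs=inputs,    # dictionaries defined above
    outputs=outputs,
    environment="azureml:my-env:1",  # placeholder environment
    compute="cpu-cluster",           # placeholder compute target
)
submitted = ml_client.jobs.create_or_update(job)
```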