0
# Task Operations
1
2
Task management capabilities for creating, monitoring, and controlling individual work items that execute within batch jobs on compute nodes. Tasks represent the actual computational work performed in Azure Batch and can include data processing, analysis, rendering, or any other computational workload.
3
4
## Capabilities
5
6
### Task Lifecycle Management
7
8
Create, list, retrieve, and delete tasks with comprehensive configuration options.
9
10
```python { .api }
11
def add(job_id, task, task_add_options=None, custom_headers=None, raw=False, **operation_config):
12
"""
13
Add a task to the specified job.
14
15
Args:
16
job_id: ID of the job to add the task to
17
task: The task to add (TaskSpecification)
18
task_add_options: Additional options for the operation
19
custom_headers: Custom headers to include in request
20
raw: Return raw response if True
21
22
Returns:
23
None
24
"""
25
26
def add_collection(job_id, task_add_collection_parameter, task_add_collection_options=None, custom_headers=None, raw=False, **operation_config):
27
"""
28
Add multiple tasks to the specified job.
29
30
Args:
31
job_id: ID of the job to add tasks to
32
task_add_collection_parameter: Collection of tasks to add
33
task_add_collection_options: Additional options
34
35
Returns:
36
TaskAddCollectionResult: Results of adding tasks including any failures
37
"""
38
39
def list(job_id, task_list_options=None, custom_headers=None, raw=False, **operation_config):
40
"""
41
List all tasks in the specified job.
42
43
Args:
44
job_id: ID of the job containing tasks
45
task_list_options: Additional options for listing
46
47
Returns:
48
CloudTaskPaged[CloudTask]: Paginated iterator over the tasks in the job
49
"""
50
51
def get(job_id, task_id, task_get_options=None, custom_headers=None, raw=False, **operation_config):
52
"""
53
Get information about the specified task.
54
55
Args:
56
job_id: ID of the job containing the task
57
task_id: ID of the task to retrieve
58
task_get_options: Additional options for the operation
59
60
Returns:
61
CloudTask: Task information
62
"""
63
64
def delete(job_id, task_id, task_delete_options=None, custom_headers=None, raw=False, **operation_config):
65
"""
66
Delete the specified task.
67
68
Args:
69
job_id: ID of the job containing the task
70
task_id: ID of the task to delete
71
task_delete_options: Additional options for deletion
72
73
Returns:
74
None
75
"""
76
```
77
78
### Task Configuration Updates
79
80
Update task properties and configuration after creation.
81
82
```python { .api }
83
def update(job_id, task_id, task_update_parameter, task_update_options=None, custom_headers=None, raw=False, **operation_config):
84
"""
85
Update properties of the specified task.
86
87
Args:
88
job_id: ID of the job containing the task
89
task_id: ID of the task to update
90
task_update_parameter: Properties to update
91
task_update_options: Additional options
92
93
Returns:
94
None
95
"""
96
```
97
98
### Task State Control
99
100
Control task execution by terminating tasks, and inspect the subtasks of multi-instance tasks.
101
102
```python { .api }
103
def terminate(job_id, task_id, task_terminate_options=None, custom_headers=None, raw=False, **operation_config):
104
"""
105
Terminate the specified task.
106
107
Args:
108
job_id: ID of the job containing the task
109
task_id: ID of the task to terminate
110
task_terminate_options: Additional options
111
112
Returns:
113
None
114
"""
115
116
def list_subtasks(job_id, task_id, task_list_subtasks_options=None, custom_headers=None, raw=False, **operation_config):
117
"""
118
List subtasks of a multi-instance task.
119
120
Args:
121
job_id: ID of the job containing the task
122
task_id: ID of the multi-instance task
123
task_list_subtasks_options: Additional options
124
125
Returns:
126
CloudTaskListSubtasksResult: List of subtask information
127
"""
128
```
129
130
## Usage Examples
131
132
### Creating a Simple Task
133
134
```python
135
from azure.batch.models import TaskSpecification
136
137
# Create simple task
138
task_spec = TaskSpecification(
139
id="task-001",
140
command_line="echo 'Hello from Azure Batch!'",
141
display_name="Simple Echo Task"
142
)
143
144
# Add task to job
145
client.task.add("my-job", task_spec)
146
```
147
148
### Creating a Task with Input/Output Files
149
150
```python
151
from azure.batch.models import (
152
TaskSpecification, ResourceFile, OutputFile,
153
OutputFileDestination, OutputFileBlobContainerDestination,
154
OutputFileUploadOptions
155
)
156
157
# Input files for the task
158
resource_files = [
159
ResourceFile(
160
http_url="https://mystorageaccount.blob.core.windows.net/input/data.txt",
161
file_path="input/data.txt"
162
),
163
ResourceFile(
164
http_url="https://mystorageaccount.blob.core.windows.net/scripts/process.py",
165
file_path="process.py"
166
)
167
]
168
169
# Output files from the task
170
output_files = [
171
OutputFile(
172
file_pattern="../std*.txt", # stdout.txt, stderr.txt
173
destination=OutputFileDestination(
174
container=OutputFileBlobContainerDestination(container_url="https://mystorageaccount.blob.core.windows.net/output?sas_token")
175
),
176
upload_options=OutputFileUploadOptions(
177
upload_condition="taskcompletion"
178
)
179
),
180
OutputFile(
181
file_pattern="results/*", # All files in results directory
182
destination=OutputFileDestination(
183
container=OutputFileBlobContainerDestination(container_url="https://mystorageaccount.blob.core.windows.net/results?sas_token")
184
),
185
upload_options=OutputFileUploadOptions(
186
upload_condition="tasksuccess"
187
)
188
)
189
]
190
191
task_spec = TaskSpecification(
192
id="data-processing-task",
193
command_line="python process.py input/data.txt",
194
resource_files=resource_files,
195
output_files=output_files,
196
display_name="Data Processing Task"
197
)
198
199
client.task.add("my-job", task_spec)
200
```
201
202
### Creating a Task with Dependencies
203
204
```python
205
from azure.batch.models import TaskSpecification, TaskDependencies, TaskIdRange
206
207
# Task that depends on other tasks
208
task_with_deps = TaskSpecification(
209
id="analysis-task",
210
command_line="python analyze.py",
211
depends_on=TaskDependencies(
212
task_ids=["preprocessing-task-1", "preprocessing-task-2"],
213
task_id_ranges=[TaskIdRange(start=10, end=20)] # Tasks with IDs 10 through 20
214
),
215
display_name="Analysis Task"
216
)
217
218
client.task.add("my-job", task_with_deps)
219
```
220
221
### Creating a Multi-instance Task (MPI)
222
223
```python
224
from azure.batch.models import (
225
TaskSpecification, MultiInstanceSettings,
226
ResourceFile, EnvironmentSetting
227
)
228
229
# Multi-instance task for MPI workloads
230
mpi_task = TaskSpecification(
231
id="mpi-task",
232
command_line="mpirun -np $AZ_BATCH_NODE_COUNT python mpi_app.py",
233
multi_instance_settings=MultiInstanceSettings(
234
number_of_instances=4, # Run on 4 nodes
235
coordination_command_line="echo 'Setting up MPI environment'",
236
common_resource_files=[
237
ResourceFile(
238
http_url="https://mystorageaccount.blob.core.windows.net/mpi/mpi_app.py",
239
file_path="mpi_app.py"
240
)
241
]
242
),
243
resource_files=[
244
ResourceFile(
245
http_url="https://mystorageaccount.blob.core.windows.net/mpi/hostfile",
246
file_path="hostfile"
247
)
248
],
249
environment_settings=[
250
EnvironmentSetting(name="MPI_HOSTS_FILE", value="hostfile")
251
],
252
display_name="MPI Processing Task"
253
)
254
255
client.task.add("my-job", mpi_task)
256
```
257
258
### Adding Multiple Tasks at Once
259
260
```python
261
from azure.batch.models import ResourceFile, TaskAddCollectionParameter, TaskSpecification
262
263
# Create multiple tasks
264
tasks = []
265
for i in range(10):
266
task = TaskSpecification(
267
id=f"parallel-task-{i:03d}",
268
command_line=f"python worker.py --input file_{i:03d}.txt --output result_{i:03d}.txt",
269
resource_files=[
270
ResourceFile(
271
http_url=f"https://mystorageaccount.blob.core.windows.net/input/file_{i:03d}.txt",
272
file_path=f"file_{i:03d}.txt"
273
)
274
]
275
)
276
tasks.append(task)
277
278
# Add all tasks at once
279
task_collection = TaskAddCollectionParameter(value=tasks)
280
result = client.task.add_collection("my-job", task_collection)
281
282
# Check for any failures
283
for failure in result.value:
284
if failure.status == "clienterror" or failure.status == "servererror":
285
print(f"Failed to add task {failure.task_id}: {failure.error.message}")
286
```
287
288
### Managing Task State
289
290
```python
291
# List tasks in a job
292
tasks = client.task.list("my-job")
293
for task in tasks:
294
print(f"Task {task.id}: {task.state}")
295
if task.execution_info:
296
print(f" Exit code: {task.execution_info.exit_code}")
297
print(f" Start time: {task.execution_info.start_time}")
298
299
# Get specific task details
300
task = client.task.get("my-job", "task-001")
301
print(f"Task command: {task.command_line}")
302
print(f"Task state: {task.state}")
303
304
# Terminate a running task
305
client.task.terminate("my-job", "long-running-task")
306
307
# Update task constraints
308
import datetime
from azure.batch.models import TaskUpdateParameter, TaskConstraints
309
update_params = TaskUpdateParameter(
310
constraints=TaskConstraints(
311
max_wall_clock_time=datetime.timedelta(hours=2),
312
max_task_retry_count=3
313
)
314
)
315
client.task.update("my-job", "task-001", update_params)
316
317
# Delete completed task
318
client.task.delete("my-job", "task-001")
319
```
320
321
## Types
322
323
### Task Configuration Types
324
325
```python { .api }
326
class TaskSpecification:
327
"""Task creation specification."""
328
def __init__(self):
329
self.id: str
330
self.display_name: str
331
self.command_line: str
332
self.resource_files: List[ResourceFile]
333
self.output_files: List[OutputFile]
334
self.environment_settings: List[EnvironmentSetting]
335
self.affinity_info: AffinityInformation
336
self.constraints: TaskConstraints
337
self.required_slots: int
338
self.user_identity: UserIdentity
339
self.exit_conditions: ExitConditions
340
self.depends_on: TaskDependencies
341
self.application_package_references: List[ApplicationPackageReference]
342
self.authentication_token_settings: AuthenticationTokenSettings
343
self.multi_instance_settings: MultiInstanceSettings
344
self.container_settings: TaskContainerSettings
345
346
class TaskConstraints:
347
"""Task execution constraints."""
348
def __init__(self):
349
self.max_wall_clock_time: datetime.timedelta
350
self.retention_time: datetime.timedelta
351
self.max_task_retry_count: int
352
353
class ResourceFile:
354
"""Input file specification."""
355
def __init__(self):
356
self.auto_storage_container_name: str
357
self.storage_container_url: str
358
self.http_url: str
359
self.blob_prefix: str
360
self.file_path: str
361
self.file_mode: str
362
363
class OutputFile:
364
"""Output file specification."""
365
def __init__(self):
366
self.file_pattern: str
367
self.destination: OutputFileDestination
368
self.upload_options: OutputFileUploadOptions
369
370
class TaskDependencies:
371
"""Task dependency specification."""
372
def __init__(self):
373
self.task_ids: List[str]
374
self.task_id_ranges: List[TaskIdRange]
375
376
class MultiInstanceSettings:
377
"""Multi-instance task settings for MPI workloads."""
378
def __init__(self):
379
self.number_of_instances: int
380
self.coordination_command_line: str
381
self.common_resource_files: List[ResourceFile]
382
```
383
384
### Task State Types
385
386
```python { .api }
387
class CloudTask:
388
"""Task information and state."""
389
def __init__(self):
390
self.id: str
391
self.display_name: str
392
self.url: str
393
self.e_tag: str
394
self.last_modified: datetime.datetime
395
self.creation_time: datetime.datetime
396
self.state: str # active, preparing, running, completed
397
self.state_transition_time: datetime.datetime
398
self.previous_state: str
399
self.previous_state_transition_time: datetime.datetime
400
self.command_line: str
401
self.resource_files: List[ResourceFile]
402
self.output_files: List[OutputFile]
403
self.environment_settings: List[EnvironmentSetting]
404
self.affinity_info: AffinityInformation
405
self.constraints: TaskConstraints
406
self.required_slots: int
407
self.user_identity: UserIdentity
408
self.execution_info: TaskExecutionInformation
409
self.node_info: ComputeNodeInformation
410
self.multi_instance_settings: MultiInstanceSettings
411
self.stats: TaskStatistics
412
self.depends_on: TaskDependencies
413
414
class TaskExecutionInformation:
415
"""Task execution information."""
416
def __init__(self):
417
self.start_time: datetime.datetime
418
self.end_time: datetime.datetime
419
self.exit_code: int
420
self.container_info: TaskContainerExecutionInformation
421
self.failure_info: TaskFailureInformation
422
self.retry_count: int
423
self.last_retry_time: datetime.datetime
424
self.requeue_count: int
425
self.last_requeue_time: datetime.datetime
426
427
class TaskAddCollectionParameter:
428
"""Parameters for adding multiple tasks."""
429
def __init__(self):
430
self.value: List[TaskSpecification]
431
432
class TaskAddCollectionResult:
433
"""Results of adding task collection."""
434
def __init__(self):
435
self.value: List[TaskAddResult]
436
437
class TaskAddResult:
438
"""Result of adding individual task."""
439
def __init__(self):
440
self.status: str # success, clienterror, servererror
441
self.task_id: str
442
self.e_tag: str
443
self.last_modified: datetime.datetime
444
self.location: str
445
self.error: BatchError
446
```