Microsoft Azure Batch Client Library for Python providing comprehensive APIs for managing batch computing workloads in Azure cloud
91
Task management capabilities for creating, monitoring, and controlling individual work items that execute within batch jobs on compute nodes. Tasks represent the actual computational work performed in Azure Batch and can include data processing, analysis, rendering, or any other computational workload.
Create, retrieve, update, and delete tasks with comprehensive configuration options.
def add(job_id, task, task_add_options=None, custom_headers=None, raw=False, **operation_config):
"""
Add a task to the specified job.
Args:
job_id: ID of the job to add the task to
task: The task to add (TaskSpecification)
task_add_options: Additional options for the operation
custom_headers: Custom headers to include in request
raw: Return raw response if True
Returns:
None
"""
def add_collection(job_id, task_add_collection_parameter, task_add_collection_options=None, custom_headers=None, raw=False, **operation_config):
"""
Add multiple tasks to the specified job.
Args:
job_id: ID of the job to add tasks to
task_add_collection_parameter: Collection of tasks to add
task_add_collection_options: Additional options
Returns:
TaskAddCollectionResult: Results of adding tasks including any failures
"""
def list(job_id, task_list_options=None, custom_headers=None, raw=False, **operation_config):
"""
List all tasks in the specified job.
Args:
job_id: ID of the job containing tasks
task_list_options: Additional options for listing
Returns:
ItemPaged[CloudTask]: Paginated list of tasks
"""
def get(job_id, task_id, task_get_options=None, custom_headers=None, raw=False, **operation_config):
"""
Get information about the specified task.
Args:
job_id: ID of the job containing the task
task_id: ID of the task to retrieve
task_get_options: Additional options for the operation
Returns:
CloudTask: Task information
"""
def delete(job_id, task_id, task_delete_options=None, custom_headers=None, raw=False, **operation_config):
"""
Delete the specified task.
Args:
job_id: ID of the job containing the task
task_id: ID of the task to delete
task_delete_options: Additional options for deletion
Returns:
None
"""Update task properties and configuration after creation.
def update(job_id, task_id, task_update_parameter, task_update_options=None, custom_headers=None, raw=False, **operation_config):
"""
Update properties of the specified task.
Args:
job_id: ID of the job containing the task
task_id: ID of the task to update
task_update_parameter: Properties to update
task_update_options: Additional options
Returns:
None
"""Control task execution state including termination and reactivation.
def terminate(job_id, task_id, task_terminate_options=None, custom_headers=None, raw=False, **operation_config):
"""
Terminate the specified task.
Args:
job_id: ID of the job containing the task
task_id: ID of the task to terminate
task_terminate_options: Additional options
Returns:
None
"""
def list_subtasks(job_id, task_id, task_list_subtasks_options=None, custom_headers=None, raw=False, **operation_config):
"""
List subtasks of a multi-instance task.
Args:
job_id: ID of the job containing the task
task_id: ID of the multi-instance task
task_list_subtasks_options: Additional options
Returns:
CloudTaskListSubtasksResult: List of subtask information
"""from azure.batch.models import TaskSpecification
# Create simple task
task_spec = TaskSpecification(
id="task-001",
command_line="echo 'Hello from Azure Batch!'",
display_name="Simple Echo Task"
)
# Add task to job
client.task.add("my-job", task_spec)from azure.batch.models import (
TaskSpecification, ResourceFile, OutputFile,
OutputFileDestination, OutputFileBlobContainerDestination,
OutputFileUploadOptions
)
# Input files for the task
resource_files = [
ResourceFile(
http_url="https://mystorageaccount.blob.core.windows.net/input/data.txt",
file_path="input/data.txt"
),
ResourceFile(
http_url="https://mystorageaccount.blob.core.windows.net/scripts/process.py",
file_path="process.py"
)
]
# Output files from the task
output_files = [
OutputFile(
file_pattern="../std*.txt", # stdout.txt, stderr.txt
destination=OutputFileDestination(
container_url="https://mystorageaccount.blob.core.windows.net/output?sas_token"
),
upload_options=OutputFileUploadOptions(
upload_condition="taskcompletion"
)
),
OutputFile(
file_pattern="results/*", # All files in results directory
destination=OutputFileDestination(
container_url="https://mystorageaccount.blob.core.windows.net/results?sas_token"
),
upload_options=OutputFileUploadOptions(
upload_condition="tasksuccess"
)
)
]
task_spec = TaskSpecification(
id="data-processing-task",
command_line="python process.py input/data.txt",
resource_files=resource_files,
output_files=output_files,
display_name="Data Processing Task"
)
client.task.add("my-job", task_spec)from azure.batch.models import TaskSpecification, TaskDependencies
# Task that depends on other tasks
task_with_deps = TaskSpecification(
id="analysis-task",
command_line="python analyze.py",
depends_on=TaskDependencies(
task_ids=["preprocessing-task-1", "preprocessing-task-2"],
task_id_ranges=[{"start": 10, "end": 20}] # Tasks 10-20
),
display_name="Analysis Task"
)
client.task.add("my-job", task_with_deps)from azure.batch.models import (
TaskSpecification, MultiInstanceSettings,
ResourceFile, EnvironmentSetting
)
# Multi-instance task for MPI workloads
mpi_task = TaskSpecification(
id="mpi-task",
command_line="mpirun -np $AZ_BATCH_NODE_COUNT python mpi_app.py",
multi_instance_settings=MultiInstanceSettings(
number_of_instances=4, # Run on 4 nodes
coordination_command_line="echo 'Setting up MPI environment'",
common_resource_files=[
ResourceFile(
http_url="https://mystorageaccount.blob.core.windows.net/mpi/mpi_app.py",
file_path="mpi_app.py"
)
]
),
resource_files=[
ResourceFile(
http_url="https://mystorageaccount.blob.core.windows.net/mpi/hostfile",
file_path="hostfile"
)
],
environment_settings=[
EnvironmentSetting(name="MPI_HOSTS_FILE", value="hostfile")
],
display_name="MPI Processing Task"
)
client.task.add("my-job", mpi_task)from azure.batch.models import TaskAddCollectionParameter
# Create multiple tasks
tasks = []
for i in range(10):
task = TaskSpecification(
id=f"parallel-task-{i:03d}",
command_line=f"python worker.py --input file_{i:03d}.txt --output result_{i:03d}.txt",
resource_files=[
ResourceFile(
http_url=f"https://mystorageaccount.blob.core.windows.net/input/file_{i:03d}.txt",
file_path=f"file_{i:03d}.txt"
)
]
)
tasks.append(task)
# Add all tasks at once
task_collection = TaskAddCollectionParameter(value=tasks)
result = client.task.add_collection("my-job", task_collection)
# Check for any failures
for failure in result.value:
if failure.status == "clienterror" or failure.status == "servererror":
print(f"Failed to add task {failure.task_id}: {failure.error.message}")# List tasks in a job
tasks = client.task.list("my-job")
for task in tasks:
print(f"Task {task.id}: {task.state}")
if task.execution_info:
print(f" Exit code: {task.execution_info.exit_code}")
print(f" Start time: {task.execution_info.start_time}")
# Get specific task details
task = client.task.get("my-job", "task-001")
print(f"Task command: {task.command_line}")
print(f"Task state: {task.state}")
# Terminate a running task
client.task.terminate("my-job", "long-running-task")
# Update task constraints
from azure.batch.models import TaskUpdateParameter, TaskConstraints
update_params = TaskUpdateParameter(
constraints=TaskConstraints(
max_wall_clock_time=datetime.timedelta(hours=2),
max_task_retry_count=3
)
)
client.task.update("my-job", "task-001", update_params)
# Delete completed task
client.task.delete("my-job", "task-001")class TaskSpecification:
"""Task creation specification."""
def __init__(self):
self.id: str
self.display_name: str
self.command_line: str
self.resource_files: List[ResourceFile]
self.output_files: List[OutputFile]
self.environment_settings: List[EnvironmentSetting]
self.affinity_info: AffinityInformation
self.constraints: TaskConstraints
self.required_slots: int
self.user_identity: UserIdentity
self.exit_conditions: ExitConditions
self.depends_on: TaskDependencies
self.application_package_references: List[ApplicationPackageReference]
self.authentication_token_settings: AuthenticationTokenSettings
self.multi_instance_settings: MultiInstanceSettings
self.container_settings: TaskContainerSettings
class TaskConstraints:
"""Task execution constraints."""
def __init__(self):
self.max_wall_clock_time: datetime.timedelta
self.retention_time: datetime.timedelta
self.max_task_retry_count: int
class ResourceFile:
"""Input file specification."""
def __init__(self):
self.auto_storage_container_name: str
self.storage_container_url: str
self.http_url: str
self.blob_prefix: str
self.file_path: str
self.file_mode: str
class OutputFile:
"""Output file specification."""
def __init__(self):
self.file_pattern: str
self.destination: OutputFileDestination
self.upload_options: OutputFileUploadOptions
class TaskDependencies:
"""Task dependency specification."""
def __init__(self):
self.task_ids: List[str]
self.task_id_ranges: List[TaskIdRange]
class MultiInstanceSettings:
"""Multi-instance task settings for MPI workloads."""
def __init__(self):
self.number_of_instances: int
self.coordination_command_line: str
self.common_resource_files: List[ResourceFile]
class CloudTask:
"""Task information and state."""
def __init__(self):
self.id: str
self.display_name: str
self.url: str
self.e_tag: str
self.last_modified: datetime.datetime
self.creation_time: datetime.datetime
self.state: str # active, preparing, running, completed
self.state_transition_time: datetime.datetime
self.previous_state: str
self.previous_state_transition_time: datetime.datetime
self.command_line: str
self.resource_files: List[ResourceFile]
self.output_files: List[OutputFile]
self.environment_settings: List[EnvironmentSetting]
self.affinity_info: AffinityInformation
self.constraints: TaskConstraints
self.required_slots: int
self.user_identity: UserIdentity
self.execution_info: TaskExecutionInformation
self.node_info: ComputeNodeInformation
self.multi_instance_settings: MultiInstanceSettings
self.stats: TaskStatistics
self.depends_on: TaskDependencies
class TaskExecutionInformation:
"""Task execution information."""
def __init__(self):
self.start_time: datetime.datetime
self.end_time: datetime.datetime
self.exit_code: int
self.container_info: TaskContainerExecutionInformation
self.failure_info: TaskFailureInformation
self.retry_count: int
self.last_retry_time: datetime.datetime
self.required_slots: int
self.last_requeue_time: datetime.datetime
class TaskAddCollectionParameter:
"""Parameters for adding multiple tasks."""
def __init__(self):
self.value: List[TaskSpecification]
class TaskAddCollectionResult:
"""Results of adding task collection."""
def __init__(self):
self.value: List[TaskAddResult]
class TaskAddResult:
"""Result of adding individual task."""
def __init__(self):
self.status: str # success, clienterror, servererror
self.task_id: str
self.e_tag: str
self.last_modified: datetime.datetime
self.location: str
self.error: BatchError
Install with Tessl CLI
npx tessl i tessl/pypi-azure-batch
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10