CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-azure-batch

Microsoft Azure Batch Client Library for Python, providing comprehensive APIs for managing batch computing workloads in the Azure cloud

91

1.07x
Overview
Eval results
Files

docs/task-operations.md

Task Operations

Task management capabilities for creating, monitoring, and controlling individual work items that execute within batch jobs on compute nodes. Tasks represent the actual computational work performed in Azure Batch and can include data processing, analysis, rendering, or any other computational workload.

Capabilities

Task Lifecycle Management

Create (individually or in bulk), list, retrieve, and delete tasks with comprehensive configuration options.

def add(job_id, task, task_add_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Add a single task to the specified job.

    Args:
        job_id: ID of the job to add the task to
        task: The task to add (TaskSpecification)
        task_add_options: Additional options for the operation
        custom_headers: Custom headers to include in request
        raw: Return raw response if True
        **operation_config: Extra keyword arguments accepted by the
            operation (their meaning is not described on this page)

    Returns:
        None (the raw response instead when ``raw`` is True)
    """

def add_collection(job_id, task_add_collection_parameter, task_add_collection_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Add multiple tasks to the specified job in one call.

    Args:
        job_id: ID of the job to add tasks to
        task_add_collection_parameter: Collection of tasks to add
            (TaskAddCollectionParameter)
        task_add_collection_options: Additional options
        custom_headers: Custom headers to include in request
        raw: Return raw response if True
        **operation_config: Extra keyword arguments accepted by the
            operation (their meaning is not described on this page)

    Returns:
        TaskAddCollectionResult: Results of adding tasks including any
        failures; inspect each entry of ``value`` for its ``status``
    """

def list(job_id, task_list_options=None, custom_headers=None, raw=False, **operation_config):
    """
    List all tasks in the specified job.

    Args:
        job_id: ID of the job containing tasks
        task_list_options: Additional options for listing
        custom_headers: Custom headers to include in request
        raw: Return raw response if True
        **operation_config: Extra keyword arguments accepted by the
            operation (their meaning is not described on this page)

    Returns:
        ItemPaged[CloudTask]: Paginated list of tasks; iterate over it
        directly to visit each CloudTask
    """

def get(job_id, task_id, task_get_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Get information about the specified task.

    Args:
        job_id: ID of the job containing the task
        task_id: ID of the task to retrieve
        task_get_options: Additional options for the operation
        custom_headers: Custom headers to include in request
        raw: Return raw response if True
        **operation_config: Extra keyword arguments accepted by the
            operation (their meaning is not described on this page)

    Returns:
        CloudTask: Task information
    """

def delete(job_id, task_id, task_delete_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Delete the specified task.

    Args:
        job_id: ID of the job containing the task
        task_id: ID of the task to delete
        task_delete_options: Additional options for deletion
        custom_headers: Custom headers to include in request
        raw: Return raw response if True
        **operation_config: Extra keyword arguments accepted by the
            operation (their meaning is not described on this page)

    Returns:
        None (the raw response instead when ``raw`` is True)
    """

Task Configuration Updates

Update task properties and configuration after creation.

def update(job_id, task_id, task_update_parameter, task_update_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Update properties of the specified task.

    Args:
        job_id: ID of the job containing the task
        task_id: ID of the task to update
        task_update_parameter: Properties to update (TaskUpdateParameter,
            e.g. carrying new TaskConstraints)
        task_update_options: Additional options
        custom_headers: Custom headers to include in request
        raw: Return raw response if True
        **operation_config: Extra keyword arguments accepted by the
            operation (their meaning is not described on this page)

    Returns:
        None (the raw response instead when ``raw`` is True)
    """

Task State Control

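Control task execution state, including termination and reactivation, and inspect the subtasks of multi-instance tasks. Only `terminate` and `list_subtasks` are documented in this section.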

def terminate(job_id, task_id, task_terminate_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Terminate the specified task.

    Args:
        job_id: ID of the job containing the task
        task_id: ID of the task to terminate
        task_terminate_options: Additional options
        custom_headers: Custom headers to include in request
        raw: Return raw response if True
        **operation_config: Extra keyword arguments accepted by the
            operation (their meaning is not described on this page)

    Returns:
        None (the raw response instead when ``raw`` is True)
    """

def list_subtasks(job_id, task_id, task_list_subtasks_options=None, custom_headers=None, raw=False, **operation_config):
    """
    List subtasks of a multi-instance task.

    Args:
        job_id: ID of the job containing the task
        task_id: ID of the multi-instance task
        task_list_subtasks_options: Additional options
        custom_headers: Custom headers to include in request
        raw: Return raw response if True
        **operation_config: Extra keyword arguments accepted by the
            operation (their meaning is not described on this page)

    Returns:
        CloudTaskListSubtasksResult: List of subtask information
    """

Usage Examples

Creating a Simple Task

from azure.batch.models import TaskSpecification

# Describe the work: a job-unique id, the command to execute, and a
# human-readable label.
hello_task = TaskSpecification(
    display_name="Simple Echo Task",
    command_line="echo 'Hello from Azure Batch!'",
    id="task-001",
)

# Submit the task to an existing job (``client`` is assumed to be an
# already-constructed Batch client).
client.task.add("my-job", hello_task)

Creating a Task with Input/Output Files

from azure.batch.models import (
    TaskSpecification, ResourceFile, OutputFile,
    OutputFileDestination, OutputFileBlobContainerDestination,
    OutputFileUploadOptions
)

# Input files for the task
resource_files = [
    ResourceFile(
        http_url="https://mystorageaccount.blob.core.windows.net/input/data.txt",
        file_path="input/data.txt"
    ),
    ResourceFile(
        http_url="https://mystorageaccount.blob.core.windows.net/scripts/process.py",
        file_path="process.py"
    )
]

# Output files from the task.
# The blob container URL is not a field of OutputFileDestination itself: it
# belongs to the nested OutputFileBlobContainerDestination passed as
# ``container``.
output_files = [
    OutputFile(
        file_pattern="../std*.txt",  # stdout.txt, stderr.txt
        destination=OutputFileDestination(
            container=OutputFileBlobContainerDestination(
                container_url="https://mystorageaccount.blob.core.windows.net/output?sas_token"
            )
        ),
        upload_options=OutputFileUploadOptions(
            upload_condition="taskcompletion"
        )
    ),
    OutputFile(
        file_pattern="results/*",  # All files in results directory
        destination=OutputFileDestination(
            container=OutputFileBlobContainerDestination(
                container_url="https://mystorageaccount.blob.core.windows.net/results?sas_token"
            )
        ),
        upload_options=OutputFileUploadOptions(
            upload_condition="tasksuccess"
        )
    )
]

task_spec = TaskSpecification(
    id="data-processing-task",
    command_line="python process.py input/data.txt",
    resource_files=resource_files,
    output_files=output_files,
    display_name="Data Processing Task"
)

client.task.add("my-job", task_spec)

Creating a Task with Dependencies

from azure.batch.models import TaskSpecification, TaskDependencies, TaskIdRange

# Task that depends on other tasks.
# Ranges are expressed with the TaskIdRange model (the element type declared
# for TaskDependencies.task_id_ranges) rather than raw dictionaries.
task_with_deps = TaskSpecification(
    id="analysis-task",
    command_line="python analyze.py",
    depends_on=TaskDependencies(
        task_ids=["preprocessing-task-1", "preprocessing-task-2"],
        task_id_ranges=[TaskIdRange(start=10, end=20)]  # Tasks 10-20
    ),
    display_name="Analysis Task"
)

client.task.add("my-job", task_with_deps)

Creating a Multi-instance Task (MPI)

from azure.batch.models import (
    TaskSpecification, MultiInstanceSettings,
    ResourceFile, EnvironmentSetting
)

# Single source of truth for both the number of nodes requested and the
# number of MPI processes launched.
node_count = 4

# Multi-instance task for MPI workloads.
# Batch does not run a task command line under a shell, so a variable
# reference such as $AZ_BATCH_NODE_COUNT would reach mpirun unexpanded; the
# process count is interpolated on the client side instead.
mpi_task = TaskSpecification(
    id="mpi-task",
    command_line=f"mpirun -np {node_count} python mpi_app.py",
    multi_instance_settings=MultiInstanceSettings(
        number_of_instances=node_count,  # Run on 4 nodes
        coordination_command_line="echo 'Setting up MPI environment'",
        common_resource_files=[
            ResourceFile(
                http_url="https://mystorageaccount.blob.core.windows.net/mpi/mpi_app.py",
                file_path="mpi_app.py"
            )
        ]
    ),
    resource_files=[
        ResourceFile(
            http_url="https://mystorageaccount.blob.core.windows.net/mpi/hostfile",
            file_path="hostfile"
        )
    ],
    environment_settings=[
        EnvironmentSetting(name="MPI_HOSTS_FILE", value="hostfile")
    ],
    display_name="MPI Processing Task"
)

client.task.add("my-job", mpi_task)

Adding Multiple Tasks at Once

from azure.batch.models import (
    ResourceFile, TaskAddCollectionParameter, TaskSpecification
)

# Create multiple tasks, one per numbered input file
tasks = [
    TaskSpecification(
        id=f"parallel-task-{i:03d}",
        command_line=f"python worker.py --input file_{i:03d}.txt --output result_{i:03d}.txt",
        resource_files=[
            ResourceFile(
                http_url=f"https://mystorageaccount.blob.core.windows.net/input/file_{i:03d}.txt",
                file_path=f"file_{i:03d}.txt"
            )
        ]
    )
    for i in range(10)
]

# Add all tasks at once
task_collection = TaskAddCollectionParameter(value=tasks)
result = client.task.add_collection("my-job", task_collection)

# result.value holds one entry per submitted task (successes included), so
# report only the entries whose status marks a rejection.
for add_result in result.value:
    if add_result.status in ("clienterror", "servererror"):
        print(f"Failed to add task {add_result.task_id}: {add_result.error.message}")

Managing Task State

import datetime

from azure.batch.models import TaskUpdateParameter, TaskConstraints

# List tasks in a job
tasks = client.task.list("my-job")
for task in tasks:
    print(f"Task {task.id}: {task.state}")
    # execution_info may be unset, so guard before reading its fields
    if task.execution_info:
        print(f"  Exit code: {task.execution_info.exit_code}")
        print(f"  Start time: {task.execution_info.start_time}")

# Get specific task details
task = client.task.get("my-job", "task-001")
print(f"Task command: {task.command_line}")
print(f"Task state: {task.state}")

# Terminate a running task
client.task.terminate("my-job", "long-running-task")

# Update task constraints (max_wall_clock_time is a datetime.timedelta,
# hence the datetime import at the top of this snippet)
update_params = TaskUpdateParameter(
    constraints=TaskConstraints(
        max_wall_clock_time=datetime.timedelta(hours=2),
        max_task_retry_count=3
    )
)
client.task.update("my-job", "task-001", update_params)

# Delete completed task
client.task.delete("my-job", "task-001")

Types

Task Configuration Types

class TaskSpecification:
    """Task creation specification.

    Every attribute can be supplied as a keyword argument and defaults to
    ``None`` when omitted, matching how the usage examples build tasks, e.g.
    ``TaskSpecification(id="task-001", command_line="echo hi")``.
    """
    # NOTE(review): the published azure-batch SDK appears to name this model
    # TaskAddParameter -- confirm the class name for the targeted version.

    def __init__(self, *, id=None, display_name=None, command_line=None,
                 resource_files=None, output_files=None,
                 environment_settings=None, affinity_info=None,
                 constraints=None, required_slots=None, user_identity=None,
                 exit_conditions=None, depends_on=None,
                 application_package_references=None,
                 authentication_token_settings=None,
                 multi_instance_settings=None, container_settings=None):
        # Annotations on attribute targets inside a function are not
        # evaluated, so the type names below need not be importable here.
        self.id: str = id
        self.display_name: str = display_name
        self.command_line: str = command_line
        self.resource_files: List[ResourceFile] = resource_files
        self.output_files: List[OutputFile] = output_files
        self.environment_settings: List[EnvironmentSetting] = environment_settings
        self.affinity_info: AffinityInformation = affinity_info
        self.constraints: TaskConstraints = constraints
        self.required_slots: int = required_slots
        self.user_identity: UserIdentity = user_identity
        self.exit_conditions: ExitConditions = exit_conditions
        self.depends_on: TaskDependencies = depends_on
        self.application_package_references: List[ApplicationPackageReference] = application_package_references
        self.authentication_token_settings: AuthenticationTokenSettings = authentication_token_settings
        self.multi_instance_settings: MultiInstanceSettings = multi_instance_settings
        self.container_settings: TaskContainerSettings = container_settings

class TaskConstraints:
    """Task execution constraints.

    All values are optional keyword arguments defaulting to ``None``, e.g.
    ``TaskConstraints(max_wall_clock_time=datetime.timedelta(hours=2),
    max_task_retry_count=3)``.
    """

    def __init__(self, *, max_wall_clock_time=None, retention_time=None,
                 max_task_retry_count=None):
        self.max_wall_clock_time: datetime.timedelta = max_wall_clock_time
        self.retention_time: datetime.timedelta = retention_time
        self.max_task_retry_count: int = max_task_retry_count

class ResourceFile:
    """Input file specification.

    All values are optional keyword arguments defaulting to ``None``. The
    usage examples pair ``http_url`` (where to fetch the file from) with
    ``file_path`` (where to place it for the task).
    """

    def __init__(self, *, auto_storage_container_name=None,
                 storage_container_url=None, http_url=None, blob_prefix=None,
                 file_path=None, file_mode=None):
        self.auto_storage_container_name: str = auto_storage_container_name
        self.storage_container_url: str = storage_container_url
        self.http_url: str = http_url
        self.blob_prefix: str = blob_prefix
        self.file_path: str = file_path
        self.file_mode: str = file_mode

class OutputFile:
    """Output file specification.

    All values are optional keyword arguments defaulting to ``None``:
    ``file_pattern`` selects the files to upload (e.g. ``"results/*"``),
    ``destination`` says where they go, and ``upload_options`` carries the
    upload condition.
    """

    def __init__(self, *, file_pattern=None, destination=None,
                 upload_options=None):
        self.file_pattern: str = file_pattern
        self.destination: OutputFileDestination = destination
        self.upload_options: OutputFileUploadOptions = upload_options

class TaskDependencies:
    """Task dependency specification.

    Dependencies can be named explicitly through ``task_ids`` and/or given
    as ranges through ``task_id_ranges``; both are optional keyword
    arguments defaulting to ``None``.
    """

    def __init__(self, *, task_ids=None, task_id_ranges=None):
        self.task_ids: List[str] = task_ids
        self.task_id_ranges: List[TaskIdRange] = task_id_ranges

class MultiInstanceSettings:
    """Multi-instance task settings for MPI workloads.

    All values are optional keyword arguments defaulting to ``None``, e.g.
    ``MultiInstanceSettings(number_of_instances=4,
    coordination_command_line="...")``.
    """

    def __init__(self, *, number_of_instances=None,
                 coordination_command_line=None, common_resource_files=None):
        self.number_of_instances: int = number_of_instances
        self.coordination_command_line: str = coordination_command_line
        self.common_resource_files: List[ResourceFile] = common_resource_files

Task State Types

class CloudTask:
    """Task information and state.

    This is the type returned by ``get`` and yielded when iterating the
    result of ``list``. The usage examples read ``id``, ``state``,
    ``command_line`` and ``execution_info`` from it.
    """
    def __init__(self):
        self.id: str
        self.display_name: str
        self.url: str
        self.e_tag: str
        self.last_modified: datetime.datetime
        self.creation_time: datetime.datetime
        self.state: str  # active, preparing, running, completed
        self.state_transition_time: datetime.datetime
        self.previous_state: str
        self.previous_state_transition_time: datetime.datetime
        # The attributes below (and depends_on further down) share their
        # names and types with TaskSpecification attributes.
        self.command_line: str
        self.resource_files: List[ResourceFile]
        self.output_files: List[OutputFile]
        self.environment_settings: List[EnvironmentSetting]
        self.affinity_info: AffinityInformation
        self.constraints: TaskConstraints
        self.required_slots: int
        self.user_identity: UserIdentity
        # Check execution_info for truthiness before reading its fields,
        # as the usage example does.
        self.execution_info: TaskExecutionInformation
        self.node_info: ComputeNodeInformation
        self.multi_instance_settings: MultiInstanceSettings
        self.stats: TaskStatistics
        self.depends_on: TaskDependencies

class TaskExecutionInformation:
    """Task execution information.

    Exposed through ``CloudTask.execution_info``; the usage examples read
    ``exit_code`` and ``start_time`` from it.
    """
    def __init__(self):
        self.start_time: datetime.datetime
        self.end_time: datetime.datetime
        self.exit_code: int
        self.container_info: TaskContainerExecutionInformation
        self.failure_info: TaskFailureInformation
        # Retry bookkeeping: a counter paired with the time of the last retry
        self.retry_count: int
        self.last_retry_time: datetime.datetime
        # Requeue bookkeeping mirrors the retry pair above. The slot
        # requirement is a property of the task itself, not of its execution
        # record, so the counter here is requeue_count.
        self.requeue_count: int
        self.last_requeue_time: datetime.datetime

class TaskAddCollectionParameter:
    """Parameters for adding multiple tasks.

    ``value`` is the list of task specifications to submit; it is an
    optional keyword argument defaulting to ``None``, e.g.
    ``TaskAddCollectionParameter(value=tasks)``.
    """

    def __init__(self, *, value=None):
        self.value: List[TaskSpecification] = value

class TaskAddCollectionResult:
    """Results of adding task collection.

    Returned by ``add_collection``. Iterate ``value`` and inspect each
    entry's ``status`` to find tasks that could not be added.
    """
    def __init__(self):
        # One TaskAddResult per entry; the usage example filters these by status
        self.value: List[TaskAddResult]

class TaskAddResult:
    """Result of adding individual task.

    The usage example treats a ``status`` of ``clienterror`` or
    ``servererror`` as a failure and then reads ``task_id`` and
    ``error.message`` to report it.
    """
    def __init__(self):
        self.status: str  # success, clienterror, servererror
        self.task_id: str
        self.e_tag: str
        self.last_modified: datetime.datetime
        self.location: str
        # Read by the usage example only for clienterror/servererror entries
        self.error: BatchError

Install with Tessl CLI

npx tessl i tessl/pypi-azure-batch

docs

account-operations.md

application-operations.md

certificate-operations.md

client-management.md

compute-node-extension-operations.md

compute-node-operations.md

file-operations.md

index.md

job-operations.md

job-schedule-operations.md

pool-operations.md

task-operations.md

tile.json