CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-batch

Microsoft Azure Batch Client Library for Python providing comprehensive APIs for managing batch computing workloads in Azure cloud

91

1.07x
Overview
Eval results
Files

job-operations.mddocs/

Job Operations

Job management capabilities for creating, configuring, and controlling logical containers that organize and manage task execution within batch pools. Jobs define the execution environment, constraints, and lifecycle management for groups of related tasks.

Capabilities

Job Lifecycle Management

Create, retrieve, update, and delete jobs with comprehensive configuration options.

def add(job, job_add_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Add a job to the specified account.
    
    Args:
        job: The job to add (JobSpecification)
        job_add_options: Additional options for the operation
        custom_headers: Custom headers to include in request
        raw: Return raw response if True
        
    Returns:
        None
    """

def list(job_list_options=None, custom_headers=None, raw=False, **operation_config):
    """
    List all jobs in the account.
    
    Args:
        job_list_options: Additional options for listing
        
    Returns:
        ItemPaged[CloudJob]: Paginated list of jobs
    """

def get(job_id, job_get_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Get information about the specified job.
    
    Args:
        job_id: ID of the job to retrieve
        job_get_options: Additional options for the operation
        
    Returns:
        CloudJob: Job information
    """

def delete(job_id, job_delete_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Delete the specified job.
    
    Args:
        job_id: ID of the job to delete
        job_delete_options: Additional options for deletion
        
    Returns:
        None
    """

Job Configuration Updates

Update job properties and configuration after creation.

def patch(job_id, job_patch_parameter, job_patch_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Update properties of the specified job.
    
    Args:
        job_id: ID of the job to update
        job_patch_parameter: Properties to update
        job_patch_options: Additional options
        
    Returns:
        None
    """

def update(job_id, job_update_parameter, job_update_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Update the properties of the specified job.
    
    Args:
        job_id: ID of the job to update
        job_update_parameter: Properties to update
        
    Returns:
        None
    """

Job State Control

Control job execution state including enabling, disabling, and terminating jobs.

def enable(job_id, job_enable_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Enable the specified job, allowing new tasks to run.
    
    Args:
        job_id: ID of the job to enable
        job_enable_options: Additional options
        
    Returns:
        None
    """

def disable(job_id, disable_job_parameter, job_disable_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Disable the specified job, preventing new tasks from running.
    
    Args:
        job_id: ID of the job to disable
        disable_job_parameter: Disable parameters including what to do with active tasks
        job_disable_options: Additional options
        
    Returns:
        None
    """

def terminate(job_id, job_terminate_parameter=None, job_terminate_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Terminate the specified job, marking it as completed.
    
    Args:
        job_id: ID of the job to terminate
        job_terminate_parameter: Termination parameters including reason
        job_terminate_options: Additional options
        
    Returns:
        None
    """

Usage Examples

Creating a Basic Job

from azure.batch.models import JobSpecification, PoolInformation

# Create job specification
job_spec = JobSpecification(
    id="my-processing-job",
    priority=100,
    pool_info=PoolInformation(pool_id="my-pool"),
    display_name="Data Processing Job"
)

# Add the job
client.job.add(job_spec)

Creating a Job with Constraints and Environment

from azure.batch.models import (
    JobSpecification, PoolInformation, JobConstraints,
    EnvironmentSetting, JobManagerTask, UserIdentity,
    AutoUserSpecification
)

# Define job constraints
job_constraints = JobConstraints(
    max_wall_clock_time=datetime.timedelta(hours=24),
    max_task_retry_count=3
)

# Environment settings for all tasks in the job
common_env_settings = [
    EnvironmentSetting(name="DATA_PATH", value="/mnt/data"),
    EnvironmentSetting(name="LOG_LEVEL", value="INFO")
]

# Job manager task to coordinate job execution
job_manager = JobManagerTask(
    id="job-manager",
    command_line="python job_manager.py",
    display_name="Job Manager",
    user_identity=UserIdentity(
        auto_user=AutoUserSpecification(
            scope="pool",
            elevation_level="nonadmin"
        )
    ),
    kill_job_on_completion=True,
    run_exclusive=False
)

job_spec = JobSpecification(
    id="complex-job",
    priority=200,
    pool_info=PoolInformation(pool_id="my-pool"),
    constraints=job_constraints,
    common_environment_settings=common_env_settings,
    job_manager_task=job_manager,
    display_name="Complex Processing Job"
)

client.job.add(job_spec)

Creating a Job with Preparation and Release Tasks

from azure.batch.models import (
    JobPreparationTask, JobReleaseTask, ResourceFile
)

# Job preparation task - runs once on each node before any job tasks
job_prep = JobPreparationTask(
    id="job-prep",
    command_line="mkdir -p /tmp/jobdata && download_data.sh",
    resource_files=[
        ResourceFile(
            http_url="https://mystorageaccount.blob.core.windows.net/scripts/download_data.sh",
            file_path="download_data.sh"
        )
    ],
    user_identity=UserIdentity(
        auto_user=AutoUserSpecification(
            scope="pool",
            elevation_level="admin"
        )
    ),
    wait_for_success=True
)

# Job release task - runs after all tasks complete on each node
job_release = JobReleaseTask(
    id="job-release",
    command_line="cleanup.sh && rm -rf /tmp/jobdata",
    resource_files=[
        ResourceFile(
            http_url="https://mystorageaccount.blob.core.windows.net/scripts/cleanup.sh", 
            file_path="cleanup.sh"
        )
    ]
)

job_spec = JobSpecification(
    id="job-with-prep-release",
    pool_info=PoolInformation(pool_id="my-pool"),
    job_preparation_task=job_prep,
    job_release_task=job_release
)

client.job.add(job_spec)

Managing Job State

from azure.batch.models import DisableJobParameter

# List all jobs
jobs = client.job.list()
for job in jobs:
    print(f"Job {job.id}: {job.state}")

# Get specific job details
job = client.job.get("my-processing-job")
print(f"Job priority: {job.priority}")
print(f"Creation time: {job.creation_time}")

# Disable job (stop new tasks, but let running tasks complete)
disable_params = DisableJobParameter(
    disable_tasks="taskcompletion"  # wait for running tasks to complete
)
client.job.disable("my-processing-job", disable_params)

# Re-enable job
client.job.enable("my-processing-job")

# Terminate job with reason
from azure.batch.models import JobTerminateParameter
terminate_params = JobTerminateParameter(
    terminate_reason="Manual termination for maintenance"
)
client.job.terminate("my-processing-job", terminate_params)

# Delete completed job
client.job.delete("my-processing-job")

Updating Job Properties

from azure.batch.models import JobPatchParameter, JobConstraints

# Update job priority and constraints
patch_params = JobPatchParameter(
    priority=50,  # Lower priority
    constraints=JobConstraints(
        max_wall_clock_time=datetime.timedelta(hours=12),
        max_task_retry_count=5
    )
)

client.job.patch("my-processing-job", patch_params)

Types

Job Configuration Types

class JobSpecification:
    """Job creation specification."""
    def __init__(self):
        self.id: str
        self.display_name: str
        self.priority: int  # -1000 to 1000, higher is more priority
        self.constraints: JobConstraints
        self.job_manager_task: JobManagerTask
        self.job_preparation_task: JobPreparationTask
        self.job_release_task: JobReleaseTask
        self.common_environment_settings: List[EnvironmentSetting]
        self.pool_info: PoolInformation
        self.on_all_tasks_complete: str  # noaction, terminatejob
        self.on_task_failure: str  # noaction, performexitoptionsjobaction
        self.metadata: List[MetadataItem]
        self.uses_task_dependencies: bool

class JobConstraints:
    """Job execution constraints."""
    def __init__(self):
        self.max_wall_clock_time: datetime.timedelta
        self.max_task_retry_count: int

class PoolInformation:
    """Pool information for job execution."""
    def __init__(self):
        self.pool_id: str
        self.auto_pool_specification: AutoPoolSpecification

class JobManagerTask:
    """Job manager task specification."""
    def __init__(self):
        self.id: str
        self.display_name: str
        self.command_line: str
        self.resource_files: List[ResourceFile]
        self.environment_settings: List[EnvironmentSetting]
        self.constraints: TaskConstraints
        self.kill_job_on_completion: bool
        self.user_identity: UserIdentity
        self.run_exclusive: bool
        self.application_package_references: List[ApplicationPackageReference]
        self.authentication_token_settings: AuthenticationTokenSettings

class JobPreparationTask:
    """Job preparation task specification."""
    def __init__(self):
        self.id: str
        self.command_line: str
        self.resource_files: List[ResourceFile]
        self.environment_settings: List[EnvironmentSetting]
        self.constraints: TaskConstraints
        self.wait_for_success: bool
        self.user_identity: UserIdentity
        self.rerun_on_node_reboot_after_success: bool

class JobReleaseTask:
    """Job release task specification.""" 
    def __init__(self):
        self.id: str
        self.command_line: str
        self.resource_files: List[ResourceFile]
        self.environment_settings: List[EnvironmentSetting]
        self.max_wall_clock_time: datetime.timedelta
        self.retention_time: datetime.timedelta
        self.user_identity: UserIdentity

Job State Types

class CloudJob:
    """Job information and state."""
    def __init__(self):
        self.id: str
        self.display_name: str
        self.uses_task_dependencies: bool
        self.url: str
        self.e_tag: str
        self.last_modified: datetime.datetime
        self.creation_time: datetime.datetime
        self.state: str  # active, disabling, disabled, enabling, terminating, completed, deleting
        self.state_transition_time: datetime.datetime
        self.previous_state: str
        self.previous_state_transition_time: datetime.datetime
        self.priority: int
        self.constraints: JobConstraints
        self.job_manager_task: JobManagerTask
        self.job_preparation_task: JobPreparationTask
        self.job_release_task: JobReleaseTask
        self.common_environment_settings: List[EnvironmentSetting]
        self.pool_info: PoolInformation
        self.on_all_tasks_complete: str
        self.on_task_failure: str
        self.metadata: List[MetadataItem]
        self.execution_info: JobExecutionInformation
        self.stats: JobStatistics

class JobExecutionInformation:
    """Job execution information."""
    def __init__(self):
        self.start_time: datetime.datetime
        self.end_time: datetime.datetime
        self.pool_id: str
        self.scheduling_error: JobSchedulingError
        self.terminate_reason: str

class DisableJobParameter:
    """Parameters for disabling a job."""
    def __init__(self):
        self.disable_tasks: str  # requeue, terminate, wait

class JobTerminateParameter:
    """Parameters for terminating a job."""
    def __init__(self):
        self.terminate_reason: str

class JobPatchParameter:
    """Parameters for patching job properties."""
    def __init__(self):
        self.priority: int
        self.constraints: JobConstraints
        self.pool_info: PoolInformation
        self.metadata: List[MetadataItem]
        self.on_all_tasks_complete: str

Install with Tessl CLI

npx tessl i tessl/pypi-azure-batch

docs

account-operations.md

application-operations.md

certificate-operations.md

client-management.md

compute-node-extension-operations.md

compute-node-operations.md

file-operations.md

index.md

job-operations.md

job-schedule-operations.md

pool-operations.md

task-operations.md

tile.json