Microsoft Azure Batch Client Library for Python providing comprehensive APIs for managing batch computing workloads in Azure cloud
91
Job management capabilities for creating, configuring, and controlling logical containers that organize and manage task execution within batch pools. Jobs define the execution environment, constraints, and lifecycle management for groups of related tasks.
Create, retrieve, update, and delete jobs with comprehensive configuration options.
def add(job, job_add_options=None, custom_headers=None, raw=False, **operation_config):
"""
Add a job to the specified account.
Args:
job: The job to add (JobSpecification)
job_add_options: Additional options for the operation
custom_headers: Custom headers to include in request
raw: Return raw response if True
Returns:
None
"""
def list(job_list_options=None, custom_headers=None, raw=False, **operation_config):
"""
List all jobs in the account.
Args:
job_list_options: Additional options for listing
Returns:
ItemPaged[CloudJob]: Paginated list of jobs
"""
def get(job_id, job_get_options=None, custom_headers=None, raw=False, **operation_config):
"""
Get information about the specified job.
Args:
job_id: ID of the job to retrieve
job_get_options: Additional options for the operation
Returns:
CloudJob: Job information
"""
def delete(job_id, job_delete_options=None, custom_headers=None, raw=False, **operation_config):
"""
Delete the specified job.
Args:
job_id: ID of the job to delete
job_delete_options: Additional options for deletion
Returns:
None
"""Update job properties and configuration after creation.
def patch(job_id, job_patch_parameter, job_patch_options=None, custom_headers=None, raw=False, **operation_config):
"""
Update properties of the specified job.
Args:
job_id: ID of the job to update
job_patch_parameter: Properties to update
job_patch_options: Additional options
Returns:
None
"""
def update(job_id, job_update_parameter, job_update_options=None, custom_headers=None, raw=False, **operation_config):
"""
Update the properties of the specified job.
Args:
job_id: ID of the job to update
job_update_parameter: Properties to update
Returns:
None
"""Control job execution state including enabling, disabling, and terminating jobs.
def enable(job_id, job_enable_options=None, custom_headers=None, raw=False, **operation_config):
"""
Enable the specified job, allowing new tasks to run.
Args:
job_id: ID of the job to enable
job_enable_options: Additional options
Returns:
None
"""
def disable(job_id, disable_job_parameter, job_disable_options=None, custom_headers=None, raw=False, **operation_config):
"""
Disable the specified job, preventing new tasks from running.
Args:
job_id: ID of the job to disable
disable_job_parameter: Disable parameters including what to do with active tasks
job_disable_options: Additional options
Returns:
None
"""
def terminate(job_id, job_terminate_parameter=None, job_terminate_options=None, custom_headers=None, raw=False, **operation_config):
"""
Terminate the specified job, marking it as completed.
Args:
job_id: ID of the job to terminate
job_terminate_parameter: Termination parameters including reason
job_terminate_options: Additional options
Returns:
None
"""from azure.batch.models import JobSpecification, PoolInformation
# Create job specification
job_spec = JobSpecification(
id="my-processing-job",
priority=100,
pool_info=PoolInformation(pool_id="my-pool"),
display_name="Data Processing Job"
)
# Add the job
client.job.add(job_spec)from azure.batch.models import (
JobSpecification, PoolInformation, JobConstraints,
EnvironmentSetting, JobManagerTask, UserIdentity,
AutoUserSpecification
)
# Define job constraints
job_constraints = JobConstraints(
max_wall_clock_time=datetime.timedelta(hours=24),
max_task_retry_count=3
)
# Environment settings for all tasks in the job
common_env_settings = [
EnvironmentSetting(name="DATA_PATH", value="/mnt/data"),
EnvironmentSetting(name="LOG_LEVEL", value="INFO")
]
# Job manager task to coordinate job execution
job_manager = JobManagerTask(
id="job-manager",
command_line="python job_manager.py",
display_name="Job Manager",
user_identity=UserIdentity(
auto_user=AutoUserSpecification(
scope="pool",
elevation_level="nonadmin"
)
),
kill_job_on_completion=True,
run_exclusive=False
)
job_spec = JobSpecification(
id="complex-job",
priority=200,
pool_info=PoolInformation(pool_id="my-pool"),
constraints=job_constraints,
common_environment_settings=common_env_settings,
job_manager_task=job_manager,
display_name="Complex Processing Job"
)
client.job.add(job_spec)from azure.batch.models import (
JobPreparationTask, JobReleaseTask, ResourceFile
)
# Job preparation task - runs once on each node before any job tasks
job_prep = JobPreparationTask(
id="job-prep",
command_line="mkdir -p /tmp/jobdata && download_data.sh",
resource_files=[
ResourceFile(
http_url="https://mystorageaccount.blob.core.windows.net/scripts/download_data.sh",
file_path="download_data.sh"
)
],
user_identity=UserIdentity(
auto_user=AutoUserSpecification(
scope="pool",
elevation_level="admin"
)
),
wait_for_success=True
)
# Job release task - runs after all tasks complete on each node
job_release = JobReleaseTask(
id="job-release",
command_line="cleanup.sh && rm -rf /tmp/jobdata",
resource_files=[
ResourceFile(
http_url="https://mystorageaccount.blob.core.windows.net/scripts/cleanup.sh",
file_path="cleanup.sh"
)
]
)
job_spec = JobSpecification(
id="job-with-prep-release",
pool_info=PoolInformation(pool_id="my-pool"),
job_preparation_task=job_prep,
job_release_task=job_release
)
client.job.add(job_spec)from azure.batch.models import DisableJobParameter
# List all jobs
jobs = client.job.list()
for job in jobs:
print(f"Job {job.id}: {job.state}")
# Get specific job details
job = client.job.get("my-processing-job")
print(f"Job priority: {job.priority}")
print(f"Creation time: {job.creation_time}")
# Disable job (stop new tasks, but let running tasks complete)
disable_params = DisableJobParameter(
disable_tasks="taskcompletion" # wait for running tasks to complete
)
client.job.disable("my-processing-job", disable_params)
# Re-enable job
client.job.enable("my-processing-job")
# Terminate job with reason
from azure.batch.models import JobTerminateParameter
terminate_params = JobTerminateParameter(
terminate_reason="Manual termination for maintenance"
)
client.job.terminate("my-processing-job", terminate_params)
# Delete completed job
client.job.delete("my-processing-job")from azure.batch.models import JobPatchParameter, JobConstraints
# Update job priority and constraints
patch_params = JobPatchParameter(
priority=50, # Lower priority
constraints=JobConstraints(
max_wall_clock_time=datetime.timedelta(hours=12),
max_task_retry_count=5
)
)
client.job.patch("my-processing-job", patch_params)class JobSpecification:
"""Job creation specification."""
def __init__(self):
self.id: str
self.display_name: str
self.priority: int # -1000 to 1000, higher is more priority
self.constraints: JobConstraints
self.job_manager_task: JobManagerTask
self.job_preparation_task: JobPreparationTask
self.job_release_task: JobReleaseTask
self.common_environment_settings: List[EnvironmentSetting]
self.pool_info: PoolInformation
self.on_all_tasks_complete: str # noaction, terminatejob
self.on_task_failure: str # noaction, performexitoptionsjobaction
self.metadata: List[MetadataItem]
self.uses_task_dependencies: bool
class JobConstraints:
"""Job execution constraints."""
def __init__(self):
self.max_wall_clock_time: datetime.timedelta
self.max_task_retry_count: int
class PoolInformation:
"""Pool information for job execution."""
def __init__(self):
self.pool_id: str
self.auto_pool_specification: AutoPoolSpecification
class JobManagerTask:
"""Job manager task specification."""
def __init__(self):
self.id: str
self.display_name: str
self.command_line: str
self.resource_files: List[ResourceFile]
self.environment_settings: List[EnvironmentSetting]
self.constraints: TaskConstraints
self.kill_job_on_completion: bool
self.user_identity: UserIdentity
self.run_exclusive: bool
self.application_package_references: List[ApplicationPackageReference]
self.authentication_token_settings: AuthenticationTokenSettings
class JobPreparationTask:
"""Job preparation task specification."""
def __init__(self):
self.id: str
self.command_line: str
self.resource_files: List[ResourceFile]
self.environment_settings: List[EnvironmentSetting]
self.constraints: TaskConstraints
self.wait_for_success: bool
self.user_identity: UserIdentity
self.rerun_on_node_reboot_after_success: bool
class JobReleaseTask:
"""Job release task specification."""
def __init__(self):
self.id: str
self.command_line: str
self.resource_files: List[ResourceFile]
self.environment_settings: List[EnvironmentSetting]
self.max_wall_clock_time: datetime.timedelta
self.retention_time: datetime.timedelta
self.user_identity: UserIdentityclass CloudJob:
"""Job information and state."""
def __init__(self):
self.id: str
self.display_name: str
self.uses_task_dependencies: bool
self.url: str
self.e_tag: str
self.last_modified: datetime.datetime
self.creation_time: datetime.datetime
self.state: str # active, disabling, disabled, enabling, terminating, completed, deleting
self.state_transition_time: datetime.datetime
self.previous_state: str
self.previous_state_transition_time: datetime.datetime
self.priority: int
self.constraints: JobConstraints
self.job_manager_task: JobManagerTask
self.job_preparation_task: JobPreparationTask
self.job_release_task: JobReleaseTask
self.common_environment_settings: List[EnvironmentSetting]
self.pool_info: PoolInformation
self.on_all_tasks_complete: str
self.on_task_failure: str
self.metadata: List[MetadataItem]
self.execution_info: JobExecutionInformation
self.stats: JobStatistics
class JobExecutionInformation:
"""Job execution information."""
def __init__(self):
self.start_time: datetime.datetime
self.end_time: datetime.datetime
self.pool_id: str
self.scheduling_error: JobSchedulingError
self.terminate_reason: str
class DisableJobParameter:
"""Parameters for disabling a job."""
def __init__(self):
self.disable_tasks: str # requeue, terminate, wait
class JobTerminateParameter:
"""Parameters for terminating a job."""
def __init__(self):
self.terminate_reason: str
class JobPatchParameter:
"""Parameters for patching job properties."""
def __init__(self):
self.priority: int
self.constraints: JobConstraints
self.pool_info: PoolInformation
self.metadata: List[MetadataItem]
self.on_all_tasks_complete: strInstall with Tessl CLI
npx tessl i tessl/pypi-azure-batchdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10