Microsoft Azure Batch Client Library for Python providing comprehensive APIs for managing batch computing workloads in Azure cloud
91
Job schedule management capabilities for creating and managing recurring batch jobs with time-based or interval-based scheduling. Job schedules automatically create jobs based on predefined schedules and job specifications.
Create, retrieve, update, and delete job schedules with comprehensive configuration options.
def add(cloud_job_schedule, job_schedule_add_options=None, custom_headers=None, raw=False, **operation_config):
"""
Add a job schedule to the specified account.
Args:
cloud_job_schedule: The job schedule to add (JobScheduleAddParameter)
job_schedule_add_options: Additional options for the operation
custom_headers: Custom headers to include in request
raw: Return raw response if True
Returns:
None
"""
def list(job_schedule_list_options=None, custom_headers=None, raw=False, **operation_config):
"""
List all job schedules in the account.
Args:
job_schedule_list_options: Additional options for listing
Returns:
ItemPaged[CloudJobSchedule]: Paginated list of job schedules
"""
def get(job_schedule_id, job_schedule_get_options=None, custom_headers=None, raw=False, **operation_config):
"""
Get information about the specified job schedule.
Args:
job_schedule_id: ID of the job schedule to retrieve
job_schedule_get_options: Additional options for the operation
Returns:
CloudJobSchedule: Job schedule information
"""
def delete(job_schedule_id, job_schedule_delete_options=None, custom_headers=None, raw=False, **operation_config):
"""
Delete the specified job schedule.
Args:
job_schedule_id: ID of the job schedule to delete
job_schedule_delete_options: Additional options for deletion
Returns:
None
"""
def exists(job_schedule_id, job_schedule_exists_options=None, custom_headers=None, raw=False, **operation_config):
"""
Check if a job schedule exists.
Args:
job_schedule_id: ID of the job schedule to check
Returns:
bool: True if job schedule exists, False otherwise
"""Update job schedule properties and configuration after creation.
def patch(job_schedule_id, job_schedule_patch_parameter, job_schedule_patch_options=None, custom_headers=None, raw=False, **operation_config):
"""
Update properties of the specified job schedule.
Args:
job_schedule_id: ID of the job schedule to update
job_schedule_patch_parameter: Properties to update
job_schedule_patch_options: Additional options
Returns:
None
"""
def update(job_schedule_id, job_schedule_update_parameter, job_schedule_update_options=None, custom_headers=None, raw=False, **operation_config):
"""
Update the properties of the specified job schedule.
Args:
job_schedule_id: ID of the job schedule to update
job_schedule_update_parameter: Properties to update
Returns:
None
"""Control job schedule execution state including enabling, disabling, and terminating schedules.
def enable(job_schedule_id, job_schedule_enable_options=None, custom_headers=None, raw=False, **operation_config):
"""
Enable the specified job schedule.
Args:
job_schedule_id: ID of the job schedule to enable
job_schedule_enable_options: Additional options
Returns:
None
"""
def disable(job_schedule_id, job_schedule_disable_options=None, custom_headers=None, raw=False, **operation_config):
"""
Disable the specified job schedule.
Args:
job_schedule_id: ID of the job schedule to disable
job_schedule_disable_options: Additional options
Returns:
None
"""
def terminate(job_schedule_id, job_schedule_terminate_options=None, custom_headers=None, raw=False, **operation_config):
"""
Terminate the specified job schedule.
Args:
job_schedule_id: ID of the job schedule to terminate
job_schedule_terminate_options: Additional options
Returns:
None
"""from azure.batch.models import (
JobScheduleAddParameter, Schedule, JobSpecification,
PoolInformation, RecurrencePattern
)
import datetime
# Create a job schedule that runs daily at 2 AM
daily_schedule = Schedule(
do_not_run_until=datetime.datetime.utcnow() + datetime.timedelta(hours=1),
do_not_run_after=datetime.datetime.utcnow() + datetime.timedelta(days=365),
start_window=datetime.timedelta(hours=1), # Job can start within 1 hour
recurrence_interval=datetime.timedelta(days=1) # Run every day
)
# Define the job specification for scheduled jobs
job_spec = JobSpecification(
pool_info=PoolInformation(pool_id="processing-pool"),
display_name="Daily Processing Job",
priority=100,
constraints=JobConstraints(
max_wall_clock_time=datetime.timedelta(hours=8)
)
)
# Create the job schedule
job_schedule = JobScheduleAddParameter(
id="daily-processing-schedule",
display_name="Daily Data Processing Schedule",
schedule=daily_schedule,
job_specification=job_spec
)
client.job_schedule.add(job_schedule)from azure.batch.models import Schedule
import datetime
# Create a job schedule that runs every Monday at 3 AM
weekly_schedule = Schedule(
do_not_run_until=datetime.datetime.utcnow(),
start_window=datetime.timedelta(minutes=30),
recurrence_interval=datetime.timedelta(weeks=1)
)
job_spec = JobSpecification(
pool_info=PoolInformation(pool_id="weekly-reports-pool"),
display_name="Weekly Report Generation"
)
weekly_job_schedule = JobScheduleAddParameter(
id="weekly-reports-schedule",
schedule=weekly_schedule,
job_specification=job_spec
)
client.job_schedule.add(weekly_job_schedule)# List all job schedules
schedules = client.job_schedule.list()
for schedule in schedules:
print(f"Schedule {schedule.id}: {schedule.state}")
print(f" Next run: {schedule.execution_info.next_run_time}")
print(f" Recent job ID: {schedule.execution_info.recent_job.id if schedule.execution_info.recent_job else 'None'}")
# Check if schedule exists
if client.job_schedule.exists("daily-processing-schedule"):
# Get schedule details
schedule = client.job_schedule.get("daily-processing-schedule")
print(f"Schedule state: {schedule.state}")
# Temporarily disable the schedule
client.job_schedule.disable("daily-processing-schedule")
# Re-enable when ready
client.job_schedule.enable("daily-processing-schedule")
# Terminate the schedule (completes current job but stops future runs)
# client.job_schedule.terminate("daily-processing-schedule")
# Delete the schedule entirely
# client.job_schedule.delete("daily-processing-schedule")from azure.batch.models import (
JobSchedulePatchParameter, Schedule, JobSpecification
)
# Update schedule timing
new_schedule = Schedule(
recurrence_interval=datetime.timedelta(hours=12), # Run twice daily instead of daily
start_window=datetime.timedelta(minutes=45)
)
# Update job specification
new_job_spec = JobSpecification(
pool_info=PoolInformation(pool_id="new-processing-pool"),
priority=200 # Higher priority
)
patch_params = JobSchedulePatchParameter(
schedule=new_schedule,
job_specification=new_job_spec
)
client.job_schedule.patch("daily-processing-schedule", patch_params)from azure.batch.models import (
JobScheduleAddParameter, Schedule, JobSpecification,
JobManagerTask, UserIdentity, AutoUserSpecification
)
# Job manager to coordinate scheduled job execution
job_manager = JobManagerTask(
id="schedule-manager",
command_line="python schedule_coordinator.py",
display_name="Schedule Coordinator",
user_identity=UserIdentity(
auto_user=AutoUserSpecification(
scope="pool",
elevation_level="nonadmin"
)
),
kill_job_on_completion=True
)
# Job specification with manager task
managed_job_spec = JobSpecification(
pool_info=PoolInformation(pool_id="managed-pool"),
job_manager_task=job_manager,
priority=150
)
# Schedule for complex managed jobs
complex_schedule = JobScheduleAddParameter(
id="managed-processing-schedule",
display_name="Managed Processing Schedule",
schedule=Schedule(
recurrence_interval=datetime.timedelta(hours=6),
start_window=datetime.timedelta(minutes=15)
),
job_specification=managed_job_spec,
metadata=[
MetadataItem(name="purpose", value="automated-processing"),
MetadataItem(name="owner", value="data-team")
]
)
client.job_schedule.add(complex_schedule)class JobScheduleAddParameter:
"""Job schedule creation specification."""
def __init__(self):
self.id: str
self.display_name: str
self.schedule: Schedule
self.job_specification: JobSpecification
self.metadata: List[MetadataItem]
class Schedule:
"""Schedule configuration for job execution."""
def __init__(self):
self.do_not_run_until: datetime.datetime
self.do_not_run_after: datetime.datetime
self.start_window: datetime.timedelta
self.recurrence_interval: datetime.timedelta
class CloudJobSchedule:
"""Job schedule information and state."""
def __init__(self):
self.id: str
self.display_name: str
self.url: str
self.e_tag: str
self.last_modified: datetime.datetime
self.creation_time: datetime.datetime
self.state: str # active, completed, disabled, terminating, deleting
self.state_transition_time: datetime.datetime
self.previous_state: str
self.previous_state_transition_time: datetime.datetime
self.schedule: Schedule
self.job_specification: JobSpecification
self.execution_info: JobScheduleExecutionInformation
self.metadata: List[MetadataItem]
self.stats: JobScheduleStatistics
class JobScheduleExecutionInformation:
"""Job schedule execution information."""
def __init__(self):
self.next_run_time: datetime.datetime
self.recent_job: RecentJob
self.end_time: datetime.datetime
class RecentJob:
"""Information about recently created job from schedule."""
def __init__(self):
self.id: str
self.url: strclass JobSchedulePatchParameter:
"""Parameters for patching job schedule properties."""
def __init__(self):
self.schedule: Schedule
self.job_specification: JobSpecification
self.metadata: List[MetadataItem]
class JobScheduleUpdateParameter:
"""Parameters for updating job schedule properties."""
def __init__(self):
self.schedule: Schedule
self.job_specification: JobSpecification
self.metadata: List[MetadataItem]class JobScheduleListOptions:
"""Options for listing job schedules."""
def __init__(self):
self.filter: str
self.select: str
self.expand: str
self.max_results: int
self.timeout: int
class JobScheduleGetOptions:
"""Options for getting job schedule information."""
def __init__(self):
self.select: str
self.expand: str
self.timeout: int
class JobScheduleAddOptions:
"""Options for adding job schedules."""
def __init__(self):
self.timeout: int
class JobScheduleDeleteOptions:
"""Options for deleting job schedules."""
def __init__(self):
self.timeout: int
class JobScheduleExistsOptions:
"""Options for checking job schedule existence."""
def __init__(self):
self.timeout: intInstall with Tessl CLI
npx tessl i tessl/pypi-azure-batchdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10