CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure-batch

Microsoft Azure Batch Client Library for Python, providing comprehensive APIs for managing batch computing workloads in the Azure cloud

91

1.07x
Overview
Eval results
Files

docs/job-schedule-operations.md

Job Schedule Operations

Job schedule management capabilities for creating and managing recurring batch jobs with time-based or interval-based scheduling. Job schedules automatically create jobs based on predefined schedules and job specifications.

Capabilities

Job Schedule Lifecycle Management

Create, retrieve, update, and delete job schedules with comprehensive configuration options.

def add(cloud_job_schedule, job_schedule_add_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Add a job schedule to the specified account.

    Invoked on the client's job-schedule operations group, e.g.
    ``client.job_schedule.add(job_schedule)``.

    Args:
        cloud_job_schedule: The job schedule to add (JobScheduleAddParameter)
        job_schedule_add_options: Additional options for the operation
            (JobScheduleAddOptions); optional, defaults to None
        custom_headers: Custom headers to include in request; optional
        raw: Return raw response if True; defaults to False
        **operation_config: Extra keyword arguments accepted by the operation;
            their meaning is not described in this document

    Returns:
        None
    """

def list(job_schedule_list_options=None, custom_headers=None, raw=False, **operation_config):
    """
    List all job schedules in the account.

    Invoked as ``client.job_schedule.list()``; the result can be iterated
    directly with a ``for`` loop.

    Args:
        job_schedule_list_options: Additional options for listing
            (JobScheduleListOptions); optional, defaults to None
        custom_headers: Custom headers to include in request; optional
        raw: Return raw response if True; defaults to False
        **operation_config: Extra keyword arguments accepted by the operation;
            their meaning is not described in this document

    Returns:
        ItemPaged[CloudJobSchedule]: Paginated list of job schedules
    """

def get(job_schedule_id, job_schedule_get_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Get information about the specified job schedule.

    Args:
        job_schedule_id: ID of the job schedule to retrieve
        job_schedule_get_options: Additional options for the operation
            (JobScheduleGetOptions); optional, defaults to None
        custom_headers: Custom headers to include in request; optional
        raw: Return raw response if True; defaults to False
        **operation_config: Extra keyword arguments accepted by the operation;
            their meaning is not described in this document

    Returns:
        CloudJobSchedule: Job schedule information
    """

def delete(job_schedule_id, job_schedule_delete_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Delete the specified job schedule.

    Deleted schedules cannot be recovered; create a new schedule if needed.

    Args:
        job_schedule_id: ID of the job schedule to delete
        job_schedule_delete_options: Additional options for deletion
            (JobScheduleDeleteOptions); optional, defaults to None
        custom_headers: Custom headers to include in request; optional
        raw: Return raw response if True; defaults to False
        **operation_config: Extra keyword arguments accepted by the operation;
            their meaning is not described in this document

    Returns:
        None
    """

def exists(job_schedule_id, job_schedule_exists_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Check if a job schedule exists.

    Args:
        job_schedule_id: ID of the job schedule to check
        job_schedule_exists_options: Additional options for the operation
            (JobScheduleExistsOptions); optional, defaults to None
        custom_headers: Custom headers to include in request; optional
        raw: Return raw response if True; defaults to False
        **operation_config: Extra keyword arguments accepted by the operation;
            their meaning is not described in this document

    Returns:
        bool: True if job schedule exists, False otherwise
    """

Job Schedule Configuration Updates

Update job schedule properties and configuration after creation.

def patch(job_schedule_id, job_schedule_patch_parameter, job_schedule_patch_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Update properties of the specified job schedule.

    NOTE(review): in the Azure Batch REST API, Patch is documented as changing
    only the properties supplied in the request, whereas Update replaces all
    updatable properties. Confirm against the SDK reference before relying on
    this distinction.

    Args:
        job_schedule_id: ID of the job schedule to update
        job_schedule_patch_parameter: Properties to update
            (JobSchedulePatchParameter)
        job_schedule_patch_options: Additional options; optional, defaults to
            None
        custom_headers: Custom headers to include in request; optional
        raw: Return raw response if True; defaults to False
        **operation_config: Extra keyword arguments accepted by the operation;
            their meaning is not described in this document

    Returns:
        None
    """

def update(job_schedule_id, job_schedule_update_parameter, job_schedule_update_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Update the properties of the specified job schedule.

    NOTE(review): in the Azure Batch REST API, Update is documented as
    replacing all updatable properties of the schedule, whereas Patch changes
    only the properties supplied. Confirm against the SDK reference before
    relying on this distinction.

    Args:
        job_schedule_id: ID of the job schedule to update
        job_schedule_update_parameter: Properties to update
            (JobScheduleUpdateParameter)
        job_schedule_update_options: Additional options; optional, defaults to
            None
        custom_headers: Custom headers to include in request; optional
        raw: Return raw response if True; defaults to False
        **operation_config: Extra keyword arguments accepted by the operation;
            their meaning is not described in this document

    Returns:
        None
    """

Job Schedule State Control

Control job schedule execution state including enabling, disabling, and terminating schedules.

def enable(job_schedule_id, job_schedule_enable_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Enable the specified job schedule.

    Typically used to resume a schedule that was previously disabled.

    Args:
        job_schedule_id: ID of the job schedule to enable
        job_schedule_enable_options: Additional options; optional, defaults to
            None
        custom_headers: Custom headers to include in request; optional
        raw: Return raw response if True; defaults to False
        **operation_config: Extra keyword arguments accepted by the operation;
            their meaning is not described in this document

    Returns:
        None
    """

def disable(job_schedule_id, job_schedule_disable_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Disable the specified job schedule.

    Schedules can be temporarily disabled without deleting them, and enabled
    again later.

    Args:
        job_schedule_id: ID of the job schedule to disable
        job_schedule_disable_options: Additional options; optional, defaults
            to None
        custom_headers: Custom headers to include in request; optional
        raw: Return raw response if True; defaults to False
        **operation_config: Extra keyword arguments accepted by the operation;
            their meaning is not described in this document

    Returns:
        None
    """

def terminate(job_schedule_id, job_schedule_terminate_options=None, custom_headers=None, raw=False, **operation_config):
    """
    Terminate the specified job schedule.

    Terminated schedules complete their current job but don't create new ones.

    Args:
        job_schedule_id: ID of the job schedule to terminate
        job_schedule_terminate_options: Additional options; optional, defaults
            to None
        custom_headers: Custom headers to include in request; optional
        raw: Return raw response if True; defaults to False
        **operation_config: Extra keyword arguments accepted by the operation;
            their meaning is not described in this document

    Returns:
        None
    """

Usage Examples

Creating a Recurring Job Schedule

# JobConstraints is used below and must be imported with the other models.
# (The previously imported RecurrencePattern was never used and is dropped.)
from azure.batch.models import (
    JobConstraints, JobScheduleAddParameter, JobSpecification,
    PoolInformation, Schedule
)
import datetime

# Create a job schedule that produces one job per day. The first job becomes
# eligible one hour from now and the schedule stops creating jobs after a year.
# NOTE(review): Schedule has no time-of-day field; to pin runs to a specific
# clock time, set do_not_run_until to that instant - confirm the anchoring
# behaviour against the Batch service documentation.
# A single timezone-aware "now" keeps both bounds on the same reference instant
# (datetime.utcnow() is naive and deprecated since Python 3.12).
now = datetime.datetime.now(datetime.timezone.utc)
daily_schedule = Schedule(
    do_not_run_until=now + datetime.timedelta(hours=1),
    do_not_run_after=now + datetime.timedelta(days=365),
    start_window=datetime.timedelta(hours=1),  # Job can start within 1 hour
    recurrence_interval=datetime.timedelta(days=1)  # Run every day
)

# Define the job specification for scheduled jobs
job_spec = JobSpecification(
    pool_info=PoolInformation(pool_id="processing-pool"),
    display_name="Daily Processing Job",
    priority=100,
    # Cap each generated job at eight hours of wall-clock time
    constraints=JobConstraints(
        max_wall_clock_time=datetime.timedelta(hours=8)
    )
)

# Create the job schedule
job_schedule = JobScheduleAddParameter(
    id="daily-processing-schedule",
    display_name="Daily Data Processing Schedule",
    schedule=daily_schedule,
    job_specification=job_spec
)

# `client` is assumed to be an existing Batch service client created elsewhere
client.job_schedule.add(job_schedule)

Creating a Weekly Job Schedule

# Import every model the snippet uses so it runs on its own
from azure.batch.models import (
    JobScheduleAddParameter, JobSpecification, PoolInformation, Schedule
)
import datetime

# Create a job schedule that runs once a week, starting now.
# NOTE(review): the weekly cadence is measured from do_not_run_until; to land
# on a particular weekday and hour, set do_not_run_until to that instant -
# confirm the anchoring behaviour against the Batch service documentation.
weekly_schedule = Schedule(
    # Timezone-aware UTC timestamp (datetime.utcnow() is naive and deprecated)
    do_not_run_until=datetime.datetime.now(datetime.timezone.utc),
    start_window=datetime.timedelta(minutes=30),  # Job can start within 30 minutes
    recurrence_interval=datetime.timedelta(weeks=1)  # Run every week
)

# Specification applied to each job the schedule creates
job_spec = JobSpecification(
    pool_info=PoolInformation(pool_id="weekly-reports-pool"),
    display_name="Weekly Report Generation"
)

weekly_job_schedule = JobScheduleAddParameter(
    id="weekly-reports-schedule",
    schedule=weekly_schedule,
    job_specification=job_spec
)

# `client` is assumed to be an existing Batch service client created elsewhere
client.job_schedule.add(weekly_job_schedule)

Managing Job Schedule State

# List all job schedules and print a short status summary for each one.
# `client` is assumed to be an existing Batch service client created elsewhere.
schedules = client.job_schedule.list()
for schedule in schedules:
    print(f"Schedule {schedule.id}: {schedule.state}")
    print(f"  Next run: {schedule.execution_info.next_run_time}")
    # Guard against a missing (falsy) recent_job and print the text 'None' instead
    print(f"  Recent job ID: {schedule.execution_info.recent_job.id if schedule.execution_info.recent_job else 'None'}")

# Check if schedule exists before operating on it
if client.job_schedule.exists("daily-processing-schedule"):
    # Get schedule details
    schedule = client.job_schedule.get("daily-processing-schedule")
    print(f"Schedule state: {schedule.state}")

    # Temporarily disable the schedule (it is kept, not deleted)
    client.job_schedule.disable("daily-processing-schedule")

    # Re-enable when ready
    client.job_schedule.enable("daily-processing-schedule")

    # Terminate the schedule (completes current job but stops future runs).
    # Left commented out so the example does not end the schedule.
    # client.job_schedule.terminate("daily-processing-schedule")

    # Delete the schedule entirely; deleted schedules cannot be recovered.
    # Left commented out so the example is non-destructive.
    # client.job_schedule.delete("daily-processing-schedule")

Updating Job Schedule Configuration

# Import every model the snippet uses (PoolInformation was missing) along with
# datetime, which the timedelta values below require.
from azure.batch.models import (
    JobSchedulePatchParameter, JobSpecification, PoolInformation, Schedule
)
import datetime

# Update schedule timing
new_schedule = Schedule(
    recurrence_interval=datetime.timedelta(hours=12),  # Run twice daily instead of daily
    start_window=datetime.timedelta(minutes=45)
)

# Update job specification
new_job_spec = JobSpecification(
    pool_info=PoolInformation(pool_id="new-processing-pool"),
    priority=200  # Higher priority
)

# Bundle the changed properties into a single patch request
patch_params = JobSchedulePatchParameter(
    schedule=new_schedule,
    job_specification=new_job_spec
)

# `client` is assumed to be an existing Batch service client created elsewhere
client.job_schedule.patch("daily-processing-schedule", patch_params)

Advanced Schedule Configuration with Job Manager

# Import every model the snippet uses (MetadataItem and PoolInformation were
# missing) along with datetime, which the timedelta values below require.
from azure.batch.models import (
    AutoUserSpecification, JobManagerTask, JobScheduleAddParameter,
    JobSpecification, MetadataItem, PoolInformation, Schedule, UserIdentity
)
import datetime

# Job manager to coordinate scheduled job execution
job_manager = JobManagerTask(
    id="schedule-manager",
    command_line="python schedule_coordinator.py",
    display_name="Schedule Coordinator",
    # Run the manager task under an automatically created, non-admin user
    user_identity=UserIdentity(
        auto_user=AutoUserSpecification(
            scope="pool",
            elevation_level="nonadmin"
        )
    ),
    kill_job_on_completion=True
)

# Job specification with manager task
managed_job_spec = JobSpecification(
    pool_info=PoolInformation(pool_id="managed-pool"),
    job_manager_task=job_manager,
    priority=150
)

# Schedule for complex managed jobs
complex_schedule = JobScheduleAddParameter(
    id="managed-processing-schedule",
    display_name="Managed Processing Schedule",
    schedule=Schedule(
        recurrence_interval=datetime.timedelta(hours=6),  # Run every six hours
        start_window=datetime.timedelta(minutes=15)  # Job can start within 15 minutes
    ),
    job_specification=managed_job_spec,
    # Free-form name/value tags attached to the schedule
    metadata=[
        MetadataItem(name="purpose", value="automated-processing"),
        MetadataItem(name="owner", value="data-team")
    ]
)

# `client` is assumed to be an existing Batch service client created elsewhere
client.job_schedule.add(complex_schedule)

Types

Job Schedule Configuration Types

class JobScheduleAddParameter:
    """Job schedule creation specification.

    Passed to ``client.job_schedule.add`` to create a new job schedule.
    """
    def __init__(self):
        # Identifier of the schedule, e.g. "daily-processing-schedule"
        self.id: str
        # Human-readable name, e.g. "Daily Data Processing Schedule"
        self.display_name: str
        # Timing configuration that controls when jobs are created
        self.schedule: Schedule
        # Specification applied to the jobs the schedule creates
        self.job_specification: JobSpecification
        # Name/value items attached to the schedule
        self.metadata: List[MetadataItem]

class Schedule:
    """Schedule configuration for job execution."""
    def __init__(self):
        # NOTE(review): presumably the earliest time a job may be created - confirm
        self.do_not_run_until: datetime.datetime
        # NOTE(review): presumably the time after which no jobs are created - confirm
        self.do_not_run_after: datetime.datetime
        # How long after the scheduled time a job can still be created
        self.start_window: datetime.timedelta
        # Interval between runs, e.g. timedelta(days=1) to run every day
        self.recurrence_interval: datetime.timedelta

class CloudJobSchedule:
    """Job schedule information and state.

    Returned by ``client.job_schedule.get`` and yielded when iterating the
    result of ``client.job_schedule.list``.
    """
    def __init__(self):
        # Identifier of the schedule
        self.id: str
        # Human-readable name
        self.display_name: str
        self.url: str
        # NOTE(review): presumably the entity tag of the schedule resource - confirm
        self.e_tag: str
        self.last_modified: datetime.datetime
        self.creation_time: datetime.datetime
        self.state: str  # active, completed, disabled, terminating, deleting
        # NOTE(review): presumably when the schedule entered `state` - confirm
        self.state_transition_time: datetime.datetime
        self.previous_state: str
        self.previous_state_transition_time: datetime.datetime
        # Timing configuration that controls when jobs are created
        self.schedule: Schedule
        # Specification applied to the jobs the schedule creates
        self.job_specification: JobSpecification
        # Next run time and most recently created job
        self.execution_info: JobScheduleExecutionInformation
        # Name/value items attached to the schedule
        self.metadata: List[MetadataItem]
        self.stats: JobScheduleStatistics

class JobScheduleExecutionInformation:
    """Job schedule execution information.

    Available as the ``execution_info`` attribute of a CloudJobSchedule.
    """
    def __init__(self):
        # Time of the next scheduled run
        self.next_run_time: datetime.datetime
        # Most recently created job; callers should handle it being unset
        self.recent_job: RecentJob
        # NOTE(review): presumably when the schedule ended - confirm
        self.end_time: datetime.datetime

class RecentJob:
    """Information about recently created job from schedule.

    Available as ``execution_info.recent_job`` on a CloudJobSchedule.
    """
    def __init__(self):
        # Identifier of the job
        self.id: str
        # URL of the job
        self.url: str

Job Schedule Update Types

class JobSchedulePatchParameter:
    """Parameters for patching job schedule properties.

    Passed as the second argument to ``client.job_schedule.patch``.
    """
    def __init__(self):
        # New timing configuration for the schedule
        self.schedule: Schedule
        # New specification for jobs the schedule creates
        self.job_specification: JobSpecification
        # New name/value items for the schedule
        self.metadata: List[MetadataItem]

class JobScheduleUpdateParameter:
    """Parameters for updating job schedule properties.

    Passed as the ``job_schedule_update_parameter`` argument of the job
    schedule ``update`` operation.
    """
    def __init__(self):
        # New timing configuration for the schedule
        self.schedule: Schedule
        # New specification for jobs the schedule creates
        self.job_specification: JobSpecification
        # New name/value items for the schedule
        self.metadata: List[MetadataItem]

Job Schedule Option Types

class JobScheduleListOptions:
    """Options for listing job schedules.

    Passed as the ``job_schedule_list_options`` argument of the job schedule
    ``list`` operation.
    """
    def __init__(self):
        # NOTE(review): presumably an OData $filter clause - confirm
        self.filter: str
        # NOTE(review): presumably an OData $select clause - confirm
        self.select: str
        # NOTE(review): presumably an OData $expand clause - confirm
        self.expand: str
        # NOTE(review): presumably the maximum number of items per response - confirm
        self.max_results: int
        # NOTE(review): presumably a server-side timeout in seconds - confirm
        self.timeout: int

class JobScheduleGetOptions:
    """Options for getting job schedule information.

    Passed as the ``job_schedule_get_options`` argument of the job schedule
    ``get`` operation.
    """
    def __init__(self):
        # NOTE(review): presumably an OData $select clause - confirm
        self.select: str
        # NOTE(review): presumably an OData $expand clause - confirm
        self.expand: str
        # NOTE(review): presumably a server-side timeout in seconds - confirm
        self.timeout: int

class JobScheduleAddOptions:
    """Options for adding job schedules.

    Passed as the ``job_schedule_add_options`` argument of the job schedule
    ``add`` operation.
    """
    def __init__(self):
        # NOTE(review): presumably a server-side timeout in seconds - confirm
        self.timeout: int

class JobScheduleDeleteOptions:
    """Options for deleting job schedules.

    Passed as the ``job_schedule_delete_options`` argument of the job schedule
    ``delete`` operation.
    """
    def __init__(self):
        # NOTE(review): presumably a server-side timeout in seconds - confirm
        self.timeout: int

class JobScheduleExistsOptions:
    """Options for checking job schedule existence.

    Passed as the ``job_schedule_exists_options`` argument of the job schedule
    ``exists`` operation.
    """
    def __init__(self):
        # NOTE(review): presumably a server-side timeout in seconds - confirm
        self.timeout: int

Notes

  • Job schedules automatically create jobs based on the specified schedule and job specification
  • Each scheduled job gets a unique ID based on the schedule ID and creation time
  • Schedules can be temporarily disabled without deleting them
  • The start window defines how long after the scheduled time a job can still be created
  • Jobs created by schedules inherit the job specification properties
  • Terminated schedules complete their current job but don't create new ones
  • Deleted schedules cannot be recovered; create a new schedule if needed

Install with Tessl CLI

npx tessl i tessl/pypi-azure-batch

docs

account-operations.md

application-operations.md

certificate-operations.md

client-management.md

compute-node-extension-operations.md

compute-node-operations.md

file-operations.md

index.md

job-operations.md

job-schedule-operations.md

pool-operations.md

task-operations.md

tile.json