tessl/pypi-globus-sdk

Python SDK for interacting with Globus web APIs including Transfer, Auth, and other research data management services

Timers Service

The Globus Timers service runs tasks on a schedule, either once or at a fixed recurring interval with optional end conditions. Its primary use is automating Globus Transfer tasks. Other automation is possible through the legacy callback-based TimerJob, and complex multi-service workflows are typically coordinated through the Flows service.

Capabilities

Timers Client

Core client for timer creation, management, and monitoring with support for various schedule types and integration with Transfer and other Globus services.

class TimersClient(BaseClient):
    """
    Client for Globus Timers service operations.
    
    Provides methods for creating, managing, and monitoring scheduled tasks
    with support for one-time and recurring schedules, transfer automation,
    and integration with other Globus services.
    """
    
    def __init__(
        self,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        environment: str | None = None,
        base_url: str | None = None,
        **kwargs
    ) -> None: ...
    
    def add_app_transfer_data_access_scope(
        self, 
        collection_ids: str | UUID | Iterable[str | UUID]
    ) -> TimersClient:
        """
        Add dependent data_access scopes for transfer collections.
        
        Adds required scopes to prevent ConsentRequired errors when creating
        timers that use GCS mapped collections for automated transfers.
        
        Parameters:
        - collection_ids: Collection ID(s) requiring data access scopes
        
        Returns:
        Self for method chaining
        """

Timer Creation and Management

Create, configure, and manage scheduled tasks with various schedule types and execution patterns.

def create_timer(
    self, 
    timer: dict[str, Any] | TransferTimer
) -> GlobusHTTPResponse:
    """
    Create a new timer for scheduled execution.
    
    Creates a timer with specified schedule and task definition.
    Supports both one-time and recurring schedules with flexible
    end conditions and execution parameters.
    
    Parameters:
    - timer: Timer specification including schedule and task details
    
    Returns:
    GlobusHTTPResponse with created timer details and UUID
    """

def get_timer(self, timer_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get detailed information about a specific timer.
    
    Returns complete timer configuration including schedule,
    execution history, and current status.
    
    Parameters:
    - timer_id: UUID of the timer to retrieve
    
    Returns:
    GlobusHTTPResponse with timer configuration and status
    """

def list_timers(
    self,
    *,
    limit: int | None = None,
    marker: str | None = None,
    fields: str | None = None,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    List timers with pagination and filtering support.
    
    Returns timers accessible to the user with optional
    field selection and pagination for large timer sets.
    
    Parameters:
    - limit: Maximum number of timers to return
    - marker: Pagination marker for continuing listings
    - fields: Comma-separated list of fields to include
    - query_params: Additional query parameters
    
    Returns:
    GlobusHTTPResponse with paginated timer listings
    """

def update_timer(
    self,
    timer_id: str | UUID,
    data: dict[str, Any],
    *,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Update timer configuration.
    
    Modifies timer settings including schedule, name, and other
    configuration parameters. Some changes may affect future
    executions only.
    
    Parameters:
    - timer_id: UUID of timer to update
    - data: Updated timer configuration
    - query_params: Additional parameters
    
    Returns:
    GlobusHTTPResponse confirming update
    """

def delete_timer(
    self,
    timer_id: str | UUID,
    *,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Delete a timer.
    
    Permanently removes a timer and stops any future scheduled
    executions. Running executions may continue to completion.
    
    Parameters:
    - timer_id: UUID of timer to delete
    - query_params: Additional parameters
    
    Returns:
    GlobusHTTPResponse confirming deletion
    """

def pause_timer(
    self,
    timer_id: str | UUID,
    *,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Pause a timer temporarily.
    
    Stops scheduled executions without deleting the timer.
    Can be resumed later to continue the schedule.
    
    Parameters:
    - timer_id: UUID of timer to pause
    - query_params: Additional parameters
    
    Returns:
    GlobusHTTPResponse confirming pause
    """

def resume_timer(
    self,
    timer_id: str | UUID,
    *,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Resume a paused timer.
    
    Restarts scheduled executions for a previously paused timer
    according to its original schedule configuration.
    
    Parameters:
    - timer_id: UUID of timer to resume
    - query_params: Additional parameters
    
    Returns:
    GlobusHTTPResponse confirming resume
    """

Timer Data Classes and Schedules

Data classes for defining timer schedules and task specifications. Each one wraps a request payload, so a plain dict can be passed wherever a helper class is accepted.

class TransferTimer(PayloadWrapper):
    """
    Helper for creating transfer-specific timers.
    
    Provides convenient interface for creating timers that execute
    Globus Transfer operations on a schedule with proper scope
    and permission handling.
    """
    
    def __init__(
        self,
        *,
        name: str,
        schedule: OnceTimerSchedule | RecurringTimerSchedule | dict[str, Any],
        body: TransferData | dict[str, Any],
        **kwargs
    ) -> None: ...

class OnceTimerSchedule(PayloadWrapper):
    """
    Schedule for timers that run exactly once.
    
    Defines a single execution time for one-time scheduled tasks,
    supporting both immediate execution and future scheduling.
    """
    
    def __init__(
        self,
        datetime: str | dt.datetime | MissingType = MISSING,
        **kwargs
    ) -> None: ...

class RecurringTimerSchedule(PayloadWrapper):
    """
    Schedule for timers that run repeatedly.
    
    Defines recurring execution with configurable intervals,
    start times, and end conditions including date limits
    and execution count limits.
    """
    
    def __init__(
        self,
        interval_seconds: int,
        *,
        start: str | dt.datetime | MissingType = MISSING,
        end: dict[str, Any] | MissingType = MISSING,
        **kwargs
    ) -> None: ...
    
    @classmethod
    def every_hour(cls, **kwargs) -> RecurringTimerSchedule:
        """Create schedule that runs every hour."""
    
    @classmethod  
    def every_day(cls, **kwargs) -> RecurringTimerSchedule:
        """Create schedule that runs daily."""
    
    @classmethod
    def every_week(cls, **kwargs) -> RecurringTimerSchedule:
        """Create schedule that runs weekly."""
    
    @classmethod
    def every_month(cls, **kwargs) -> RecurringTimerSchedule:
        """Create schedule that runs monthly (30 days)."""

class TimerJob(PayloadWrapper):
    """
    Legacy timer job specification.
    
    Note: This class is deprecated in favor of TransferTimer for
    transfer operations. Still supported for non-transfer use cases.
    """
    
    def __init__(
        self,
        *,
        name: str,
        callback_url: str,
        start_time: str | dt.datetime | MissingType = MISSING,
        interval: int | MissingType = MISSING,
        stop_after: dict[str, Any] | MissingType = MISSING,
        **kwargs
    ) -> None: ...
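
Because the schedule helpers also accept plain dicts, it can help to see what such a dict might look like. The sketch below builds a recurring schedule dict using the keys that appear in this document's own examples (`type`, `interval_seconds`, `start`, `end`) and adds a few sanity checks. `recurring_schedule_payload` is a hypothetical helper, not SDK code, and the validation rules are assumptions rather than documented service behavior.

```python
from __future__ import annotations

import datetime as dt
from typing import Any


def recurring_schedule_payload(
    interval_seconds: int,
    start: dt.datetime | None = None,
    end: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build a recurring schedule dict (hypothetical helper, not SDK code)."""
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    if start is not None and start.tzinfo is None:
        raise ValueError("start must be timezone-aware")
    if end is not None and end.get("condition") not in ("time", "iterations"):
        raise ValueError("end condition must be 'time' or 'iterations'")

    payload: dict[str, Any] = {
        "type": "recurring",
        "interval_seconds": interval_seconds,
    }
    # Optional fields are left out entirely rather than set to None.
    if start is not None:
        payload["start"] = start.isoformat()
    if end is not None:
        payload["end"] = end
    return payload


payload = recurring_schedule_payload(
    3600,
    start=dt.datetime(2024, 12, 1, 2, 0, tzinfo=dt.timezone.utc),
    end={"condition": "iterations", "n": 24},
)
```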

Execution Monitoring and History

Monitor timer executions, view execution history, and track task results with comprehensive status information.

def get_timer_executions(
    self,
    timer_id: str | UUID,
    *,
    limit: int | None = None,
    marker: str | None = None,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Get execution history for a timer.
    
    Returns list of past and scheduled executions with status
    information, results, and timing details.
    
    Parameters:
    - timer_id: UUID of the timer
    - limit: Maximum executions to return
    - marker: Pagination marker
    - query_params: Additional parameters
    
    Returns:
    GlobusHTTPResponse with execution history
    """

def get_execution(
    self,
    execution_id: str | UUID,
    *,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Get detailed information about a specific execution.
    
    Returns complete execution details including status,
    results, error information, and timing data.
    
    Parameters:
    - execution_id: UUID of the execution
    - query_params: Additional parameters
    
    Returns:
    GlobusHTTPResponse with execution details
    """

def cancel_execution(
    self,
    execution_id: str | UUID,
    *,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Cancel a running or pending execution.
    
    Attempts to stop an execution that is currently running
    or scheduled to run soon.
    
    Parameters:
    - execution_id: UUID of execution to cancel
    - query_params: Additional parameters
    
    Returns:
    GlobusHTTPResponse confirming cancellation request
    """

Error Handling

Timer-specific error handling for scheduling and execution operations.

class TimersAPIError(GlobusAPIError):
    """
    Error class for Timers service API errors.
    
    Provides enhanced error handling for timer-specific error
    conditions including scheduling conflicts and execution failures.
    """

Common Usage Patterns

Basic Transfer Timer Creation

from globus_sdk import TimersClient, TransferTimer, RecurringTimerSchedule, TransferData

# Initialize client with transfer data access scope.
# `app` is assumed to be an existing GlobusApp instance created elsewhere.
timers_client = TimersClient(app=app)
timers_client.add_app_transfer_data_access_scope([
    "source-collection-uuid",
    "dest-collection-uuid"
])

# Create transfer data specification
transfer_data = TransferData(
    source_endpoint="source-collection-uuid",
    destination_endpoint="dest-collection-uuid", 
    label="Automated Daily Backup"
)
transfer_data.add_item("/data/current/", "/backup/daily/", recursive=True)

# Create recurring schedule (daily at 2 AM)
daily_schedule = RecurringTimerSchedule(
    interval_seconds=24 * 60 * 60,  # 24 hours
    start="2024-12-01T02:00:00Z"   # Start at 2 AM UTC
)

# Create transfer timer
backup_timer = TransferTimer(
    name="Daily Data Backup",
    schedule=daily_schedule,
    body=transfer_data
)

# Create the timer
timer_response = timers_client.create_timer(backup_timer)
timer_id = timer_response["timer"]["id"]

print(f"Created daily backup timer: {timer_id}")

One-Time Scheduled Transfer

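from globus_sdk import OnceTimerSchedule, TransferTimer
import datetime

# Schedule transfer for a specific future time.
# A timezone-aware datetime keeps the execution time unambiguous.
future_time = datetime.datetime(
    2024, 12, 15, 14, 30, 0, tzinfo=datetime.timezone.utc
)  # Dec 15, 2024 at 2:30 PM UTC
once_schedule = OnceTimerSchedule(datetime=future_time)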

# Create one-time transfer timer
maintenance_transfer = TransferTimer(
    name="Maintenance Window Transfer",
    schedule=once_schedule,
    body=transfer_data
)

one_time_timer = timers_client.create_timer(maintenance_transfer)
print(f"Scheduled one-time transfer: {one_time_timer['timer']['id']}")

Complex Recurring Schedules

# Weekly backup with end date
weekly_schedule = RecurringTimerSchedule(
    interval_seconds=7 * 24 * 60 * 60,  # 7 days
    start="2024-12-01T02:00:00Z",
    end={"condition": "time", "datetime": "2025-12-01T02:00:00Z"}
)

# Monthly archive with execution limit
monthly_schedule = RecurringTimerSchedule(
    interval_seconds=30 * 24 * 60 * 60,  # 30 days
    start="2024-12-01T00:00:00Z",
    end={"condition": "iterations", "n": 12}  # Run 12 times (1 year)
)

# Using convenience methods
hourly_sync = RecurringTimerSchedule.every_hour(
    start="2024-12-01T00:00:00Z"
)

daily_report = RecurringTimerSchedule.every_day(
    start="2024-12-01T06:00:00Z",  # 6 AM daily
    end={"condition": "iterations", "n": 365}
)
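
A fixed interval in seconds does not track calendar months, so it is worth checking where a schedule actually lands. The sketch below expands a start time, an interval, and an end condition into concrete run times. It assumes the first run happens at `start` and that a `"time"` end condition is inclusive; the service may treat either differently. `expected_run_times` is a hypothetical helper.

```python
from __future__ import annotations

import datetime as dt
from typing import Any


def expected_run_times(
    start: dt.datetime,
    interval_seconds: int,
    end: dict[str, Any],
) -> list[dt.datetime]:
    """List run times for a fixed-interval schedule (illustrative only)."""
    step = dt.timedelta(seconds=interval_seconds)
    if end["condition"] == "iterations":
        return [start + i * step for i in range(end["n"])]
    if end["condition"] == "time":
        # Accept a trailing "Z", which older fromisoformat versions reject.
        stop = dt.datetime.fromisoformat(end["datetime"].replace("Z", "+00:00"))
        runs = []
        current = start
        while current <= stop:
            runs.append(current)
            current += step
        return runs
    raise ValueError(f"unknown end condition: {end['condition']!r}")


start = dt.datetime(2024, 12, 1, tzinfo=dt.timezone.utc)
monthly = expected_run_times(
    start, 30 * 24 * 60 * 60, {"condition": "iterations", "n": 12}
)
print(monthly[-1].date())  # 2025-10-27
```

Twelve runs at 30-day intervals put the last run 330 days after the first, on 2025-10-27 rather than in December 2025, which is the drift to expect from a "monthly" schedule defined in seconds.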

Timer Management and Monitoring

# List all timers
timers_list = timers_client.list_timers(limit=50)
for timer in timers_list["timers"]:
    print(f"Timer: {timer['name']} ({timer['id']})")
    print(f"  Status: {timer['status']}")
    print(f"  Next run: {timer.get('next_run_time', 'N/A')}")

# Get detailed timer information
timer_details = timers_client.get_timer(timer_id)
print(f"Timer created: {timer_details['timer']['created_at']}")
print(f"Schedule: {timer_details['timer']['schedule']}")

# View execution history
executions = timers_client.get_timer_executions(timer_id, limit=10)
for execution in executions["executions"]:
    print(f"Execution {execution['id']}: {execution['status']}")
    if execution.get('completion_time'):
        print(f"  Completed: {execution['completion_time']}")
    if execution.get('error_message'):
        print(f"  Error: {execution['error_message']}")

Timer Control Operations

# Pause timer temporarily
timers_client.pause_timer(timer_id)
print("Timer paused")

# Resume timer later
timers_client.resume_timer(timer_id)
print("Timer resumed")

# Update timer configuration
update_data = {
    "name": "Updated Daily Backup Timer",
    "schedule": {
        "type": "recurring",
        "interval_seconds": 12 * 60 * 60,  # Change to every 12 hours
        "start": "2024-12-01T02:00:00Z"
    }
}
timers_client.update_timer(timer_id, update_data)
print("Timer schedule updated")

# Delete timer when no longer needed
timers_client.delete_timer(timer_id)
print("Timer deleted")

Advanced Transfer Timer Patterns

# Multi-path transfer timer
multi_transfer_data = TransferData(
    source_endpoint="source-endpoint",
    destination_endpoint="dest-endpoint",
    label="Multi-Path Sync",
    sync_level="checksum",
    verify_checksum=True
)

# Add multiple transfer items
multi_transfer_data.add_item("/datasets/raw/", "/archive/raw/", recursive=True)
multi_transfer_data.add_item("/datasets/processed/", "/archive/processed/", recursive=True)
multi_transfer_data.add_item("/logs/", "/archive/logs/", recursive=True)

# Nightly sync timer
nightly_sync = TransferTimer(
    name="Nightly Multi-Path Sync",
    schedule=RecurringTimerSchedule.every_day(
        start="2024-12-01T01:00:00Z"  # 1 AM daily
    ),
    body=multi_transfer_data
)

sync_timer = timers_client.create_timer(nightly_sync)

Error Handling and Monitoring

import time

def monitor_timer_execution(timers_client, timer_id, timeout=3600):
    """Monitor timer execution with error handling."""
    
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            # Get recent executions
            executions = timers_client.get_timer_executions(timer_id, limit=5)
            
            for execution in executions["executions"]:
                status = execution["status"]
                
                if status == "completed":
                    print(f"Execution {execution['id']} completed successfully")
                    return True
                elif status == "failed":
                    error_msg = execution.get("error_message", "Unknown error")
                    print(f"Execution {execution['id']} failed: {error_msg}")
                    
                    # Get detailed execution info
                    exec_details = timers_client.get_execution(execution['id'])
                    print(f"Failure details: {exec_details.get('failure_reason', 'N/A')}")
                    return False
                elif status in ["running", "pending"]:
                    print(f"Execution {execution['id']} is {status}")
                    
            time.sleep(30)  # Wait before checking again
            
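        except Exception as e:
            # The broad catch keeps the monitor alive across transient failures.
            # Catch TimersAPIError instead to handle only service errors.
            print(f"Error monitoring timer: {e}")
            time.sleep(60)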
    
    print(f"Monitoring timeout after {timeout} seconds")
    return False

# Usage
success = monitor_timer_execution(timers_client, timer_id)
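
Polling at a fixed interval is simple, but a long-running timer can be checked less often as time passes. The sketch below is a variant with capped exponential backoff. `fetch_status` is a hypothetical callable that returns a status string, and the terminal status names mirror the example above; both are assumptions. The `sleep` and `clock` parameters exist only so the loop can be exercised without real waiting.

```python
from __future__ import annotations

import time
from collections.abc import Callable


def poll_until_done(
    fetch_status: Callable[[], str],
    *,
    timeout: float = 3600.0,
    initial_delay: float = 5.0,
    max_delay: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str | None:
    """Poll until a terminal status is seen; return it, or None on timeout."""
    deadline = clock() + timeout
    delay = initial_delay
    while clock() < deadline:
        status = fetch_status()
        if status in ("completed", "failed"):  # assumed terminal statuses
            return status
        sleep(delay)
        delay = min(delay * 2, max_delay)  # double the wait, up to the cap
    return None


# Demonstration with a scripted status sequence and a recording no-op sleep.
statuses = iter(["pending", "running", "completed"])
delays: list[float] = []
result = poll_until_done(lambda: next(statuses), sleep=delays.append)
```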

Integration with Other Services

# Timers primarily support transfers, so periodic compute jobs require custom
# integration. Complex multi-service automation is typically coordinated
# through the Flows service.
from globus_sdk import TimerJob

# Timer triggering a flow execution (using webhook pattern).
# The callback URL below is an illustrative placeholder.
webhook_timer_job = TimerJob(
    name="Periodic Flow Trigger",
    callback_url="https://flows.globus.org/webhook/trigger-flow",
    interval=24 * 60 * 60,  # Daily
    start_time="2024-12-01T00:00:00Z"
)

# Note: This uses the legacy TimerJob class for webhook-based integration,
# which is submitted with create_job rather than create_timer.
webhook_timer = timers_client.create_job(webhook_timer_job)

Conditional and Smart Scheduling

# Create timer with smart end conditions
smart_schedule = RecurringTimerSchedule(
    interval_seconds=60 * 60,  # Every hour
    start="2024-12-01T00:00:00Z",
    end={
        "condition": "time", 
        "datetime": "2025-01-01T00:00:00Z"  # Stop after New Year
    }
)

# Timer with execution count limit
limited_timer = RecurringTimerSchedule(
    interval_seconds=6 * 60 * 60,  # Every 6 hours
    start="2024-12-01T00:00:00Z",
    end={
        "condition": "iterations",
        "n": 100  # Run exactly 100 times then stop
    }
)

# Daily timer anchored at the start of business hours.
# A fixed interval runs on weekends too and does not follow DST changes.
business_hours_schedule = RecurringTimerSchedule(
    interval_seconds=24 * 60 * 60,  # Daily
    start="2024-12-01T09:00:00-05:00",  # 9 AM EST
    end={"condition": "time", "datetime": "2025-12-01T09:00:00-05:00"}
)
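
A fixed daily interval fires on every day of the week, so the schedule above also runs on weekends. As a minimal sketch, assuming the caller decides at run time whether to do any work, the helper below lists which daily runs fall on Monday to Friday. `weekday_runs` is a hypothetical helper and the fixed -05:00 offset does not account for daylight saving time.

```python
import datetime as dt


def weekday_runs(start: dt.datetime, days: int) -> list:
    """Return daily run times from `start`, keeping Monday to Friday only."""
    candidates = (start + dt.timedelta(days=i) for i in range(days))
    # datetime.weekday() returns 0 for Monday through 6 for Sunday.
    return [run for run in candidates if run.weekday() < 5]


est = dt.timezone(dt.timedelta(hours=-5))  # fixed offset, no DST handling
first_run = dt.datetime(2024, 12, 1, 9, 0, tzinfo=est)  # 2024-12-01 is a Sunday
runs = weekday_runs(first_run, days=7)
print([run.weekday() for run in runs])  # [0, 1, 2, 3, 4]
```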

Install with Tessl CLI

npx tessl i tessl/pypi-globus-sdk
