Python SDK for interacting with Globus web APIs including Transfer, Auth, and other research data management services
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Scheduled task execution and automation with support for one-time and recurring schedules, complex timing patterns, and integration with other Globus services. The Timer service enables automated execution of operations like data transfers, compute jobs, and workflow triggers based on flexible scheduling requirements.
Core client for timer creation, management, and monitoring with support for various schedule types and integration with Transfer and other Globus services.
class TimersClient(BaseClient):
"""
Client for Globus Timers service operations.
Provides methods for creating, managing, and monitoring scheduled tasks
with support for one-time and recurring schedules, transfer automation,
and integration with other Globus services.
"""
def __init__(
self,
*,
app: GlobusApp | None = None,
authorizer: GlobusAuthorizer | None = None,
environment: str | None = None,
base_url: str | None = None,
**kwargs
) -> None: ...
def add_app_transfer_data_access_scope(
self,
collection_ids: str | UUID | Iterable[str | UUID]
) -> TimersClient:
"""
Add dependent data_access scopes for transfer collections.
Adds required scopes to prevent ConsentRequired errors when creating
timers that use GCS mapped collections for automated transfers.
Parameters:
- collection_ids: Collection ID(s) requiring data access scopes
Returns:
Self for method chaining
"""Create, configure, and manage scheduled tasks with various schedule types and execution patterns.
def create_timer(
self,
timer: dict[str, Any] | TransferTimer
) -> GlobusHTTPResponse:
"""
Create a new timer for scheduled execution.
Creates a timer with specified schedule and task definition.
Supports both one-time and recurring schedules with flexible
end conditions and execution parameters.
Parameters:
- timer: Timer specification including schedule and task details
Returns:
GlobusHTTPResponse with created timer details and UUID
"""
def get_timer(self, timer_id: str | UUID) -> GlobusHTTPResponse:
"""
Get detailed information about a specific timer.
Returns complete timer configuration including schedule,
execution history, and current status.
Parameters:
- timer_id: UUID of the timer to retrieve
Returns:
GlobusHTTPResponse with timer configuration and status
"""
def list_timers(
self,
*,
limit: int | None = None,
marker: str | None = None,
fields: str | None = None,
query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
"""
List timers with pagination and filtering support.
Returns timers accessible to the user with optional
field selection and pagination for large timer sets.
Parameters:
- limit: Maximum number of timers to return
- marker: Pagination marker for continuing listings
- fields: Comma-separated list of fields to include
- query_params: Additional query parameters
Returns:
GlobusHTTPResponse with paginated timer listings
"""
def update_timer(
self,
timer_id: str | UUID,
data: dict[str, Any],
*,
query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
"""
Update timer configuration.
Modifies timer settings including schedule, name, and other
configuration parameters. Some changes may affect future
executions only.
Parameters:
- timer_id: UUID of timer to update
- data: Updated timer configuration
- query_params: Additional parameters
Returns:
GlobusHTTPResponse confirming update
"""
def delete_timer(
self,
timer_id: str | UUID,
*,
query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
"""
Delete a timer.
Permanently removes a timer and stops any future scheduled
executions. Running executions may continue to completion.
Parameters:
- timer_id: UUID of timer to delete
- query_params: Additional parameters
Returns:
GlobusHTTPResponse confirming deletion
"""
def pause_timer(
self,
timer_id: str | UUID,
*,
query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
"""
Pause a timer temporarily.
Stops scheduled executions without deleting the timer.
Can be resumed later to continue the schedule.
Parameters:
- timer_id: UUID of timer to pause
- query_params: Additional parameters
Returns:
GlobusHTTPResponse confirming pause
"""
def resume_timer(
self,
timer_id: str | UUID,
*,
query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
"""
Resume a paused timer.
Restarts scheduled executions for a previously paused timer
according to its original schedule configuration.
Parameters:
- timer_id: UUID of timer to resume
- query_params: Additional parameters
Returns:
GlobusHTTPResponse confirming resume
"""Comprehensive data structures for defining timer schedules, execution patterns, and task specifications with type-safe construction.
class TransferTimer(PayloadWrapper):
"""
Helper for creating transfer-specific timers.
Provides convenient interface for creating timers that execute
Globus Transfer operations on a schedule with proper scope
and permission handling.
"""
def __init__(
self,
*,
name: str,
schedule: OnceTimerSchedule | RecurringTimerSchedule | dict[str, Any],
body: TransferData | dict[str, Any],
**kwargs
) -> None: ...
class OnceTimerSchedule(PayloadWrapper):
"""
Schedule for timers that run exactly once.
Defines a single execution time for one-time scheduled tasks,
supporting both immediate execution and future scheduling.
"""
def __init__(
self,
datetime: str | dt.datetime | MissingType = MISSING,
**kwargs
) -> None: ...
class RecurringTimerSchedule(PayloadWrapper):
"""
Schedule for timers that run repeatedly.
Defines recurring execution with configurable intervals,
start times, and end conditions including date limits
and execution count limits.
"""
def __init__(
self,
interval_seconds: int,
*,
start: str | dt.datetime | MissingType = MISSING,
end: dict[str, Any] | MissingType = MISSING,
**kwargs
) -> None: ...
@classmethod
def every_hour(cls, **kwargs) -> RecurringTimerSchedule:
"""Create schedule that runs every hour."""
@classmethod
def every_day(cls, **kwargs) -> RecurringTimerSchedule:
"""Create schedule that runs daily."""
@classmethod
def every_week(cls, **kwargs) -> RecurringTimerSchedule:
"""Create schedule that runs weekly."""
@classmethod
def every_month(cls, **kwargs) -> RecurringTimerSchedule:
"""Create schedule that runs monthly (30 days)."""
class TimerJob(PayloadWrapper):
"""
Legacy timer job specification.
Note: This class is deprecated in favor of TransferTimer for
transfer operations. Still supported for non-transfer use cases.
"""
def __init__(
self,
*,
name: str,
callback_url: str,
start_time: str | dt.datetime | MissingType = MISSING,
interval: int | MissingType = MISSING,
stop_after: dict[str, Any] | MissingType = MISSING,
**kwargs
) -> None: ...Monitor timer executions, view execution history, and track task results with comprehensive status information.
def get_timer_executions(
self,
timer_id: str | UUID,
*,
limit: int | None = None,
marker: str | None = None,
query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
"""
Get execution history for a timer.
Returns list of past and scheduled executions with status
information, results, and timing details.
Parameters:
- timer_id: UUID of the timer
- limit: Maximum executions to return
- marker: Pagination marker
- query_params: Additional parameters
Returns:
GlobusHTTPResponse with execution history
"""
def get_execution(
self,
execution_id: str | UUID,
*,
query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
"""
Get detailed information about a specific execution.
Returns complete execution details including status,
results, error information, and timing data.
Parameters:
- execution_id: UUID of the execution
- query_params: Additional parameters
Returns:
GlobusHTTPResponse with execution details
"""
def cancel_execution(
self,
execution_id: str | UUID,
*,
query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
"""
Cancel a running or pending execution.
Attempts to stop an execution that is currently running
or scheduled to run soon.
Parameters:
- execution_id: UUID of execution to cancel
- query_params: Additional parameters
Returns:
GlobusHTTPResponse confirming cancellation request
"""Timer-specific error handling for scheduling and execution operations.
class TimersAPIError(GlobusAPIError):
"""
Error class for Timers service API errors.
Provides enhanced error handling for timer-specific error
conditions including scheduling conflicts and execution failures.
"""from globus_sdk import TimersClient, TransferTimer, RecurringTimerSchedule, TransferData
# Initialize client with transfer data access scope
timers_client = TimersClient(app=app)
timers_client.add_app_transfer_data_access_scope([
"source-collection-uuid",
"dest-collection-uuid"
])
# Create transfer data specification
transfer_data = TransferData(
source_endpoint="source-collection-uuid",
destination_endpoint="dest-collection-uuid",
label="Automated Daily Backup"
)
transfer_data.add_item("/data/current/", "/backup/daily/", recursive=True)
# Create recurring schedule (daily at 2 AM)
daily_schedule = RecurringTimerSchedule(
interval_seconds=24 * 60 * 60, # 24 hours
start="2024-12-01T02:00:00Z" # Start at 2 AM UTC
)
# Create transfer timer
backup_timer = TransferTimer(
name="Daily Data Backup",
schedule=daily_schedule,
body=transfer_data
)
# Create the timer
timer_response = timers_client.create_timer(backup_timer)
timer_id = timer_response["timer"]["id"]
print(f"Created daily backup timer: {timer_id}")from globus_sdk import OnceTimerSchedule
import datetime
# Schedule transfer for specific future time
future_time = datetime.datetime(2024, 12, 15, 14, 30, 0) # Dec 15, 2024 at 2:30 PM
once_schedule = OnceTimerSchedule(datetime=future_time)
# Create one-time transfer timer
maintenance_transfer = TransferTimer(
name="Maintenance Window Transfer",
schedule=once_schedule,
body=transfer_data
)
one_time_timer = timers_client.create_timer(maintenance_transfer)
print(f"Scheduled one-time transfer: {one_time_timer['timer']['id']}")# Weekly backup with end date
weekly_schedule = RecurringTimerSchedule(
interval_seconds=7 * 24 * 60 * 60, # 7 days
start="2024-12-01T02:00:00Z",
end={"condition": "time", "datetime": "2025-12-01T02:00:00Z"}
)
# Monthly archive with execution limit
monthly_schedule = RecurringTimerSchedule(
interval_seconds=30 * 24 * 60 * 60, # 30 days
start="2024-12-01T00:00:00Z",
end={"condition": "iterations", "n": 12} # Run 12 times (1 year)
)
# Using convenience methods
hourly_sync = RecurringTimerSchedule.every_hour(
start="2024-12-01T00:00:00Z"
)
daily_report = RecurringTimerSchedule.every_day(
start="2024-12-01T06:00:00Z", # 6 AM daily
end={"condition": "iterations", "n": 365}
)# List all timers
timers_list = timers_client.list_timers(limit=50)
for timer in timers_list["timers"]:
print(f"Timer: {timer['name']} ({timer['id']})")
print(f" Status: {timer['status']}")
print(f" Next run: {timer.get('next_run_time', 'N/A')}")
# Get detailed timer information
timer_details = timers_client.get_timer(timer_id)
print(f"Timer created: {timer_details['timer']['created_at']}")
print(f"Schedule: {timer_details['timer']['schedule']}")
# View execution history
executions = timers_client.get_timer_executions(timer_id, limit=10)
for execution in executions["executions"]:
print(f"Execution {execution['id']}: {execution['status']}")
if execution.get('completion_time'):
print(f" Completed: {execution['completion_time']}")
if execution.get('error_message'):
print(f" Error: {execution['error_message']}")# Pause timer temporarily
timers_client.pause_timer(timer_id)
print("Timer paused")
# Resume timer later
timers_client.resume_timer(timer_id)
print("Timer resumed")
# Update timer configuration
update_data = {
"name": "Updated Daily Backup Timer",
"schedule": {
"type": "recurring",
"interval_seconds": 12 * 60 * 60, # Change to every 12 hours
"start": "2024-12-01T02:00:00Z"
}
}
timers_client.update_timer(timer_id, update_data)
print("Timer schedule updated")
# Delete timer when no longer needed
timers_client.delete_timer(timer_id)
print("Timer deleted")# Multi-path transfer timer
multi_transfer_data = TransferData(
source_endpoint="source-endpoint",
destination_endpoint="dest-endpoint",
label="Multi-Path Sync",
sync_level="checksum",
verify_checksum=True
)
# Add multiple transfer items
multi_transfer_data.add_item("/datasets/raw/", "/archive/raw/", recursive=True)
multi_transfer_data.add_item("/datasets/processed/", "/archive/processed/", recursive=True)
multi_transfer_data.add_item("/logs/", "/archive/logs/", recursive=True)
# Nightly sync timer
nightly_sync = TransferTimer(
name="Nightly Multi-Path Sync",
schedule=RecurringTimerSchedule.every_day(
start="2024-12-01T01:00:00Z" # 1 AM daily
),
body=multi_transfer_data
)
sync_timer = timers_client.create_timer(nightly_sync)import time
def monitor_timer_execution(timers_client, timer_id, timeout=3600):
"""Monitor timer execution with error handling."""
start_time = time.time()
while time.time() - start_time < timeout:
try:
# Get recent executions
executions = timers_client.get_timer_executions(timer_id, limit=5)
for execution in executions["executions"]:
status = execution["status"]
if status == "completed":
print(f"Execution {execution['id']} completed successfully")
return True
elif status == "failed":
error_msg = execution.get("error_message", "Unknown error")
print(f"Execution {execution['id']} failed: {error_msg}")
# Get detailed execution info
exec_details = timers_client.get_execution(execution['id'])
print(f"Failure details: {exec_details.get('failure_reason', 'N/A')}")
return False
elif status in ["running", "pending"]:
print(f"Execution {execution['id']} is {status}")
time.sleep(30) # Wait before checking again
except Exception as e:
print(f"Error monitoring timer: {e}")
time.sleep(60)
print(f"Monitoring timeout after {timeout} seconds")
return False
# Usage
success = monitor_timer_execution(timers_client, timer_id)# Timer for periodic compute job execution
from globus_sdk import ComputeClientV3
# This would require custom integration as timers primarily support transfers
# Users would typically use Flows service to coordinate complex multi-service automation
# Timer triggering a flow execution (using webhook pattern)
webhook_timer_job = TimerJob(
name="Periodic Flow Trigger",
callback_url="https://flows.globus.org/webhook/trigger-flow",
interval=24 * 60 * 60, # Daily
start_time="2024-12-01T00:00:00Z"
)
# Note: This uses the legacy TimerJob class for webhook-based integration
webhook_timer = timers_client.create_job(webhook_timer_job)# Create timer with smart end conditions
smart_schedule = RecurringTimerSchedule(
interval_seconds=60 * 60, # Every hour
start="2024-12-01T00:00:00Z",
end={
"condition": "time",
"datetime": "2025-01-01T00:00:00Z" # Stop after New Year
}
)
# Timer with execution count limit
limited_timer = RecurringTimerSchedule(
interval_seconds=6 * 60 * 60, # Every 6 hours
start="2024-12-01T00:00:00Z",
end={
"condition": "iterations",
"n": 100 # Run exactly 100 times then stop
}
)
# Business hours only timer (using specific start times)
business_hours_schedule = RecurringTimerSchedule(
interval_seconds=24 * 60 * 60, # Daily
start="2024-12-01T09:00:00-05:00", # 9 AM EST
end={"condition": "time", "datetime": "2025-12-01T09:00:00-05:00"}
)Install with Tessl CLI
npx tessl i tessl/pypi-globus-sdk