AutoRest swagger generator Python client runtime for REST API clients with serialization, authentication, and request handling.
—
Polling system for long-running operations with configurable retry strategies, timeout handling, and completion detection for Azure ARM and other async API patterns. The LRO (Long-Running Operation) system handles operations that don't complete immediately.
Main class for polling long-running operations until completion.
class LROPoller:
def result(self, timeout=None):
"""
Get the result of the long-running operation.
Parameters:
- timeout: Maximum time to wait in seconds (None for no timeout)
Returns:
Final result of the operation
Raises:
TimeoutError if operation doesn't complete within timeout
"""
def wait(self, timeout=None):
"""
Wait for operation to complete without returning result.
Parameters:
- timeout: Maximum time to wait in seconds
"""
def done(self) -> bool:
"""
Check if operation is complete.
Returns:
True if operation is finished, False otherwise
"""
def add_done_callback(self, func):
"""
Add callback to execute when operation completes.
Parameters:
- func: Callback function to execute
"""
def remove_done_callback(self, func):
"""
Remove previously added callback.
Parameters:
- func: Callback function to remove
"""
def status(self) -> str:
"""
Get current operation status.
Returns:
Status string (e.g., 'InProgress', 'Succeeded', 'Failed')
"""Base class and implementations for different polling strategies.
class PollingMethod:
def initialize(self, client, initial_response, deserialization_callback):
"""
Initialize polling method with initial response.
Parameters:
- client: HTTP client for making requests
- initial_response: Initial operation response
- deserialization_callback: Function to deserialize responses
"""
def run(self) -> bool:
"""
Execute one polling iteration.
Returns:
True if operation is complete, False to continue polling
"""
def status(self) -> str:
"""Get current operation status."""
def finished(self) -> bool:
"""Check if operation is finished."""
def resource(self):
"""Get final resource/result."""
class NoPolling(PollingMethod):
"""
No-op polling method that considers operation immediately complete.
Used for operations that complete synchronously.
"""Async versions of polling classes for asynchronous operations.
class AsyncPollingMethod:
async def run(self) -> bool:
"""Execute one polling iteration asynchronously."""
def initialize(self, client, initial_response, deserialization_callback):
"""Initialize async polling method."""
class AsyncNoPolling(AsyncPollingMethod):
"""Async no-op polling method."""
def async_poller(client, initial_response, deserialization_callback, polling_method):
"""
Create async poller for long-running operations.
Parameters:
- client: Async HTTP client
- initial_response: Initial operation response
- deserialization_callback: Response deserialization function
- polling_method: Async polling method instance
Returns:
Async poller object
"""from msrest import ServiceClient, Configuration
from msrest.polling import LROPoller
# Setup client
config = Configuration(base_url='https://api.example.com')
client = ServiceClient(None, config)
# Start long-running operation
request = client.post('/start-operation', content='{"data": "value"}')
initial_response = client.send(request)
# Create poller (assuming service provides one)
# This is typically done by the SDK, not manually
poller = LROPoller(client, initial_response, deserialization_callback)
# Wait for completion and get result
try:
result = poller.result(timeout=300) # 5 minute timeout
print(f"Operation completed: {result}")
except TimeoutError:
print("Operation timed out")from msrest.polling import LROPoller
import time
# Start operation and get poller
poller = start_long_running_operation()
# Poll manually with status checks
while not poller.done():
status = poller.status()
print(f"Operation status: {status}")
if status == 'Failed':
print("Operation failed")
break
elif status == 'Cancelled':
print("Operation was cancelled")
break
# Wait before next check
time.sleep(10)
if poller.done():
result = poller.result()
print(f"Final result: {result}")from msrest.polling import LROPoller
def operation_completed(poller):
"""Callback function executed when operation completes."""
try:
result = poller.result()
print(f"Operation finished successfully: {result}")
except Exception as e:
print(f"Operation failed: {e}")
def operation_progress(poller):
"""Callback for progress updates."""
status = poller.status()
print(f"Progress update: {status}")
# Create poller and add callbacks
poller = start_long_running_operation()
poller.add_done_callback(operation_completed)
# Wait for completion (callbacks will be called)
poller.wait(timeout=600) # 10 minute timeoutfrom msrest.polling import PollingMethod
import time
import json
class CustomPollingMethod(PollingMethod):
"""Custom polling method for specific API pattern."""
def __init__(self, timeout=30, interval=5):
super(CustomPollingMethod, self).__init__()
self.timeout = timeout
self.interval = interval
self._status = 'InProgress'
self._finished = False
self._resource = None
def initialize(self, client, initial_response, deserialization_callback):
"""Initialize with API-specific logic."""
self._client = client
self._deserialize = deserialization_callback
# Parse initial response for operation URL
response_data = json.loads(initial_response.text)
self._operation_url = response_data.get('operation_url')
self._status = response_data.get('status', 'InProgress')
def run(self):
"""Check operation status."""
if self._finished:
return True
# Make status check request
request = self._client.get(self._operation_url)
response = self._client.send(request)
# Parse response
data = json.loads(response.text)
self._status = data.get('status')
if self._status in ['Succeeded', 'Completed']:
self._finished = True
self._resource = self._deserialize('OperationResult', response)
return True
elif self._status in ['Failed', 'Cancelled']:
self._finished = True
return True
# Still in progress
time.sleep(self.interval)
return False
def status(self):
return self._status
def finished(self):
return self._finished
def resource(self):
return self._resource
# Use custom polling method
custom_method = CustomPollingMethod(timeout=300, interval=10)
poller = LROPoller(client, initial_response, custom_method)
result = poller.result()import asyncio
from msrest.polling import async_poller, AsyncNoPolling
async def async_long_running_operation():
"""Example async long-running operation."""
# Start async operation
async_client = await create_async_client()
initial_response = await async_client.post('/async-operation')
# Create async poller
polling_method = AsyncNoPolling() # or custom async method
poller = async_poller(
async_client,
initial_response,
deserialization_callback,
polling_method
)
# Wait for completion asynchronously
result = await poller.result(timeout=300)
return result
# Run async operation
async def main():
try:
result = await async_long_running_operation()
print(f"Async operation result: {result}")
except asyncio.TimeoutError:
print("Async operation timed out")
asyncio.run(main())from msrest.polling import LROPoller
import json
class AzureARMPollingMethod:
"""Polling method for Azure Resource Manager operations."""
def initialize(self, client, initial_response, deserialization_callback):
self._client = client
self._deserialize = deserialization_callback
# Azure ARM uses specific headers for polling
headers = initial_response.headers
# Check for Azure-AsyncOperation header (preferred)
if 'Azure-AsyncOperation' in headers:
self._polling_url = headers['Azure-AsyncOperation']
self._method = 'async_operation'
# Check for Location header (fallback)
elif 'Location' in headers:
self._polling_url = headers['Location']
self._method = 'location'
else:
# No polling needed, operation is complete
self._method = 'no_polling'
self._finished = True
return
self._finished = False
def run(self):
if self._finished:
return True
# Poll the appropriate URL
request = self._client.get(self._polling_url)
response = self._client.send(request)
if self._method == 'async_operation':
# Parse Azure-AsyncOperation response
data = json.loads(response.text)
status = data.get('status')
if status == 'Succeeded':
# Need to get final result from original Location
self._finished = True
return True
elif status in ['Failed', 'Cancelled']:
self._finished = True
return True
elif self._method == 'location':
# Location polling - check response status
if response.status_code == 200:
self._finished = True
return True
elif response.status_code == 202:
# Still in progress
return False
return False
# Use with Azure operations
def create_azure_resource():
# Start resource creation
request = client.put('/subscriptions/.../resourceGroups/.../providers/.../resource')
initial_response = client.send(request)
# Create poller with Azure-specific method
azure_method = AzureARMPollingMethod()
poller = LROPoller(client, initial_response, azure_method)
# Wait for resource creation to complete
result = poller.result(timeout=600) # 10 minutes
return resultfrom msrest.polling import LROPoller
from msrest.exceptions import HttpOperationError
def robust_long_running_operation():
"""LRO with comprehensive error handling."""
try:
# Start operation
poller = start_operation()
# Add error callback
def error_callback(poller):
status = poller.status()
if status == 'Failed':
print("Operation failed - checking details")
# Could inspect poller.result() for error details
poller.add_done_callback(error_callback)
# Wait with timeout
result = poller.result(timeout=300)
if poller.status() == 'Succeeded':
return result
else:
raise Exception(f"Operation ended with status: {poller.status()}")
except TimeoutError:
print("Operation timed out")
# Could cancel operation or continue waiting
raise
except HttpOperationError as e:
print(f"HTTP error during polling: {e.response.status_code}")
raise
except Exception as e:
print(f"Unexpected error during LRO: {e}")
raise
# Usage with error handling
try:
result = robust_long_running_operation()
print(f"Success: {result}")
except Exception as e:
print(f"LRO failed: {e}")from msrest.polling import LROPoller
import threading
import time
def monitor_multiple_operations():
"""Monitor multiple long-running operations concurrently."""
# Start multiple operations
pollers = []
for i in range(5):
poller = start_operation(f"operation_{i}")
pollers.append((f"operation_{i}", poller))
# Monitor all operations
completed = {}
while len(completed) < len(pollers):
for name, poller in pollers:
if name not in completed and poller.done():
try:
result = poller.result()
completed[name] = result
print(f"{name} completed successfully")
except Exception as e:
completed[name] = None
print(f"{name} failed: {e}")
# Check every 10 seconds
time.sleep(10)
return completed
# Run monitoring
results = monitor_multiple_operations()
print(f"All operations completed: {len(results)} total")# Common LRO status values
LRO_STATUS_IN_PROGRESS = "InProgress"
LRO_STATUS_SUCCEEDED = "Succeeded"
LRO_STATUS_FAILED = "Failed"
LRO_STATUS_CANCELLED = "Cancelled"
# Callback function signature
def lro_callback(poller: LROPoller) -> None:
"""
Callback function called when LRO completes.
Parameters:
- poller: The LROPoller instance that completed
"""Install with Tessl CLI
npx tessl i tessl/pypi-msrest