CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-msrest

AutoRest swagger generator Python client runtime for REST API clients with serialization, authentication, and request handling.

Pending
Overview
Eval results
Files

polling.mddocs/

Long-Running Operations

Polling system for long-running operations with configurable retry strategies, timeout handling, and completion detection for Azure ARM and other async API patterns. The LRO (Long-Running Operation) system handles operations that don't complete immediately.

Capabilities

LRO Poller

Main class for polling long-running operations until completion.

class LROPoller:
    def result(self, timeout=None):
        """
        Get the result of the long-running operation.
        
        Parameters:
        - timeout: Maximum time to wait in seconds (None for no timeout)
        
        Returns:
        Final result of the operation
        
        Raises:
        TimeoutError if operation doesn't complete within timeout
        """
    
    def wait(self, timeout=None):
        """
        Wait for operation to complete without returning result.
        
        Parameters:
        - timeout: Maximum time to wait in seconds
        """
    
    def done(self) -> bool:
        """
        Check if operation is complete.
        
        Returns:
        True if operation is finished, False otherwise
        """
    
    def add_done_callback(self, func):
        """
        Add callback to execute when operation completes.
        
        Parameters:
        - func: Callback function to execute
        """
    
    def remove_done_callback(self, func):
        """
        Remove previously added callback.
        
        Parameters:
        - func: Callback function to remove
        """
    
    def status(self) -> str:
        """
        Get current operation status.
        
        Returns:
        Status string (e.g., 'InProgress', 'Succeeded', 'Failed')
        """

Polling Methods

Base class and implementations for different polling strategies.

class PollingMethod:
    def initialize(self, client, initial_response, deserialization_callback):
        """
        Initialize polling method with initial response.
        
        Parameters:
        - client: HTTP client for making requests
        - initial_response: Initial operation response
        - deserialization_callback: Function to deserialize responses
        """
    
    def run(self) -> bool:
        """
        Execute one polling iteration.
        
        Returns:
        True if operation is complete, False to continue polling
        """
    
    def status(self) -> str:
        """Get current operation status."""
    
    def finished(self) -> bool:
        """Check if operation is finished."""
    
    def resource(self):
        """Get final resource/result."""

class NoPolling(PollingMethod):
    """
    No-op polling method that considers operation immediately complete.
    Used for operations that complete synchronously.
    """

Async Polling (Python 3.5+)

Async versions of polling classes for asynchronous operations.

class AsyncPollingMethod:
    async def run(self) -> bool:
        """Execute one polling iteration asynchronously."""
    
    def initialize(self, client, initial_response, deserialization_callback):
        """Initialize async polling method."""

class AsyncNoPolling(AsyncPollingMethod):
    """Async no-op polling method."""

def async_poller(client, initial_response, deserialization_callback, polling_method):
    """
    Create async poller for long-running operations.
    
    Parameters:
    - client: Async HTTP client
    - initial_response: Initial operation response
    - deserialization_callback: Response deserialization function
    - polling_method: Async polling method instance
    
    Returns:
    Async poller object
    """

Usage Examples

Basic Long-Running Operation

from msrest import ServiceClient, Configuration
from msrest.polling import LROPoller

# Setup client
config = Configuration(base_url='https://api.example.com')
client = ServiceClient(None, config)

# Start long-running operation
request = client.post('/start-operation', content='{"data": "value"}')
initial_response = client.send(request)

# Create poller (assuming service provides one)
# This is typically done by the SDK, not manually
poller = LROPoller(client, initial_response, deserialization_callback)

# Wait for completion and get result
try:
    result = poller.result(timeout=300)  # 5 minute timeout
    print(f"Operation completed: {result}")
except TimeoutError:
    print("Operation timed out")

Polling with Status Checks

from msrest.polling import LROPoller
import time

# Start operation and get poller
poller = start_long_running_operation()

# Poll manually with status checks
while not poller.done():
    status = poller.status()
    print(f"Operation status: {status}")
    
    if status == 'Failed':
        print("Operation failed")
        break
    elif status == 'Cancelled':
        print("Operation was cancelled")
        break
    
    # Wait before next check
    time.sleep(10)

if poller.done():
    result = poller.result()
    print(f"Final result: {result}")

Using Callbacks

from msrest.polling import LROPoller

def operation_completed(poller):
    """Callback function executed when operation completes."""
    try:
        result = poller.result()
        print(f"Operation finished successfully: {result}")
    except Exception as e:
        print(f"Operation failed: {e}")

def operation_progress(poller):
    """Callback for progress updates."""
    status = poller.status()
    print(f"Progress update: {status}")

# Create poller and add callbacks
poller = start_long_running_operation()
poller.add_done_callback(operation_completed)

# Wait for completion (callbacks will be called)
poller.wait(timeout=600)  # 10 minute timeout

Custom Polling Method

from msrest.polling import PollingMethod
import time
import json

class CustomPollingMethod(PollingMethod):
    """Custom polling method for specific API pattern."""
    
    def __init__(self, timeout=30, interval=5):
        super(CustomPollingMethod, self).__init__()
        self.timeout = timeout
        self.interval = interval
        self._status = 'InProgress'
        self._finished = False
        self._resource = None
    
    def initialize(self, client, initial_response, deserialization_callback):
        """Initialize with API-specific logic."""
        self._client = client
        self._deserialize = deserialization_callback
        
        # Parse initial response for operation URL
        response_data = json.loads(initial_response.text)
        self._operation_url = response_data.get('operation_url')
        self._status = response_data.get('status', 'InProgress')
    
    def run(self):
        """Check operation status."""
        if self._finished:
            return True
        
        # Make status check request
        request = self._client.get(self._operation_url)
        response = self._client.send(request)
        
        # Parse response
        data = json.loads(response.text)
        self._status = data.get('status')
        
        if self._status in ['Succeeded', 'Completed']:
            self._finished = True
            self._resource = self._deserialize('OperationResult', response)
            return True
        elif self._status in ['Failed', 'Cancelled']:
            self._finished = True
            return True
        
        # Still in progress
        time.sleep(self.interval)
        return False
    
    def status(self):
        return self._status
    
    def finished(self):
        return self._finished
    
    def resource(self):
        return self._resource

# Use custom polling method
custom_method = CustomPollingMethod(timeout=300, interval=10)
poller = LROPoller(client, initial_response, custom_method)

result = poller.result()

Async Long-Running Operations

import asyncio
from msrest.polling import async_poller, AsyncNoPolling

async def async_long_running_operation():
    """Example async long-running operation."""
    
    # Start async operation
    async_client = await create_async_client()
    initial_response = await async_client.post('/async-operation')
    
    # Create async poller
    polling_method = AsyncNoPolling()  # or custom async method
    poller = async_poller(
        async_client, 
        initial_response, 
        deserialization_callback, 
        polling_method
    )
    
    # Wait for completion asynchronously
    result = await poller.result(timeout=300)
    return result

# Run async operation
async def main():
    try:
        result = await async_long_running_operation()
        print(f"Async operation result: {result}")
    except asyncio.TimeoutError:
        print("Async operation timed out")

asyncio.run(main())

Azure ARM Pattern Example

from msrest.polling import LROPoller
import json

class AzureARMPollingMethod:
    """Polling method for Azure Resource Manager operations."""
    
    def initialize(self, client, initial_response, deserialization_callback):
        self._client = client
        self._deserialize = deserialization_callback
        
        # Azure ARM uses specific headers for polling
        headers = initial_response.headers
        
        # Check for Azure-AsyncOperation header (preferred)
        if 'Azure-AsyncOperation' in headers:
            self._polling_url = headers['Azure-AsyncOperation']
            self._method = 'async_operation'
        # Check for Location header (fallback)
        elif 'Location' in headers:
            self._polling_url = headers['Location']
            self._method = 'location'
        else:
            # No polling needed, operation is complete
            self._method = 'no_polling'
            self._finished = True
            return
        
        self._finished = False
    
    def run(self):
        if self._finished:
            return True
        
        # Poll the appropriate URL
        request = self._client.get(self._polling_url)
        response = self._client.send(request)
        
        if self._method == 'async_operation':
            # Parse Azure-AsyncOperation response
            data = json.loads(response.text)
            status = data.get('status')
            
            if status == 'Succeeded':
                # Need to get final result from original Location
                self._finished = True
                return True
            elif status in ['Failed', 'Cancelled']:
                self._finished = True
                return True
                
        elif self._method == 'location':
            # Location polling - check response status
            if response.status_code == 200:
                self._finished = True
                return True
            elif response.status_code == 202:
                # Still in progress
                return False
        
        return False

# Use with Azure operations
def create_azure_resource():
    # Start resource creation
    request = client.put('/subscriptions/.../resourceGroups/.../providers/.../resource')
    initial_response = client.send(request)
    
    # Create poller with Azure-specific method
    azure_method = AzureARMPollingMethod()
    poller = LROPoller(client, initial_response, azure_method)
    
    # Wait for resource creation to complete
    result = poller.result(timeout=600)  # 10 minutes
    return result

Error Handling with LRO

from msrest.polling import LROPoller
from msrest.exceptions import HttpOperationError

def robust_long_running_operation():
    """LRO with comprehensive error handling."""
    
    try:
        # Start operation
        poller = start_operation()
        
        # Add error callback
        def error_callback(poller):
            status = poller.status()
            if status == 'Failed':
                print("Operation failed - checking details")
                # Could inspect poller.result() for error details
        
        poller.add_done_callback(error_callback)
        
        # Wait with timeout
        result = poller.result(timeout=300)
        
        if poller.status() == 'Succeeded':
            return result
        else:
            raise Exception(f"Operation ended with status: {poller.status()}")
            
    except TimeoutError:
        print("Operation timed out")
        # Could cancel operation or continue waiting
        raise
        
    except HttpOperationError as e:
        print(f"HTTP error during polling: {e.response.status_code}")
        raise
        
    except Exception as e:
        print(f"Unexpected error during LRO: {e}")
        raise

# Usage with error handling
try:
    result = robust_long_running_operation()
    print(f"Success: {result}")
except Exception as e:
    print(f"LRO failed: {e}")

Monitoring Multiple Operations

from msrest.polling import LROPoller
import threading
import time

def monitor_multiple_operations():
    """Monitor multiple long-running operations concurrently."""
    
    # Start multiple operations
    pollers = []
    for i in range(5):
        poller = start_operation(f"operation_{i}")
        pollers.append((f"operation_{i}", poller))
    
    # Monitor all operations
    completed = {}
    
    while len(completed) < len(pollers):
        for name, poller in pollers:
            if name not in completed and poller.done():
                try:
                    result = poller.result()
                    completed[name] = result
                    print(f"{name} completed successfully")
                except Exception as e:
                    completed[name] = None
                    print(f"{name} failed: {e}")
        
        # Check every 10 seconds
        time.sleep(10)
    
    return completed

# Run monitoring
results = monitor_multiple_operations()
print(f"All operations completed: {len(results)} total")

Types

# Common LRO status values
LRO_STATUS_IN_PROGRESS = "InProgress"
LRO_STATUS_SUCCEEDED = "Succeeded" 
LRO_STATUS_FAILED = "Failed"
LRO_STATUS_CANCELLED = "Cancelled"

# Callback function signature
def lro_callback(poller: LROPoller) -> None:
    """
    Callback function called when LRO completes.
    
    Parameters:
    - poller: The LROPoller instance that completed
    """

Install with Tessl CLI

npx tessl i tessl/pypi-msrest

docs

authentication.md

configuration.md

exceptions.md

index.md

paging.md

pipeline.md

polling.md

serialization.md

service-client.md

tile.json