CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-core

Microsoft Azure Core Library providing foundational infrastructure for Azure SDK Python clients

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

polling-and-long-running-operations.mddocs/

Polling and Long-Running Operations

Azure Core provides polling mechanisms for handling Azure long-running operations (LROs) with customizable polling strategies, automatic status checking, and comprehensive async support. This enables consistent handling of operations that may take minutes or hours to complete.

Core Polling Classes

Base classes for implementing polling functionality for both synchronous and asynchronous operations.

from azure.core.polling import LROPoller, AsyncLROPoller, PollingMethod, AsyncPollingMethod
from typing import Optional, Any, Callable

class LROPoller:
    """Synchronous long-running operation poller."""
    def __init__(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: Callable,
        polling_method: PollingMethod,
        **kwargs
    ): ...
    
    def result(self, timeout: Optional[float] = None) -> Any:
        """Return the result of the long-running operation."""
        ...
    
    def wait(self, timeout: Optional[float] = None) -> None:
        """Wait for the operation to complete."""
        ...
    
    def done(self) -> bool:
        """Check if the operation has completed."""
        ...
    
    def status(self) -> str:
        """Get the current status of the operation."""
        ...
    
    def polling_method(self) -> PollingMethod:
        """Get the polling method being used."""
        ...
    
    def continuation_token(self) -> str:
        """Get continuation token for resuming the operation."""
        ...

class AsyncLROPoller:
    """Asynchronous long-running operation poller."""
    def __init__(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: Callable,
        polling_method: AsyncPollingMethod,
        **kwargs
    ): ...
    
    async def result(self, timeout: Optional[float] = None) -> Any:
        """Return the result of the long-running operation."""
        ...
    
    async def wait(self, timeout: Optional[float] = None) -> None:
        """Wait for the operation to complete."""
        ...
    
    def done(self) -> bool:
        """Check if the operation has completed."""
        ...
    
    def status(self) -> str:
        """Get the current status of the operation."""
        ...
    
    def polling_method(self) -> AsyncPollingMethod:
        """Get the polling method being used."""
        ...
    
    def continuation_token(self) -> str:
        """Get continuation token for resuming the operation."""
        ...

Polling Method Interfaces

Base interfaces for implementing custom polling strategies.

from abc import ABC, abstractmethod

class PollingMethod(ABC):
    """Base synchronous polling method interface."""
    @abstractmethod
    def initialize(self, client: Any, initial_response: Any, deserialization_callback: Callable) -> None:
        """Initialize the polling method."""
        ...
    
    @abstractmethod
    def run(self) -> None:
        """Execute polling until completion."""
        ...
    
    @abstractmethod
    def status(self) -> str:
        """Get current operation status."""
        ...
    
    @abstractmethod
    def finished(self) -> bool:
        """Check if operation is finished."""
        ...
    
    @abstractmethod
    def resource(self) -> Any:
        """Get the final resource."""
        ...

class AsyncPollingMethod(ABC):
    """Base asynchronous polling method interface."""
    @abstractmethod
    async def initialize(self, client: Any, initial_response: Any, deserialization_callback: Callable) -> None:
        """Initialize the polling method."""
        ...
    
    @abstractmethod
    async def run(self) -> None:
        """Execute polling until completion."""
        ...
    
    @abstractmethod
    def status(self) -> str:
        """Get current operation status."""
        ...
    
    @abstractmethod
    def finished(self) -> bool:
        """Check if operation is finished."""
        ...
    
    @abstractmethod
    def resource(self) -> Any:
        """Get the final resource."""
        ...

Capabilities

No-Operation Polling

For operations that complete immediately or don't require polling.

from azure.core.polling import NoPolling, AsyncNoPolling

class NoPolling(PollingMethod):
    """No-op polling implementation for immediate operations."""
    def __init__(self): ...

class AsyncNoPolling(AsyncPollingMethod):
    """Async no-op polling implementation for immediate operations."""
    def __init__(self): ...

Async Poller Utilities

Utility functions for working with async pollers.

from azure.core.polling import async_poller

def async_poller(func):
    """Decorator for creating async pollers from sync polling methods."""
    ...

Usage Examples

Basic Synchronous Polling

from azure.core.polling import LROPoller
from azure.core import PipelineClient
import time

def start_long_running_operation():
    """Example of starting and polling a long-running operation."""
    client = PipelineClient(base_url="https://api.example.com")
    
    # Start the operation (this would be service-specific)
    initial_response = client.send_request("POST", "/operations/create")
    
    # Create poller (in real usage, this would be created by the service client)
    # poller = LROPoller(client, initial_response, deserialize_callback, polling_method)
    
    # Wait for completion
    # result = poller.result(timeout=300)  # 5 minute timeout
    # print(f"Operation completed with result: {result}")
    
    return initial_response

def check_operation_status(poller):
    """Check the status of an ongoing operation."""
    print(f"Current status: {poller.status()}")
    print(f"Is done: {poller.done()}")
    
    if not poller.done():
        print("Operation still in progress...")
        # Could wait more or check later
        poller.wait(timeout=60)  # Wait up to 1 minute
    
    if poller.done():
        result = poller.result()
        print(f"Operation completed: {result}")

Asynchronous Polling

import asyncio
from azure.core.polling import AsyncLROPoller

async def start_async_long_running_operation():
    """Example of async long-running operation polling."""
    # client = AsyncPipelineClient(base_url="https://api.example.com")
    
    # Start the operation
    # initial_response = await client.send_request("POST", "/operations/create")
    
    # Create async poller
    # poller = AsyncLROPoller(client, initial_response, deserialize_callback, async_polling_method)
    
    # Wait for completion asynchronously
    # result = await poller.result(timeout=300)
    # print(f"Async operation completed: {result}")
    
    pass

async def monitor_operation_progress(poller):
    """Monitor operation progress with periodic status checks."""
    while not poller.done():
        print(f"Status: {poller.status()}")
        await asyncio.sleep(5)  # Check every 5 seconds
    
    result = await poller.result()
    print(f"Operation completed: {result}")

# asyncio.run(start_async_long_running_operation())

Resuming Operations with Continuation Tokens

def resume_operation_from_token(continuation_token):
    """Resume a long-running operation using a continuation token."""
    # In real usage, you would recreate the poller from the token
    # poller = service_client.resume_operation(continuation_token)
    
    # Check current status
    # print(f"Resumed operation status: {poller.status()}")
    
    # Continue waiting for completion
    # result = poller.result()
    # return result
    
    pass

def save_operation_state(poller):
    """Save operation state for later resumption."""
    if not poller.done():
        token = poller.continuation_token()
        # Save token to database or file for later use
        print(f"Operation token saved: {token}")
        return token
    return None

Error Handling with Polling

from azure.core.exceptions import AzureError, HttpResponseError

def poll_with_error_handling(poller):
    """Handle errors during polling operations."""
    try:
        # Wait for operation with timeout
        result = poller.result(timeout=600)  # 10 minute timeout
        print(f"Operation succeeded: {result}")
        return result
        
    except HttpResponseError as e:
        if e.status_code == 404:
            print("Operation not found - may have been cancelled")
        elif e.status_code >= 500:
            print(f"Server error during operation: {e.status_code}")
        else:
            print(f"HTTP error: {e.status_code} - {e.reason}")
        raise
        
    except AzureError as e:
        print(f"Azure service error: {e.message}")
        
        # Check if operation can be resumed
        if hasattr(e, 'continuation_token') and e.continuation_token:
            print("Operation can be resumed with token:", e.continuation_token)
        raise
        
    except TimeoutError:
        print("Operation timed out")
        
        # Save state for later resumption
        if not poller.done():
            token = poller.continuation_token()
            print(f"Save this token to resume later: {token}")
        raise

Custom Polling Strategy

from azure.core.polling import PollingMethod
import time

class CustomPollingMethod(PollingMethod):
    """Custom polling method with exponential backoff."""
    
    def __init__(self, initial_delay=1, max_delay=60, backoff_factor=2):
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor
        self.current_delay = initial_delay
        self._client = None
        self._operation_url = None
        self._finished = False
        self._status = "InProgress"
        self._resource = None
    
    def initialize(self, client, initial_response, deserialization_callback):
        self._client = client
        self._deserialize = deserialization_callback
        
        # Extract operation URL from initial response
        if hasattr(initial_response, 'headers'):
            self._operation_url = initial_response.headers.get('Location')
        
        # Check if already completed
        if initial_response.status_code in [200, 201]:
            self._finished = True
            self._status = "Succeeded"
            self._resource = self._deserialize(initial_response)
    
    def run(self):
        while not self._finished:
            time.sleep(self.current_delay)
            
            # Poll the operation status
            response = self._client.send_request("GET", self._operation_url)
            
            if response.status_code == 200:
                # Operation completed successfully
                self._finished = True
                self._status = "Succeeded" 
                self._resource = self._deserialize(response)
            elif response.status_code == 202:
                # Still in progress
                self._status = "InProgress"
                
                # Increase delay for next poll (exponential backoff)
                self.current_delay = min(
                    self.current_delay * self.backoff_factor,
                    self.max_delay
                )
            else:
                # Error occurred
                self._finished = True
                self._status = "Failed"
                raise HttpResponseError(f"Operation failed: {response.status_code}")
    
    def status(self):
        return self._status
    
    def finished(self):
        return self._finished
    
    def resource(self):
        return self._resource

# Usage
def use_custom_polling():
    custom_method = CustomPollingMethod(initial_delay=2, max_delay=30)
    # poller = LROPoller(client, initial_response, deserialize_func, custom_method)
    # result = poller.result()

Async Custom Polling

import asyncio
from azure.core.polling import AsyncPollingMethod

class AsyncCustomPollingMethod(AsyncPollingMethod):
    """Custom async polling method."""
    
    def __init__(self, initial_delay=1, max_delay=60):
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.current_delay = initial_delay
        self._client = None
        self._operation_url = None
        self._finished = False
        self._status = "InProgress"
        self._resource = None
    
    async def initialize(self, client, initial_response, deserialization_callback):
        self._client = client
        self._deserialize = deserialization_callback
        
        if hasattr(initial_response, 'headers'):
            self._operation_url = initial_response.headers.get('Location')
        
        if initial_response.status_code in [200, 201]:
            self._finished = True
            self._status = "Succeeded"
            self._resource = self._deserialize(initial_response)
    
    async def run(self):
        while not self._finished:
            await asyncio.sleep(self.current_delay)
            
            response = await self._client.send_request("GET", self._operation_url)
            
            if response.status_code == 200:
                self._finished = True
                self._status = "Succeeded"
                self._resource = self._deserialize(response)
            elif response.status_code == 202:
                self._status = "InProgress"
                self.current_delay = min(self.current_delay * 2, self.max_delay)
            else:
                self._finished = True
                self._status = "Failed"
                raise HttpResponseError(f"Operation failed: {response.status_code}")
    
    def status(self):
        return self._status
    
    def finished(self):
        return self._finished
    
    def resource(self):
        return self._resource

Best Practices

Polling Configuration

  • Set appropriate timeouts based on expected operation duration
  • Use exponential backoff to reduce server load
  • Implement maximum retry limits to prevent infinite polling
  • Consider operation complexity when setting initial polling intervals

Resource Management

  • Always use timeouts to prevent infinite waiting
  • Save continuation tokens for long-running operations
  • Properly handle poller cleanup in exception scenarios
  • Use async pollers with async code for better resource utilization

Error Handling

  • Handle different types of errors appropriately (network, service, timeout)
  • Implement retry logic for transient failures during polling
  • Provide meaningful error messages to users
  • Log polling progress and errors for debugging

Performance Considerations

  • Use appropriate polling intervals to balance responsiveness and server load
  • Consider using webhooks or server-sent events for real-time updates when available
  • Batch multiple operations when possible to reduce polling overhead
  • Monitor polling patterns to optimize intervals for your specific use case

Install with Tessl CLI

npx tessl i tessl/pypi-azure-core

docs

async-programming-patterns.md

authentication-and-credentials.md

configuration-and-settings.md

distributed-tracing-and-diagnostics.md

error-handling-and-exceptions.md

http-pipeline-and-policies.md

index.md

paging-and-result-iteration.md

polling-and-long-running-operations.md

rest-api-abstraction.md

transport-and-networking.md

utilities-and-helpers.md

tile.json