Microsoft Azure Core Library providing foundational infrastructure for Azure SDK Python clients
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Azure Core provides polling mechanisms for handling Azure long-running operations (LROs) with customizable polling strategies, automatic status checking, and comprehensive async support. This enables consistent handling of operations that may take minutes or hours to complete.
Base classes for implementing polling functionality for both synchronous and asynchronous operations.
from azure.core.polling import LROPoller, AsyncLROPoller, PollingMethod, AsyncPollingMethod
from typing import Optional, Any, Callable
class LROPoller:
    """Poller for a long-running operation, synchronous flavour.

    This is an interface outline: every method body is empty.
    """

    def __init__(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: Callable,
        polling_method: PollingMethod,
        **kwargs
    ):
        """Accept the client, initial response, deserializer and polling method."""

    def result(self, timeout: Optional[float] = None) -> Any:
        """Give back the outcome of the long-running operation."""

    def wait(self, timeout: Optional[float] = None) -> None:
        """Block until the operation has completed."""

    def done(self) -> bool:
        """Tell whether the operation has completed."""

    def status(self) -> str:
        """Report the operation's current status."""

    def polling_method(self) -> PollingMethod:
        """Expose the polling method in use."""

    def continuation_token(self) -> str:
        """Produce a continuation token for resuming the operation."""
class AsyncLROPoller:
    """Poller for a long-running operation, asynchronous flavour.

    This is an interface outline: every method body is empty.

    NOTE(review): confirm against the azure-core reference that the async
    ``result()`` and ``wait()`` really accept a ``timeout`` argument.
    """

    def __init__(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: Callable,
        polling_method: AsyncPollingMethod,
        **kwargs
    ):
        """Accept the client, initial response, deserializer and polling method."""

    async def result(self, timeout: Optional[float] = None) -> Any:
        """Give back the outcome of the long-running operation."""

    async def wait(self, timeout: Optional[float] = None) -> None:
        """Suspend until the operation has completed."""

    def done(self) -> bool:
        """Tell whether the operation has completed."""

    def status(self) -> str:
        """Report the operation's current status."""

    def polling_method(self) -> AsyncPollingMethod:
        """Expose the polling method in use."""

    def continuation_token(self) -> str:
        """Produce a continuation token for resuming the operation."""


# Base interfaces for implementing custom polling strategies.
from abc import ABC, abstractmethod
class PollingMethod(ABC):
    """Abstract contract that every synchronous polling strategy fulfils."""

    @abstractmethod
    def initialize(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: Callable,
    ) -> None:
        """Prepare the strategy from the client, first response and deserializer."""

    @abstractmethod
    def run(self) -> None:
        """Poll until the operation completes."""

    @abstractmethod
    def status(self) -> str:
        """Report the operation's current status."""

    @abstractmethod
    def finished(self) -> bool:
        """Tell whether the operation is finished."""

    @abstractmethod
    def resource(self) -> Any:
        """Give back the final resource."""
class AsyncPollingMethod(ABC):
    """Base asynchronous polling method interface.

    Only ``run`` is a coroutine. ``initialize`` is a plain method because the
    async poller invokes it from its (synchronous) constructor; declaring it
    ``async`` would produce a coroutine that is never awaited, leaving the
    polling method uninitialized.
    """

    @abstractmethod
    def initialize(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: Callable,
    ) -> None:
        """Initialize the polling method.

        Args:
            client: Client used to issue the polling requests.
            initial_response: Response returned by the call that started
                the operation.
            deserialization_callback: Callable that turns the final
                response into the returned resource.
        """
        ...

    @abstractmethod
    async def run(self) -> None:
        """Execute polling until completion."""
        ...

    @abstractmethod
    def status(self) -> str:
        """Get current operation status."""
        ...

    @abstractmethod
    def finished(self) -> bool:
        """Check if operation is finished."""
        ...

    @abstractmethod
    def resource(self) -> Any:
        """Get the final resource."""
        ...


# For operations that complete immediately or don't require polling.
from azure.core.polling import NoPolling, AsyncNoPolling
class NoPolling(PollingMethod):
    """Polling method that does nothing, for operations that finish at once."""

    def __init__(self):
        """Take no arguments; outline only."""
class AsyncNoPolling(AsyncPollingMethod):
    """Async polling method that does nothing, for operations that finish at once."""

    def __init__(self):
        """Take no arguments; outline only."""


# Utility functions for working with async pollers.
from azure.core.polling import async_poller
def async_poller(func):
    """Decorator that builds async pollers out of sync polling methods.

    Outline only: the body is empty, so calling it returns ``None``.

    NOTE(review): confirm against the azure-core reference; upstream may
    expose ``async_poller`` as a coroutine taking (client, initial_response,
    deserialization_callback, polling_method) rather than as a decorator.
    """


from azure.core.polling import LROPoller
from azure.core import PipelineClient
import time
def start_long_running_operation():
    """Example of starting and polling a long-running operation.

    Creates a ``PipelineClient`` for ``https://api.example.com`` and sends a
    POST to ``/operations/create``. The polling steps stay commented out
    because, in real usage, the poller is created by the service client.

    Returns:
        The response returned by ``client.send_request`` for the initial POST.
    """
    # Imported here so the example is self-contained.
    from azure.core.rest import HttpRequest

    client = PipelineClient(base_url="https://api.example.com")

    # Start the operation (this would be service-specific).
    # send_request() takes a request object rather than a (method, url) pair,
    # and format_url() resolves the relative path against the client base URL.
    request = HttpRequest("POST", client.format_url("/operations/create"))
    initial_response = client.send_request(request)

    # Create poller (in real usage, this would be created by the service client)
    # poller = LROPoller(client, initial_response, deserialize_callback, polling_method)

    # Wait for completion
    # result = poller.result(timeout=300)  # 5 minute timeout
    # print(f"Operation completed with result: {result}")

    return initial_response
def check_operation_status(poller):
    """Print a poller's status, wait up to a minute if needed, then report the result."""
    current_status = poller.status()
    print(f"Current status: {current_status}")
    done_now = poller.done()
    print(f"Is done: {done_now}")

    # Completion is re-queried each time, exactly as often as before.
    still_running = not poller.done()
    if still_running:
        print("Operation still in progress...")
        # Give the operation up to one more minute.
        poller.wait(timeout=60)

    completed = poller.done()
    if completed:
        outcome = poller.result()
        print(f"Operation completed: {outcome}")


import asyncio
from azure.core.polling import AsyncLROPoller
async def start_async_long_running_operation():
    """Outline of polling a long-running operation asynchronously.

    Every step is commented out, so awaiting this coroutine does nothing
    and yields ``None``.
    """
    # client = AsyncPipelineClient(base_url="https://api.example.com")

    # Start the operation
    # initial_response = await client.send_request("POST", "/operations/create")

    # Create async poller
    # poller = AsyncLROPoller(client, initial_response, deserialize_callback, async_polling_method)

    # Wait for completion asynchronously
    # result = await poller.result(timeout=300)
    # print(f"Async operation completed: {result}")
    return None
async def monitor_operation_progress(poller):
    """Print the poller's status every five seconds until done, then print the result."""
    while True:
        if poller.done():
            break
        current_status = poller.status()
        print(f"Status: {current_status}")
        # Pause between checks without blocking the event loop.
        await asyncio.sleep(5)

    outcome = await poller.result()
    print(f"Operation completed: {outcome}")
# asyncio.run(start_async_long_running_operation())


def resume_operation_from_token(continuation_token):
    """Outline of resuming a long-running operation from a continuation token.

    Every step is commented out, so the function currently returns ``None``.
    """
    # In real usage, you would recreate the poller from the token
    # poller = service_client.resume_operation(continuation_token)

    # Check current status
    # print(f"Resumed operation status: {poller.status()}")

    # Continue waiting for completion
    # result = poller.result()
    # return result
    return None
def save_operation_state(poller):
    """Return the poller's continuation token, or ``None`` once it is done.

    The token is only printed here; persisting it is left to the caller.
    """
    if poller.done():
        return None

    token = poller.continuation_token()
    # Save token to database or file for later use
    print(f"Operation token saved: {token}")
    return token


from azure.core.exceptions import AzureError, HttpResponseError
def poll_with_error_handling(poller):
    """Wait for a poller's result, reporting failures before re-raising them.

    Args:
        poller: Object exposing ``result(timeout=...)``, ``done()`` and
            ``continuation_token()``.

    Returns:
        Whatever ``poller.result(timeout=600)`` returns.

    Raises:
        HttpResponseError: Re-raised after the status code is reported.
        AzureError: Re-raised after the message (and any continuation
            token on the exception) is reported.
        TimeoutError: Re-raised after a resumption token is printed.
    """
    try:
        # Wait for operation with timeout
        result = poller.result(timeout=600)  # 10 minute timeout
    except HttpResponseError as e:
        # HttpResponseError must be handled before AzureError, its base class.
        status_code = e.status_code
        if status_code == 404:
            print("Operation not found - may have been cancelled")
        # status_code is None when the error carries no response; comparing
        # None with an int would raise TypeError and mask the real failure.
        elif status_code is not None and status_code >= 500:
            print(f"Server error during operation: {status_code}")
        else:
            print(f"HTTP error: {status_code} - {e.reason}")
        raise
    except AzureError as e:
        print(f"Azure service error: {e.message}")

        # Check if operation can be resumed
        token = getattr(e, "continuation_token", None)
        if token:
            print("Operation can be resumed with token:", token)
        raise
    except TimeoutError:
        print("Operation timed out")

        # Save state for later resumption
        if not poller.done():
            token = poller.continuation_token()
            print(f"Save this token to resume later: {token}")
        raise
    else:
        print(f"Operation succeeded: {result}")
        return result


from azure.core.polling import PollingMethod
import time
class CustomPollingMethod(PollingMethod):
    """Custom polling method with exponential backoff.

    The operation URL is read from the ``Location`` header of the initial
    response. Each poll is a GET to that URL: 200 means success, 202 means
    still running, and any other status fails the operation.

    Args:
        initial_delay: Seconds to sleep before the first poll.
        max_delay: Upper bound, in seconds, for the delay between polls.
        backoff_factor: Multiplier applied to the delay after each 202.
    """

    def __init__(self, initial_delay=1, max_delay=60, backoff_factor=2):
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor
        self.current_delay = initial_delay
        self._client = None
        # Declared here so the attribute exists before initialize() runs.
        self._deserialize = None
        self._operation_url = None
        self._finished = False
        self._status = "InProgress"
        self._resource = None

    def initialize(self, client, initial_response, deserialization_callback):
        """Record the client and callback and inspect the initial response.

        Args:
            client: Client whose ``send_request`` is used for polling.
            initial_response: Response to the call that started the operation.
            deserialization_callback: Callable applied to the final response.
        """
        self._client = client
        self._deserialize = deserialization_callback

        # Reset per-operation state so a reused instance starts clean.
        self.current_delay = self.initial_delay
        self._operation_url = None
        self._finished = False
        self._status = "InProgress"
        self._resource = None

        # Extract operation URL from initial response
        headers = getattr(initial_response, "headers", None)
        if headers is not None:
            self._operation_url = headers.get("Location")

        # Check if already completed
        if initial_response.status_code in (200, 201):
            self._finished = True
            self._status = "Succeeded"
            self._resource = self._deserialize(initial_response)

    def run(self):
        """Poll the operation URL until the operation succeeds or fails.

        Raises:
            HttpResponseError: If there is no operation URL to poll, or a
                poll returns a status other than 200 or 202.
        """
        # Imported here so the example is self-contained.
        from azure.core.rest import HttpRequest

        if not self._finished and not self._operation_url:
            # Without a Location header there is nothing to poll; fail
            # clearly instead of sending a request to a None URL.
            self._finished = True
            self._status = "Failed"
            raise HttpResponseError(
                "Operation cannot be polled: no Location header in the initial response"
            )

        while not self._finished:
            time.sleep(self.current_delay)

            # Poll the operation status. send_request() takes a request
            # object rather than a (method, url) pair.
            response = self._client.send_request(
                HttpRequest("GET", self._operation_url)
            )

            if response.status_code == 200:
                # Operation completed successfully
                self._finished = True
                self._status = "Succeeded"
                self._resource = self._deserialize(response)
            elif response.status_code == 202:
                # Still in progress
                self._status = "InProgress"
                # Increase delay for next poll (exponential backoff)
                self.current_delay = min(
                    self.current_delay * self.backoff_factor,
                    self.max_delay,
                )
            else:
                # Error occurred
                self._finished = True
                self._status = "Failed"
                # Attach the response so callers can read status_code/reason.
                raise HttpResponseError(
                    f"Operation failed: {response.status_code}",
                    response=response,
                )

    def status(self):
        """Return the last recorded status string."""
        return self._status

    def finished(self):
        """Return True once the operation has succeeded or failed."""
        return self._finished

    def resource(self):
        """Return the deserialized final resource, or None if there is none yet."""
        return self._resource
# Usage
def use_custom_polling():
    """Build a CustomPollingMethod with a 2-second start delay and a 30-second cap."""
    polling_options = {"initial_delay": 2, "max_delay": 30}
    custom_method = CustomPollingMethod(**polling_options)
    # poller = LROPoller(client, initial_response, deserialize_func, custom_method)
    # result = poller.result()


import asyncio
from azure.core.polling import AsyncPollingMethod
class AsyncCustomPollingMethod(AsyncPollingMethod):
    """Custom async polling method with exponential backoff.

    The operation URL is read from the ``Location`` header of the initial
    response. Each poll is a GET to that URL: 200 means success, 202 means
    still running, and any other status fails the operation.

    Args:
        initial_delay: Seconds to sleep before the first poll.
        max_delay: Upper bound, in seconds, for the delay between polls.
        backoff_factor: Multiplier applied to the delay after each 202.
    """

    def __init__(self, initial_delay=1, max_delay=60, backoff_factor=2):
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor
        self.current_delay = initial_delay
        self._client = None
        # Declared here so the attribute exists before initialize() runs.
        self._deserialize = None
        self._operation_url = None
        self._finished = False
        self._status = "InProgress"
        self._resource = None

    def initialize(self, client, initial_response, deserialization_callback):
        """Record the client and callback and inspect the initial response.

        This is deliberately a plain method: the async poller calls it
        without awaiting, so a coroutine here would never execute.

        Args:
            client: Client whose awaitable ``send_request`` is used for polling.
            initial_response: Response to the call that started the operation.
            deserialization_callback: Callable applied to the final response.
        """
        self._client = client
        self._deserialize = deserialization_callback

        # Reset per-operation state so a reused instance starts clean.
        self.current_delay = self.initial_delay
        self._operation_url = None
        self._finished = False
        self._status = "InProgress"
        self._resource = None

        headers = getattr(initial_response, "headers", None)
        if headers is not None:
            self._operation_url = headers.get("Location")

        if initial_response.status_code in (200, 201):
            self._finished = True
            self._status = "Succeeded"
            self._resource = self._deserialize(initial_response)

    async def run(self):
        """Poll the operation URL until the operation succeeds or fails.

        Raises:
            HttpResponseError: If there is no operation URL to poll, or a
                poll returns a status other than 200 or 202.
        """
        # Imported here so the example is self-contained.
        from azure.core.rest import HttpRequest

        if not self._finished and not self._operation_url:
            # Without a Location header there is nothing to poll; fail
            # clearly instead of sending a request to a None URL.
            self._finished = True
            self._status = "Failed"
            raise HttpResponseError(
                "Operation cannot be polled: no Location header in the initial response"
            )

        while not self._finished:
            await asyncio.sleep(self.current_delay)

            # send_request() takes a request object rather than a
            # (method, url) pair.
            response = await self._client.send_request(
                HttpRequest("GET", self._operation_url)
            )

            if response.status_code == 200:
                self._finished = True
                self._status = "Succeeded"
                self._resource = self._deserialize(response)
            elif response.status_code == 202:
                self._status = "InProgress"
                self.current_delay = min(
                    self.current_delay * self.backoff_factor,
                    self.max_delay,
                )
            else:
                self._finished = True
                self._status = "Failed"
                # Attach the response so callers can read status_code/reason.
                raise HttpResponseError(
                    f"Operation failed: {response.status_code}",
                    response=response,
                )

    def status(self):
        """Return the last recorded status string."""
        return self._status

    def finished(self):
        """Return True once the operation has succeeded or failed."""
        return self._finished

    def resource(self):
        """Return the deserialized final resource, or None if there is none yet."""
        return self._resource


# Install with Tessl CLI
npx tessl i tessl/pypi-azure-core
docs