Microsoft Azure Core Library providing foundational infrastructure for Azure SDK Python clients
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Azure Core provides comprehensive pagination support for handling large result sets from Azure services. The paging system supports both synchronous and asynchronous operations with automatic continuation token management, error recovery, and flexible iteration patterns.
Iterator for paging through individual items across multiple pages with lazy evaluation and continuation token support.
from azure.core.paging import ItemPaged
from typing import Iterator, Optional, TypeVar, Callable, Tuple, Iterable, Any
ReturnType = TypeVar('ReturnType')
ResponseType = TypeVar('ResponseType')
class ItemPaged(Iterator[ReturnType]):
def __init__(
self,
get_next: Callable[[Optional[str]], ResponseType],
extract_data: Callable[[ResponseType], Tuple[str, Iterable[ReturnType]]],
continuation_token: Optional[str] = None,
*,
page_iterator_class: type = PageIterator
): ...
def by_page(self, continuation_token: Optional[str] = None) -> Iterator[Iterator[ReturnType]]: ...
def __iter__(self) -> Iterator[ReturnType]: ...
def __next__(self) -> ReturnType: ...

Iterator for accessing results page by page with continuation token management and error recovery.
from azure.core.paging import PageIterator
class PageIterator(Iterator[Iterator[ReturnType]]):
def __init__(
self,
get_next: Callable[[Optional[str]], ResponseType],
extract_data: Callable[[ResponseType], Tuple[str, Iterable[ReturnType]]],
continuation_token: Optional[str] = None,
): ...
def __iter__(self) -> Iterator[Iterator[ReturnType]]: ...
def __next__(self) -> Iterator[ReturnType]: ...

from azure.core.paging import ItemPaged
def get_next(continuation_token=None):
    """Fetch next page of results from service.

    This example simulates a two-page service.

    Args:
        continuation_token: Token identifying the page to fetch. Any falsy
            value (``None`` or ``""``) requests the first page.

    Returns:
        A dict with a ``"nextLink"`` entry (the token of the following page,
        or ``None`` when there is none) and a ``"value"`` list of items.
    """
    if not continuation_token:
        return {
            "nextLink": "page2",
            "value": ["item1", "item2", "item3"],
        }
    if continuation_token == "page2":
        return {
            "nextLink": None,
            "value": ["item4", "item5"],
        }
    # Unrecognized token: answer with an empty final page.
    return {"nextLink": None, "value": []}
def extract_data(response):
    """Extract continuation token and items from response.

    Args:
        response: Mapping returned by ``get_next``; read through ``.get`` so
            a missing ``"nextLink"`` yields ``None`` and a missing
            ``"value"`` yields no items.

    Returns:
        A ``(next_link, items)`` tuple where ``items`` is an iterator over
        the page's items.
    """
    next_link = response.get("nextLink")
    items = response.get("value", [])
    return next_link, iter(items)
# Create pager and iterate over all items
pager = ItemPaged(get_next, extract_data)
# Iterate through all items across all pages
all_items = []
for item in pager:
all_items.append(item)
print(f"Got item: {item}")
# Result: ["item1", "item2", "item3", "item4", "item5"]

# Iterate page by page for more control
pager = ItemPaged(get_next, extract_data)
for page in pager.by_page():
page_items = list(page)
print(f"Page contains {len(page_items)} items: {page_items}")
# Process each item in the current page
for item in page_items:
process_item(item)

# Start from beginning
pager = ItemPaged(get_next, extract_data)
# Get some items and stop
items_so_far = []
page_iter = pager.by_page()
# Process first page
first_page = next(page_iter)
items_so_far.extend(list(first_page))
# Resume from a specific token later
token = "page2" # This would come from your service
resumed_pager = ItemPaged(get_next, extract_data).by_page(continuation_token=token)
for page in resumed_pager:
remaining_items = list(page)
items_so_far.extend(remaining_items)

Asynchronous version of ItemPaged for use with async/await patterns.
from azure.core.async_paging import AsyncItemPaged
from typing import AsyncIterator, Awaitable
class AsyncItemPaged(AsyncIterator[ReturnType]):
def __init__(
self,
get_next: Callable[[Optional[str]], Awaitable[ResponseType]],
extract_data: Callable[[ResponseType], Awaitable[Tuple[str, AsyncIterator[ReturnType]]]],
continuation_token: Optional[str] = None,
*,
page_iterator_class: type = AsyncPageIterator
): ...
def by_page(self, continuation_token: Optional[str] = None) -> AsyncIterator[AsyncIterator[ReturnType]]: ...
async def __anext__(self) -> ReturnType: ...

Asynchronous page iterator with async continuation token handling.
from azure.core.async_paging import AsyncPageIterator
class AsyncPageIterator(AsyncIterator[AsyncIterator[ReturnType]]):
def __init__(
self,
get_next: Callable[[Optional[str]], Awaitable[ResponseType]],
extract_data: Callable[[ResponseType], Awaitable[Tuple[str, AsyncIterator[ReturnType]]]],
continuation_token: Optional[str] = None,
): ...
async def __anext__(self) -> AsyncIterator[ReturnType]: ...

import asyncio
from azure.core.async_paging import AsyncItemPaged, AsyncList
async def async_get_next(continuation_token=None):
    """Async version of get_next.

    This example simulates a two-page service.

    Args:
        continuation_token: Token identifying the page to fetch. Any falsy
            value requests the first page.

    Returns:
        A dict with a ``"nextLink"`` entry (``None`` when there is no further
        page) and a ``"value"`` list of items.
    """
    await asyncio.sleep(0.1)  # Simulate API call
    if not continuation_token:
        return {
            "nextLink": "page2",
            "value": ["async_item1", "async_item2"],
        }
    if continuation_token == "page2":
        return {
            "nextLink": None,
            "value": ["async_item3", "async_item4"],
        }
    # Unrecognized token: answer with an empty final page.
    return {"nextLink": None, "value": []}
async def async_extract_data(response):
    """Extract data for async iteration.

    Args:
        response: Mapping returned by ``async_get_next``; read through
            ``.get`` so missing keys fall back to ``None`` / an empty list.

    Returns:
        A ``(next_link, items)`` tuple where ``items`` is the page's item
        list wrapped in ``AsyncList``.
    """
    next_link = response.get("nextLink")
    items = response.get("value", [])
    # Wrap sync iterable for async iteration
    return next_link, AsyncList(items)
async def async_paging_example():
    """Demonstrate item-by-item and page-by-page async iteration."""
    # Async item-by-item iteration
    pager = AsyncItemPaged(async_get_next, async_extract_data)
    async for item in pager:
        print(f"Async item: {item}")

    # Async page-by-page iteration (a fresh pager, since the first one has
    # already been iterated above)
    pager = AsyncItemPaged(async_get_next, async_extract_data)
    async for page in pager.by_page():
        async for item in page:
            print(f"Page item: {item}")
# Run the async example
asyncio.run(async_paging_example())

from azure.core.paging import ItemPaged
from azure.core import PipelineClient
class MyAzureServiceClient:
    """Example service client exposing a paged ``list_resources`` operation."""

    def __init__(self, endpoint: str, credential):
        self._client = PipelineClient(base_url=endpoint, credential=credential)

    def list_resources(self, **kwargs) -> ItemPaged[dict]:
        """List resources with automatic paging.

        Keyword Args:
            filter: Optional ``$filter`` expression. It is applied to the
                initial request only; follow-up pages use the next link
                returned by the service unchanged.

        Returns:
            An ``ItemPaged`` built from the ``get_next`` / ``extract_data``
            callbacks defined below.
        """
        # Local imports keep the example self-contained: HttpRequest is used
        # below but is not imported alongside PipelineClient.
        from urllib.parse import quote

        from azure.core.rest import HttpRequest

        def get_next(next_link=None):
            if next_link:
                # Use provided next link
                request = HttpRequest("GET", next_link)
            else:
                # Build initial request
                request = HttpRequest("GET", "/api/resources")
                if kwargs.get('filter'):
                    # Percent-encode the expression so spaces, quotes, '&'
                    # or '#' inside it cannot corrupt the query string.
                    request.url += f"?$filter={quote(kwargs['filter'], safe='')}"

            # Execute request through pipeline
            response = self._client.send_request(request)
            response.raise_for_status()
            return response.json()

        def extract_data(response_data):
            # Extract items and continuation token from response
            items = response_data.get("value", [])
            next_link = response_data.get("nextLink") or response_data.get("@odata.nextLink")
            return next_link, iter(items)

        return ItemPaged(get_next, extract_data)
# Usage
client = MyAzureServiceClient("https://api.service.azure.com", credential)
# Iterate over all resources across all pages
for resource in client.list_resources(filter="status eq 'active'"):
print(f"Resource: {resource['name']}")
# Or process page by page for better memory management
for page in client.list_resources().by_page():
resources = list(page)
print(f"Processing {len(resources)} resources in this page")
process_resource_batch(resources)

from azure.core.exceptions import AzureError
def robust_paging_example():
    """Page through results, checkpointing progress and reporting how to resume on error."""
    pager = ItemPaged(get_next, extract_data)
    page_iterator = pager.by_page()
    processed_items = []

    try:
        for page in page_iterator:
            # Process current page
            page_items = list(page)
            processed_items.extend(page_items)

            # Save progress periodically. The token is read from the page
            # iterator (as the recovery path below does); a separate local
            # that is never reassigned would always checkpoint ``None``.
            if len(processed_items) % 100 == 0:
                save_progress(processed_items, page_iterator.continuation_token)
    except AzureError as e:
        print(f"Error occurred: {e}")
        # The iterator preserves continuation token for recovery
        if hasattr(page_iterator, 'continuation_token'):
            print(f"Can resume from token: {page_iterator.continuation_token}")
            # Resume processing later
            resumed_pager = ItemPaged(get_next, extract_data).by_page(
                continuation_token=page_iterator.continuation_token
            )


class CustomPageIterator(PageIterator):
    """Page iterator that counts the pages it has produced and prints the running total."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.pages_processed = 0

    def __next__(self):
        # Delegate first: if the parent raises (including StopIteration),
        # the counter is left untouched.
        result = super().__next__()
        self.pages_processed += 1
        print(f"Processed {self.pages_processed} pages so far")
        return result
# Use custom page iterator
pager = ItemPaged(
    get_next,
    extract_data,
    page_iterator_class=CustomPageIterator,
)


def process_large_dataset(pager: ItemPaged, batch_size: int = 100):
    """Process large datasets in batches to manage memory usage.

    Args:
        pager: Pager to drain item by item.
        batch_size: Number of items collected before ``process_batch`` is
            called. A final, shorter batch is processed if items remain.
    """
    batch = []
    for item in pager:
        batch.append(item)
        if len(batch) >= batch_size:
            # Process the batch, then start a fresh list. Rebinding (rather
            # than clearing in place) keeps a batch intact if the consumer
            # holds on to the list it was given.
            process_batch(batch)
            batch = []

    # Process remaining items
    if batch:
        process_batch(batch)
# Usage with a service that returns millions of items
large_pager = client.list_all_items()
process_large_dataset(large_pager, batch_size=1000)

Lazy Evaluation: Items and pages are fetched on-demand, minimizing memory usage for large datasets.
Continuation Token Support: Built-in support for resuming interrupted operations from any point.
Error Recovery: Errors preserve continuation state, allowing for robust error handling and recovery.
Type Safety: Full generic typing support with proper type hints for items and responses.
Dual Interface: Both item-by-item and page-by-page iteration patterns for different use cases.
Sync/Async Parity: Matching APIs for both synchronous and asynchronous operations.
Flexible Construction: Support for custom page iterator classes and extraction logic.
The paging system provides a robust, efficient way to handle large result sets from Azure services while maintaining simplicity for common use cases and flexibility for advanced scenarios.
Install with Tessl CLI
npx tessl i tessl/pypi-azure-core
docs