Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Azure Event Grid Python SDK provides comprehensive models for event handling, response management, and broker metadata. These models encapsulate all the data structures needed for robust event processing in Event Grid Namespaces.
Models for received events and their associated broker metadata.
class ReceiveDetails:
"""
Container for a received Cloud Event with broker metadata.
Attributes:
- broker_properties: BrokerProperties - Event broker metadata
- event: CloudEvent - The received Cloud Event
"""
broker_properties: BrokerProperties
event: CloudEvent

class BrokerProperties:
"""
Broker metadata associated with a received event.
Attributes:
- lock_token: str - Unique token for event operations (acknowledge, release, etc.)
- delivery_count: int - Number of delivery attempts for this event
"""
lock_token: str
delivery_count: int

Response models for consumer operations providing success/failure details.
class AcknowledgeResult:
"""
Result of acknowledge operation.
Attributes:
- failed_lock_tokens: List[FailedLockToken] - Tokens that failed to acknowledge
- succeeded_lock_tokens: List[str] - Tokens that were successfully acknowledged
"""
failed_lock_tokens: List[FailedLockToken]
succeeded_lock_tokens: List[str]

class ReleaseResult:
"""
Result of release operation.
Attributes:
- failed_lock_tokens: List[FailedLockToken] - Tokens that failed to release
- succeeded_lock_tokens: List[str] - Tokens that were successfully released
"""
failed_lock_tokens: List[FailedLockToken]
succeeded_lock_tokens: List[str]

class RejectResult:
"""
Result of reject operation.
Attributes:
- failed_lock_tokens: List[FailedLockToken] - Tokens that failed to reject
- succeeded_lock_tokens: List[str] - Tokens that were successfully rejected
"""
failed_lock_tokens: List[FailedLockToken]
succeeded_lock_tokens: List[str]

class RenewLocksResult:
"""
Result of renew locks operation.
Attributes:
- failed_lock_tokens: List[FailedLockToken] - Tokens that failed to renew
- succeeded_lock_tokens: List[str] - Tokens that were successfully renewed
"""
failed_lock_tokens: List[FailedLockToken]
succeeded_lock_tokens: List[str]

Models for handling operation failures and error details.
class FailedLockToken:
"""
Information about a failed lock token operation.
Attributes:
- lock_token: str - The lock token that failed
- error: ODataV4Format - Detailed error information
"""
lock_token: str
error: ODataV4Format

Models used internally by the service for request/response handling.
class ReceiveResult:
"""
Internal container for receive operation response.
Attributes:
- details: List[ReceiveDetails] - Array of received events with metadata
"""
details: List[ReceiveDetails]

class PublishResult:
"""
Internal result for publish operations (typically empty).
"""
pass

class CloudEvent:
"""
Internal CloudEvent representation following CloudEvents v1.0 specification.
Attributes:
- id: str - Event identifier
- source: str - Context in which event occurred
- type: str - Type of event
- specversion: str - CloudEvents specification version
- data: Any - Event payload data
- data_base64: bytes - Base64-encoded event data (alternative to data)
- time: datetime - Event timestamp
- dataschema: str - URI of the schema for the data
- datacontenttype: str - Content type of the data
- subject: str - Subject of the event in context of the event producer
"""
id: str
source: str
type: str
specversion: str
data: Any
data_base64: bytes
time: datetime
dataschema: str
datacontenttype: str
subject: str

Predefined constants for common operations and delays.
from enum import Enum
class ReleaseDelay(str, Enum):
    """
    Predefined delay values for event release operations.

    Every member is a ``str`` holding a whole number of seconds.

    Values:
    - NO_DELAY: "0" - Release immediately
    - TEN_SECONDS: "10" - Release after 10 seconds
    - ONE_MINUTE: "60" - Release after 60 seconds
    - TEN_MINUTES: "600" - Release after 600 seconds (10 minutes)
    - ONE_HOUR: "3600" - Release after 3600 seconds (1 hour)
    """

    NO_DELAY = "0"
    TEN_SECONDS = "10"
    ONE_MINUTE = "60"
    TEN_MINUTES = "600"
    ONE_HOUR = "3600"


from azure.eventgrid import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential
# Build a consumer bound to one namespace topic and one event subscription.
consumer = EventGridConsumerClient(
    endpoint="https://namespace.region.eventgrid.azure.net",
    credential=AzureKeyCredential("key"),
    namespace_topic="orders",
    subscription="processor",
)

# Receive up to five events and print what each one carries.
events = consumer.receive(max_events=5)
for received in events:
    # Broker metadata: the lock token and how often the event was delivered.
    broker = received.broker_properties
    print(f"Lock Token: {broker.lock_token}")
    print(f"Delivery Count: {broker.delivery_count}")

    # Required Cloud Event attributes.
    cloud_event = received.event
    print(f"Event ID: {cloud_event.id}")
    print(f"Event Type: {cloud_event.type}")
    print(f"Event Source: {cloud_event.source}")
    print(f"Event Data: {cloud_event.data}")

    # Optional Cloud Event attributes are printed only when they are set.
    subject = cloud_event.subject
    if subject:
        print(f"Subject: {subject}")
    event_time = cloud_event.time
    if event_time:
        print(f"Event Time: {event_time}")
    content_type = cloud_event.datacontenttype
    if content_type:
        print(f"Content Type: {content_type}")
consumer.close()

from azure.eventgrid.models import ReleaseDelay
# Gather the lock token of every event in the received batch.
events = consumer.receive(max_events=10)
lock_tokens = [received.broker_properties.lock_token for received in events]

# Acknowledge the whole batch in a single call.
ack_result = consumer.acknowledge(lock_tokens=lock_tokens)

# Report the tokens that were acknowledged.
print(f"Successfully acknowledged: {len(ack_result.succeeded_lock_tokens)}")
for acked in ack_result.succeeded_lock_tokens:
    print(f" Acknowledged: {acked}")

# Report the tokens that were not, and decide which ones are worth retrying.
if ack_result.failed_lock_tokens:
    print(f"Failed to acknowledge: {len(ack_result.failed_lock_tokens)}")
    retry_tokens = []
    for failure in ack_result.failed_lock_tokens:
        print(f" Failed token: {failure.lock_token}")
        print(f" Error: {failure.error}")

        # Classify the failure by matching on the error's text form.
        error_text = str(failure.error).lower()
        if "expired" in error_text:
            print(" Lock expired - event likely processed elsewhere")
        elif "not found" in error_text:
            print(" Event not found - may have been deleted")
        else:
            print(" Unexpected error - retrying")
            retry_tokens.append(failure.lock_token)

    # Anything left over is released with a one-minute delay.
    if retry_tokens:
        release_result = consumer.release(
            lock_tokens=retry_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE,
        )
        print(f"Released for retry: {len(release_result.succeeded_lock_tokens)}")

from azure.eventgrid.models import ReleaseDelay
# Different release delay strategies
events = consumer.receive(max_events=5)

# Escalating release delays for repeated processing failures, indexed by
# (delivery_count - 1). Built once here instead of on every failure.
delays = [ReleaseDelay.ONE_MINUTE, ReleaseDelay.TEN_MINUTES, ReleaseDelay.ONE_HOUR]

for event_detail in events:
    lock_token = event_detail.broker_properties.lock_token
    delivery_count = event_detail.broker_properties.delivery_count
    try:
        # Attempt processing
        process_event(event_detail.event)
        consumer.acknowledge(lock_tokens=[lock_token])
    except TransientError:
        # Transient error - retry quickly
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_SECONDS,
        )
    except RateLimitError:
        # Rate limited - wait longer
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_MINUTES,
        )
    except ProcessingError:
        if delivery_count < 3:
            # Retry with an escalating delay. The index is clamped at both
            # ends: without the lower bound a delivery_count of 0 would give
            # index -1 and silently select the longest delay.
            # NOTE(review): with the `< 3` limit only the first two delays
            # are ever selected; ONE_HOUR is used only if the limit is raised.
            delay = delays[max(0, min(delivery_count - 1, len(delays) - 1))]
            consumer.release(
                lock_tokens=[lock_token],
                release_delay=delay,
            )
        else:
            # Max retries exceeded - reject
            consumer.reject(lock_tokens=[lock_token])
    except Exception:
        # Unexpected error - reject immediately
        consumer.reject(lock_tokens=[lock_token])


def process_batch_results(consumer, events):
    """Process a batch of events and settle each one by its outcome.

    Every event is passed to ``process_event``. Its lock token is then
    acknowledged when the result reports ``success``, released with a
    one-minute delay when it reports ``transient_error``, and rejected
    otherwise. An exception raised while processing an event is printed and
    the event is treated as a permanent failure.

    Args:
        consumer: Client exposing ``acknowledge``, ``release`` and ``reject``.
        events: Iterable of received events, each with ``event`` and
            ``broker_properties.lock_token``.

    Returns:
        None. A per-operation summary is printed.
    """
    # Group lock tokens by processing outcome
    success_tokens = []
    transient_failure_tokens = []
    permanent_failure_tokens = []
    for event_detail in events:
        try:
            result = process_event(event_detail.event)
            if result.success:
                success_tokens.append(event_detail.broker_properties.lock_token)
            elif result.transient_error:
                transient_failure_tokens.append(event_detail.broker_properties.lock_token)
            else:
                permanent_failure_tokens.append(event_detail.broker_properties.lock_token)
        except Exception as exc:
            # Report the failure instead of dropping it silently, then treat
            # the event as permanently failed.
            print(f"Processing error: {exc}")
            permanent_failure_tokens.append(event_detail.broker_properties.lock_token)

    # Issue one settlement call per non-empty group. The calls run
    # sequentially, in this order; each result is kept for the report below.
    operations = []
    if success_tokens:
        operations.append(('acknowledge', consumer.acknowledge(lock_tokens=success_tokens)))
    if transient_failure_tokens:
        operations.append(('release', consumer.release(
            lock_tokens=transient_failure_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE,
        )))
    if permanent_failure_tokens:
        operations.append(('reject', consumer.reject(lock_tokens=permanent_failure_tokens)))

    # Process all operation results
    for operation_name, result in operations:
        print(f"{operation_name.title()} Results:")
        print(f" Succeeded: {len(result.succeeded_lock_tokens)}")
        if result.failed_lock_tokens:
            print(f" Failed: {len(result.failed_lock_tokens)}")
            for failed_token in result.failed_lock_tokens:
                print(f" Token: {failed_token.lock_token}")
                print(f" Error: {failed_token.error}")


# Usage
events = consumer.receive(max_events=20)
if events:
    process_batch_results(consumer, events)

from datetime import datetime, timezone
from collections import defaultdict
def _as_utc(moment):
    """Return *moment* as a timezone-aware datetime, treating naive values as UTC."""
    if moment.tzinfo is None or moment.utcoffset() is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment


def analyze_event_batch(events):
    """Analyze received events for monitoring and debugging.

    Prints the number of events per type, per source and per delivery count,
    the time span between the oldest and newest event timestamps, a warning
    when the oldest event is more than one hour old, and every event that
    has been delivered more than twice.

    Args:
        events: Sized, re-iterable collection of received events, each with
            ``event`` (``type``, ``source``, ``time``) and
            ``broker_properties.delivery_count``.

    Returns:
        None. The analysis is printed.
    """
    # Group events by various attributes
    by_type = defaultdict(int)
    by_source = defaultdict(int)
    by_delivery_count = defaultdict(int)
    oldest_event = None
    newest_event = None
    for event_detail in events:
        cloud_event = event_detail.event
        broker_props = event_detail.broker_properties
        # Count by type and source
        by_type[cloud_event.type] += 1
        by_source[cloud_event.source] += 1
        by_delivery_count[broker_props.delivery_count] += 1
        # Track event timing
        if cloud_event.time:
            # Normalise to an aware datetime first: comparing or subtracting
            # a naive and an aware datetime raises TypeError.
            # NOTE(review): naive timestamps are assumed to be UTC - confirm.
            event_time = _as_utc(cloud_event.time)
            if oldest_event is None or event_time < oldest_event:
                oldest_event = event_time
            if newest_event is None or event_time > newest_event:
                newest_event = event_time
    # Print analysis
    print(f"Batch Analysis - {len(events)} events:")
    print(f"Event Types: {dict(by_type)}")
    print(f"Event Sources: {dict(by_source)}")
    print(f"Delivery Counts: {dict(by_delivery_count)}")
    if oldest_event is not None and newest_event is not None:
        age_span = newest_event - oldest_event
        print(f"Time Span: {age_span.total_seconds():.1f} seconds")
        # Check for old events
        now = datetime.now(timezone.utc)
        oldest_age = (now - oldest_event).total_seconds()
        if oldest_age > 3600:  # Older than 1 hour
            print(f"WARNING: Oldest event is {oldest_age/3600:.1f} hours old")
    # Check for retry patterns
    high_retry_events = [e for e in events if e.broker_properties.delivery_count > 2]
    if high_retry_events:
        print(f"High retry events: {len(high_retry_events)}")
        for event_detail in high_retry_events:
            print(f" {event_detail.event.type}: {event_detail.broker_properties.delivery_count} attempts")
# Usage
events = consumer.receive(max_events=50)
if events:
    analyze_event_batch(events)


class EventProcessor:
"""Custom event processor with detailed result tracking."""
def __init__(self, consumer):
    """Store the consumer and start every statistics counter at zero."""
    self.consumer = consumer
    # One counter per tracked outcome, in reporting order.
    counter_names = ('processed', 'acknowledged', 'released', 'rejected', 'errors')
    self.stats = dict.fromkeys(counter_names, 0)
def process_batch(self, max_events=10):
    """Receive one batch, process it and settle every event.

    Events whose processing returns a truthy value are acknowledged. Events
    that fail, raise, or cannot be acknowledged are released with a
    one-minute delay; events that cannot be released are rejected. The
    counters in ``self.stats`` are updated along the way.

    Args:
        max_events: Upper bound passed to ``self.consumer.receive``.

    Returns:
        None.
    """
    received = self.consumer.receive(max_events=max_events)
    if not received:
        return
    self.stats['processed'] += len(received)

    # Process each event and sort its lock token by outcome as we go.
    success_tokens = []
    failure_tokens = []
    for item in received:
        try:
            succeeded = self.process_single_event(item.event)
            token = item.broker_properties.lock_token
        except Exception as e:
            print(f"Processing error: {e}")
            token = item.broker_properties.lock_token
            succeeded = False
            self.stats['errors'] += 1
        if succeeded:
            success_tokens.append(token)
        else:
            failure_tokens.append(token)

    # Acknowledge the successes; whatever could not be acknowledged joins
    # the failures so that it is released below.
    if success_tokens:
        ack_result = self.consumer.acknowledge(lock_tokens=success_tokens)
        self.stats['acknowledged'] += len(ack_result.succeeded_lock_tokens)
        if ack_result.failed_lock_tokens:
            print(f"Failed to acknowledge {len(ack_result.failed_lock_tokens)} events")
            failure_tokens.extend([ft.lock_token for ft in ack_result.failed_lock_tokens])

    if not failure_tokens:
        return
    release_result = self.consumer.release(
        lock_tokens=failure_tokens,
        release_delay=ReleaseDelay.ONE_MINUTE,
    )
    self.stats['released'] += len(release_result.succeeded_lock_tokens)

    # Last resort: reject whatever could not be released.
    if not release_result.failed_lock_tokens:
        return
    reject_tokens = [ft.lock_token for ft in release_result.failed_lock_tokens]
    reject_result = self.consumer.reject(lock_tokens=reject_tokens)
    self.stats['rejected'] += len(reject_result.succeeded_lock_tokens)
def process_single_event(self, cloud_event):
    """Handle one Cloud Event and report whether it succeeded.

    This placeholder only prints the event's type and source and always
    returns True; replace the body with real processing logic.
    """
    event_type, origin = cloud_event.type, cloud_event.source
    print(f"Processing {event_type} from {origin}")
    return True  # Return success/failure
def print_stats(self):
    """Print a heading followed by one title-cased line per counter."""
    report = ["Processing Statistics:"]
    report.extend(f" {name.title()}: {count}" for name, count in self.stats.items())
    print("\n".join(report))
# Usage: wrap the existing consumer in a processor.
processor = EventProcessor(consumer)

# Process ten batches of at most five events each.
for _ in range(10):
    processor.process_batch(max_events=5)
processor.print_stats()

Install with Tessl CLI
npx tessl i tessl/pypi-azure-eventgrid