tessl/pypi-azure-eventgrid

Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces

docs/models.md

Models and Data Types

The Azure Event Grid Python SDK provides models for event handling, response management, and broker metadata. These models cover the data structures needed for event processing against Event Grid Namespaces.

Capabilities

Event Reception Models

Models for received events and their associated broker metadata.

class ReceiveDetails:
    """
    Container for a received Cloud Event with broker metadata.
    
    Attributes:
    - broker_properties: BrokerProperties - Event broker metadata
    - event: CloudEvent - The received Cloud Event
    """
    broker_properties: BrokerProperties
    event: CloudEvent

class BrokerProperties:
    """
    Broker metadata associated with a received event.
    
    Attributes:
    - lock_token: str - Unique token for event operations (acknowledge, release, etc.)
    - delivery_count: int - Number of delivery attempts for this event
    """
    lock_token: str
    delivery_count: int
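To show how the two models fit together, here is a minimal sketch that filters redelivered events by `delivery_count`. The objects are stand-ins built with `SimpleNamespace`; in real code they would come from `consumer.receive()`.

```python
from types import SimpleNamespace

# Stand-in ReceiveDetails-like objects. broker_properties carries
# lock_token/delivery_count; event carries the Cloud Event itself.
events = [
    SimpleNamespace(
        broker_properties=SimpleNamespace(lock_token="t1", delivery_count=1),
        event=SimpleNamespace(id="e1", type="order.created"),
    ),
    SimpleNamespace(
        broker_properties=SimpleNamespace(lock_token="t2", delivery_count=3),
        event=SimpleNamespace(id="e2", type="order.created"),
    ),
]

# Events with delivery_count > 1 have been delivered before (redeliveries).
redeliveries = [d for d in events if d.broker_properties.delivery_count > 1]
print([d.broker_properties.lock_token for d in redeliveries])  # ['t2']
```

A `delivery_count` of 1 means the first attempt; anything higher signals that a previous consumer released the lock or let it expire.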

Operation Result Models

Response models for consumer operations providing success/failure details.

class AcknowledgeResult:
    """
    Result of acknowledge operation.
    
    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to acknowledge
    - succeeded_lock_tokens: List[str] - Tokens that were successfully acknowledged
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]

class ReleaseResult:
    """
    Result of release operation.
    
    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to release
    - succeeded_lock_tokens: List[str] - Tokens that were successfully released
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]

class RejectResult:
    """
    Result of reject operation.
    
    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to reject
    - succeeded_lock_tokens: List[str] - Tokens that were successfully rejected
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]

class RenewLocksResult:
    """
    Result of renew locks operation.
    
    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to renew
    - succeeded_lock_tokens: List[str] - Tokens that were successfully renewed
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]
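Because all four result models expose the same two attributes, one generic helper can summarize any of them. This is a sketch using a stand-in result object; real code would pass the return value of `consumer.acknowledge(...)`, `consumer.release(...)`, etc.

```python
from types import SimpleNamespace

def summarize(name, result):
    """One-line summary for any *Result model.

    Works for AcknowledgeResult, ReleaseResult, RejectResult, and
    RenewLocksResult, since they all share the same shape.
    """
    return (f"{name}: {len(result.succeeded_lock_tokens)} succeeded, "
            f"{len(result.failed_lock_tokens)} failed")

# Stand-in result object for illustration only.
fake = SimpleNamespace(
    succeeded_lock_tokens=["t1", "t2"],
    failed_lock_tokens=[SimpleNamespace(lock_token="t3", error="BadRequest")],
)
print(summarize("acknowledge", fake))  # acknowledge: 2 succeeded, 1 failed
```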

Error Models

Models for handling operation failures and error details.

class FailedLockToken:
    """
    Information about a failed lock token operation.
    
    Attributes:
    - lock_token: str - The lock token that failed
    - error: ODataV4Format - Detailed error information
    """
    lock_token: str
    error: ODataV4Format
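The `error` attribute is an `azure.core.exceptions.ODataV4Format`, which exposes `code` and `message` fields. A sketch of partitioning failures by error code follows; the entries are stand-in objects and the specific code strings are illustrative, not an exhaustive list of service error codes.

```python
from types import SimpleNamespace

# Stand-ins for FailedLockToken entries. The real `error` is an
# ODataV4Format with .code and .message attributes.
failed = [
    SimpleNamespace(lock_token="t1",
                    error=SimpleNamespace(code="TokenLost", message="lock expired")),
    SimpleNamespace(lock_token="t2",
                    error=SimpleNamespace(code="ServerBusy", message="throttled")),
]

# Treat a lost lock as non-retryable (the event is no longer ours);
# everything else is worth retrying.
retryable = [f.lock_token for f in failed if f.error.code != "TokenLost"]
print(retryable)  # ['t2']
```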

Internal Service Models

Models used internally by the service for request/response handling.

class ReceiveResult:
    """
    Internal container for receive operation response.
    
    Attributes:
    - details: List[ReceiveDetails] - Array of received events with metadata
    """
    details: List[ReceiveDetails]

class PublishResult:
    """
    Internal result for publish operations (typically empty).
    """
    pass

class CloudEvent:
    """
    Internal CloudEvent representation following CloudEvents v1.0 specification.
    
    Attributes:
    - id: str - Event identifier
    - source: str - Context in which event occurred
    - type: str - Type of event
    - specversion: str - CloudEvents specification version
    - data: Any - Event payload data
    - data_base64: bytes - Base64-encoded event data (alternative to data)
    - time: datetime - Event timestamp
    - dataschema: str - URI of the schema for the data
    - datacontenttype: str - Content type of the data
    - subject: str - Subject of the event in context of the event producer
    """
    id: str
    source: str
    type: str
    specversion: str
    data: Any
    data_base64: bytes
    time: datetime
    dataschema: str
    datacontenttype: str
    subject: str
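To make the required/optional split concrete, here is an illustrative stdlib dataclass mirroring the fields above (a sketch only, not the SDK's own class): per CloudEvents v1.0, `id`, `source`, `type`, and `specversion` are required, and the remaining attributes are optional.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional

@dataclass
class CloudEventSketch:
    """Illustrative stand-in for the CloudEvent model above."""
    id: str
    source: str
    type: str
    specversion: str = "1.0"
    data: Any = None
    data_base64: Optional[bytes] = None   # alternative to data for binary payloads
    time: Optional[datetime] = None
    dataschema: Optional[str] = None
    datacontenttype: Optional[str] = None
    subject: Optional[str] = None

event = CloudEventSketch(
    id="1234",
    source="/orders/service",
    type="com.example.order.created",
    data={"order_id": 42},
    time=datetime.now(timezone.utc),
    datacontenttype="application/json",
)
print(event.type, event.data["order_id"])  # com.example.order.created 42
```

Note that `data` and `data_base64` are alternatives: an event carries one or the other, not both.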

Enums

Predefined constants for common operations and delays.

from enum import Enum

class ReleaseDelay(str, Enum):
    """
    Predefined delay values for event release operations.
    
    Values:
    - NO_DELAY: "0" - Release immediately
    - TEN_SECONDS: "10" - Release after 10 seconds
    - ONE_MINUTE: "60" - Release after 60 seconds
    - TEN_MINUTES: "600" - Release after 600 seconds (10 minutes)
    - ONE_HOUR: "3600" - Release after 3600 seconds (1 hour)
    """
    NO_DELAY = "0"
    TEN_SECONDS = "10"
    ONE_MINUTE = "60"
    TEN_MINUTES = "600"
    ONE_HOUR = "3600"
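Because `ReleaseDelay` mixes in `str`, its members compare equal to their string values. The sketch below re-declares the enum so it runs standalone, and adds a hypothetical backoff policy mapping `delivery_count` to an escalating delay:

```python
from enum import Enum

class ReleaseDelay(str, Enum):  # re-declared from above for a runnable sketch
    NO_DELAY = "0"
    TEN_SECONDS = "10"
    ONE_MINUTE = "60"
    TEN_MINUTES = "600"
    ONE_HOUR = "3600"

def backoff_delay(delivery_count: int) -> ReleaseDelay:
    """Map delivery attempts to an escalating delay (illustrative policy)."""
    schedule = [ReleaseDelay.TEN_SECONDS, ReleaseDelay.ONE_MINUTE,
                ReleaseDelay.TEN_MINUTES, ReleaseDelay.ONE_HOUR]
    return schedule[min(delivery_count - 1, len(schedule) - 1)]

# str-Enum members compare equal to their underlying values.
assert ReleaseDelay.ONE_MINUTE == "60"
print(backoff_delay(1).value, backoff_delay(10).value)  # 10 3600
```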

Usage Examples

Working with Received Events

from azure.eventgrid import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential

consumer = EventGridConsumerClient(
    endpoint="https://namespace.region.eventgrid.azure.net",
    credential=AzureKeyCredential("key"),
    namespace_topic="orders",
    subscription="processor"
)

# Receive events
events = consumer.receive(max_events=5)

for event_detail in events:
    # Access broker properties
    broker_props = event_detail.broker_properties
    print(f"Lock Token: {broker_props.lock_token}")
    print(f"Delivery Count: {broker_props.delivery_count}")
    
    # Access Cloud Event
    cloud_event = event_detail.event
    print(f"Event ID: {cloud_event.id}")
    print(f"Event Type: {cloud_event.type}")
    print(f"Event Source: {cloud_event.source}")
    print(f"Event Data: {cloud_event.data}")
    
    # Optional Cloud Event properties
    if cloud_event.subject:
        print(f"Subject: {cloud_event.subject}")
    if cloud_event.time:
        print(f"Event Time: {cloud_event.time}")
    if cloud_event.datacontenttype:
        print(f"Content Type: {cloud_event.datacontenttype}")

consumer.close()

Handling Operation Results

from azure.eventgrid.models import ReleaseDelay

# Collect lock tokens from received events
events = consumer.receive(max_events=10)
lock_tokens = [event.broker_properties.lock_token for event in events]

# Attempt to acknowledge events
ack_result = consumer.acknowledge(lock_tokens=lock_tokens)

# Check successful acknowledgments
print(f"Successfully acknowledged: {len(ack_result.succeeded_lock_tokens)}")
for token in ack_result.succeeded_lock_tokens:
    print(f"  Acknowledged: {token}")

# Handle failed acknowledgments
if ack_result.failed_lock_tokens:
    print(f"Failed to acknowledge: {len(ack_result.failed_lock_tokens)}")
    
    retry_tokens = []
    for failed_token in ack_result.failed_lock_tokens:
        print(f"  Failed token: {failed_token.lock_token}")
        print(f"  Error: {failed_token.error}")
        
        # Check error type to decide on retry strategy
        error_str = str(failed_token.error)
        if "expired" in error_str.lower():
            print("    Lock expired - event likely processed elsewhere")
        elif "not found" in error_str.lower():
            print("    Event not found - may have been deleted")
        else:
            print("    Unexpected error - retrying")
            retry_tokens.append(failed_token.lock_token)
    
    # Retry failed tokens with release
    if retry_tokens:
        release_result = consumer.release(
            lock_tokens=retry_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE
        )
        print(f"Released for retry: {len(release_result.succeeded_lock_tokens)}")

Using Release Delays

from azure.eventgrid.models import ReleaseDelay

# Different release-delay strategies. TransientError, RateLimitError,
# ProcessingError, and process_event are application-defined placeholders.
events = consumer.receive(max_events=5)

for event_detail in events:
    lock_token = event_detail.broker_properties.lock_token
    delivery_count = event_detail.broker_properties.delivery_count
    
    try:
        # Attempt processing
        process_event(event_detail.event)
        consumer.acknowledge(lock_tokens=[lock_token])
        
    except TransientError:
        # Transient error - retry quickly
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_SECONDS
        )
        
    except RateLimitError:
        # Rate limited - wait longer
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_MINUTES
        )
        
    except ProcessingError:
        if delivery_count < 3:
            # Retry with exponential backoff
            delays = [ReleaseDelay.ONE_MINUTE, ReleaseDelay.TEN_MINUTES, ReleaseDelay.ONE_HOUR]
            delay = delays[min(delivery_count - 1, len(delays) - 1)]
            
            consumer.release(
                lock_tokens=[lock_token],
                release_delay=delay
            )
        else:
            # Max retries exceeded - reject
            consumer.reject(lock_tokens=[lock_token])
            
    except Exception:
        # Unexpected error - reject immediately
        consumer.reject(lock_tokens=[lock_token])

Batch Result Processing

def process_batch_results(consumer, events):
    """Process a batch of events and handle all results.

    process_event is an application-defined function assumed to return an
    object with `success` and `transient_error` flags.
    """
    
    # Group events by processing outcome
    success_tokens = []
    transient_failure_tokens = []
    permanent_failure_tokens = []
    
    for event_detail in events:
        try:
            result = process_event(event_detail.event)
            
            if result.success:
                success_tokens.append(event_detail.broker_properties.lock_token)
            elif result.transient_error:
                transient_failure_tokens.append(event_detail.broker_properties.lock_token)
            else:
                permanent_failure_tokens.append(event_detail.broker_properties.lock_token)
                
        except Exception:
            permanent_failure_tokens.append(event_detail.broker_properties.lock_token)
    
    # Execute one batch operation per non-empty group
    operations = []
    
    if success_tokens:
        operations.append(('acknowledge', consumer.acknowledge(lock_tokens=success_tokens)))
    
    if transient_failure_tokens:
        operations.append(('release', consumer.release(
            lock_tokens=transient_failure_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE
        )))
    
    if permanent_failure_tokens:
        operations.append(('reject', consumer.reject(lock_tokens=permanent_failure_tokens)))
    
    # Process all operation results
    for operation_name, result in operations:
        print(f"{operation_name.title()} Results:")
        print(f"  Succeeded: {len(result.succeeded_lock_tokens)}")
        
        if result.failed_lock_tokens:
            print(f"  Failed: {len(result.failed_lock_tokens)}")
            for failed_token in result.failed_lock_tokens:
                print(f"    Token: {failed_token.lock_token}")
                print(f"    Error: {failed_token.error}")

# Usage
events = consumer.receive(max_events=20)
if events:
    process_batch_results(consumer, events)

Event Metadata Analysis

from datetime import datetime, timezone
from collections import defaultdict

def analyze_event_batch(events):
    """Analyze received events for monitoring and debugging."""
    
    # Group events by various attributes
    by_type = defaultdict(int)
    by_source = defaultdict(int)
    by_delivery_count = defaultdict(int)
    
    oldest_event = None
    newest_event = None
    
    for event_detail in events:
        cloud_event = event_detail.event
        broker_props = event_detail.broker_properties
        
        # Count by type and source
        by_type[cloud_event.type] += 1
        by_source[cloud_event.source] += 1
        by_delivery_count[broker_props.delivery_count] += 1
        
        # Track event timing
        if cloud_event.time:
            if oldest_event is None or cloud_event.time < oldest_event:
                oldest_event = cloud_event.time
            if newest_event is None or cloud_event.time > newest_event:
                newest_event = cloud_event.time
    
    # Print analysis
    print(f"Batch Analysis - {len(events)} events:")
    print(f"Event Types: {dict(by_type)}")
    print(f"Event Sources: {dict(by_source)}")
    print(f"Delivery Counts: {dict(by_delivery_count)}")
    
    if oldest_event and newest_event:
        age_span = newest_event - oldest_event
        print(f"Time Span: {age_span.total_seconds():.1f} seconds")
        
        # Check for old events
        now = datetime.now(timezone.utc)
        oldest_age = (now - oldest_event).total_seconds()
        if oldest_age > 3600:  # Older than 1 hour
            print(f"WARNING: Oldest event is {oldest_age/3600:.1f} hours old")
    
    # Check for retry patterns
    high_retry_events = [e for e in events if e.broker_properties.delivery_count > 2]
    if high_retry_events:
        print(f"High retry events: {len(high_retry_events)}")
        for event_detail in high_retry_events:
            print(f"  {event_detail.event.type}: {event_detail.broker_properties.delivery_count} attempts")

# Usage
events = consumer.receive(max_events=50)
if events:
    analyze_event_batch(events)

Custom Result Processing

class EventProcessor:
    """Custom event processor with detailed result tracking."""
    
    def __init__(self, consumer):
        self.consumer = consumer
        self.stats = {
            'processed': 0,
            'acknowledged': 0,
            'released': 0,
            'rejected': 0,
            'errors': 0
        }
    
    def process_batch(self, max_events=10):
        """Process a batch of events with detailed tracking."""
        
        events = self.consumer.receive(max_events=max_events)
        if not events:
            return
        
        self.stats['processed'] += len(events)
        
        # Process events
        results = []
        for event_detail in events:
            try:
                success = self.process_single_event(event_detail.event)
                results.append((event_detail.broker_properties.lock_token, success))
            except Exception as e:
                print(f"Processing error: {e}")
                results.append((event_detail.broker_properties.lock_token, False))
                self.stats['errors'] += 1
        
        # Group by outcome
        success_tokens = [token for token, success in results if success]
        failure_tokens = [token for token, success in results if not success]
        
        # Execute operations and track results
        if success_tokens:
            ack_result = self.consumer.acknowledge(lock_tokens=success_tokens)
            self.stats['acknowledged'] += len(ack_result.succeeded_lock_tokens)
            
            # Handle partial failures
            if ack_result.failed_lock_tokens:
                print(f"Failed to acknowledge {len(ack_result.failed_lock_tokens)} events")
                failure_tokens.extend([ft.lock_token for ft in ack_result.failed_lock_tokens])
        
        if failure_tokens:
            release_result = self.consumer.release(
                lock_tokens=failure_tokens,
                release_delay=ReleaseDelay.ONE_MINUTE
            )
            self.stats['released'] += len(release_result.succeeded_lock_tokens)
            
            # Handle release failures (reject as last resort)
            if release_result.failed_lock_tokens:
                reject_tokens = [ft.lock_token for ft in release_result.failed_lock_tokens]
                reject_result = self.consumer.reject(lock_tokens=reject_tokens)
                self.stats['rejected'] += len(reject_result.succeeded_lock_tokens)
    
    def process_single_event(self, cloud_event):
        """Process a single Cloud Event."""
        # Implement your processing logic here
        print(f"Processing {cloud_event.type} from {cloud_event.source}")
        return True  # Return success/failure
    
    def print_stats(self):
        """Print processing statistics."""
        print("Processing Statistics:")
        for key, value in self.stats.items():
            print(f"  {key.title()}: {value}")

# Usage
processor = EventProcessor(consumer)

# Process multiple batches
for _ in range(10):
    processor.process_batch(max_events=5)

processor.print_stats()

Install with Tessl CLI

npx tessl i tessl/pypi-azure-eventgrid
