tessl/pypi-azure-eventgrid

Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces

Event Consumption

Azure Event Grid Namespaces support pull delivery: a consumer receives events from an event subscription and then settles each one. The EventGridConsumerClient provides operations to receive events, to acknowledge, release, or reject them, and to renew their locks.

Capabilities

Consumer Client Creation

Creates a consumer client for receiving events from Event Grid Namespace subscriptions.

class EventGridConsumerClient:
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, TokenCredential],
        *,
        namespace_topic: str,
        subscription: str,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Create EventGrid consumer client.

        Parameters:
        - endpoint: Event Grid namespace endpoint URL
        - credential: Authentication credential
        - namespace_topic: Topic name to consume from
        - subscription: Subscription name to consume from
        - api_version: API version (default: "2024-06-01")
        """

Event Reception

Receives a batch of Cloud Events from the subscription with configurable batch size and wait time.

def receive(
    self,
    *,
    max_events: Optional[int] = None,
    max_wait_time: Optional[int] = None,
    **kwargs: Any
) -> List[ReceiveDetails]:
    """
    Receive batch of Cloud Events from subscription.

    Parameters:
    - max_events: Maximum number of events to receive (1-100; the service default of 1 applies if omitted)
    - max_wait_time: Maximum wait time in seconds (10-120; the service default of 60 applies if omitted)

    Returns:
    List[ReceiveDetails]: List of received events with broker properties

    Raises:
    - HttpResponseError: Server returned error response
    - ClientAuthenticationError: Authentication failed
    """

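The batch-size and wait-time parameters lend themselves to a simple polling loop. The sketch below is one way to write it, assuming only that `consumer` exposes the `receive` method described above; `iter_received` and `max_batches` are hypothetical names introduced for illustration, not part of the library.

```python
from typing import Any, Iterator, Optional


def iter_received(
    consumer: Any,
    *,
    max_events: int = 10,
    max_wait_time: int = 10,
    max_batches: Optional[int] = None,
) -> Iterator[Any]:
    """Yield received items batch by batch until a receive call returns nothing.

    Hypothetical helper: `consumer` is anything with a `receive` method that
    accepts `max_events` and `max_wait_time` and returns a list.
    """
    batches = 0
    while max_batches is None or batches < max_batches:
        details = consumer.receive(max_events=max_events, max_wait_time=max_wait_time)
        if not details:
            return  # an empty batch ends the iteration
        yield from details
        batches += 1
```

Every yielded item still holds a lock, so the caller is expected to acknowledge, release, or reject it before the lock expires.
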
Event Acknowledgment

Acknowledges successfully processed events, removing them from the subscription.

def acknowledge(
    self,
    *,
    lock_tokens: List[str],
    **kwargs: Any
) -> AcknowledgeResult:
    """
    Acknowledge successfully processed events.

    Parameters:
    - lock_tokens: List of lock tokens from received events

    Returns:
    AcknowledgeResult: Result with succeeded and failed acknowledgments

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid or expired lock tokens
    """

Event Release

Releases events back to the subscription for reprocessing, optionally with a delay.

def release(
    self,
    *,
    lock_tokens: List[str],
    release_delay: Optional[Union[int, ReleaseDelay]] = None,
    **kwargs: Any
) -> ReleaseResult:
    """
    Release events back to subscription for reprocessing.

    Parameters:
    - lock_tokens: List of lock tokens from received events
    - release_delay: Delay before the event becomes available again (ReleaseDelay enum or seconds as int; the service accepts 0, 10, 60, 600, or 3600 seconds)

    Returns:
    ReleaseResult: Result with succeeded and failed releases

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid lock tokens or release delay
    """

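A release delay is often chosen from the delivery count so that repeated failures wait longer. The following is a minimal sketch of that idea; `pick_release_delay_name` is a hypothetical helper, the thresholds are arbitrary, and the returned names are simply the ReleaseDelay members that appear in this document's examples.

```python
def pick_release_delay_name(delivery_count: int) -> str:
    """Return the name of a ReleaseDelay member that grows with delivery count.

    Hypothetical helper. The names are the ReleaseDelay members used in this
    document; the thresholds are arbitrary and only illustrate the idea.
    """
    if delivery_count <= 1:
        return "TEN_SECONDS"
    if delivery_count == 2:
        return "ONE_MINUTE"
    return "TEN_MINUTES"
```

With the real enum imported, `ReleaseDelay[pick_release_delay_name(n)]` looks the member up by name, which is standard `Enum` behavior.
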
Event Rejection

Rejects events that cannot be processed. A rejected event is moved to the dead-letter destination if one is configured on the subscription; otherwise it is dropped.

def reject(
    self,
    *,
    lock_tokens: List[str],
    **kwargs: Any
) -> RejectResult:
    """
    Reject events that cannot be processed.

    Parameters:
    - lock_tokens: List of lock tokens from received events

    Returns:
    RejectResult: Result with succeeded and failed rejections

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid or expired lock tokens
    """

Lock Renewal

Extends the lock duration on events to continue processing beyond the initial lock timeout.

def renew_locks(
    self,
    *,
    lock_tokens: List[str],
    **kwargs: Any
) -> RenewLocksResult:
    """
    Renew locks on events to extend processing time.

    Parameters:
    - lock_tokens: List of lock tokens from received events

    Returns:
    RenewLocksResult: Result with succeeded and failed lock renewals

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid or expired lock tokens
    """

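Renewal is usually driven by elapsed time rather than by step count. The sketch below shows one way to do that without threads, assuming a `consumer` with the `renew_locks` method described above. `LockKeeper`, `tick`, and the injectable `clock` are hypothetical names, and the interval must be chosen to be shorter than the lock duration configured on the subscription, which this document does not state.

```python
import time
from typing import Any, Callable, List


class LockKeeper:
    """Renew locks once at least `interval` seconds have passed (hypothetical helper)."""

    def __init__(
        self,
        consumer: Any,
        lock_tokens: List[str],
        interval: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._consumer = consumer
        self._lock_tokens = list(lock_tokens)
        self._interval = interval
        self._clock = clock
        self._last = clock()  # time of creation or of the last renewal

    def tick(self) -> bool:
        """Call between processing steps; returns True if a renewal was attempted."""
        now = self._clock()
        if now - self._last < self._interval:
            return False
        self._consumer.renew_locks(lock_tokens=self._lock_tokens)
        self._last = now
        return True
```

Calling `tick()` after each unit of work keeps the renewal schedule independent of how long an individual step takes. The result of `renew_locks` is ignored here; a fuller version would inspect it for failed tokens.
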
Low-Level HTTP Operations

Direct HTTP request handling for advanced scenarios.

def send_request(
    self,
    request: HttpRequest,
    *,
    stream: bool = False,
    **kwargs: Any
) -> HttpResponse:
    """
    Send raw HTTP request through the client pipeline.

    Parameters:
    - request: The HTTP request to send
    - stream: Whether to stream the response payload

    Returns:
    HttpResponse: Raw HTTP response
    """

Resource Management

Context manager support and explicit resource cleanup.

def close(self) -> None:
    """Close the client and cleanup resources."""

def __enter__(self) -> Self:
    """Context manager entry."""

def __exit__(self, *exc_details: Any) -> None:
    """Context manager exit with cleanup."""

Usage Examples

Basic Event Consumption

from azure.eventgrid import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential

# Create consumer client
consumer = EventGridConsumerClient(
    endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
    credential=AzureKeyCredential("access_key"),
    namespace_topic="orders-topic",
    subscription="order-processor"
)

# Receive events
events = consumer.receive(max_events=10, max_wait_time=60)

for event_detail in events:
    cloud_event = event_detail.event
    broker_props = event_detail.broker_properties
    
    print(f"Event ID: {cloud_event.id}")
    print(f"Event Type: {cloud_event.type}")
    print(f"Event Source: {cloud_event.source}")
    print(f"Event Data: {cloud_event.data}")
    print(f"Lock Token: {broker_props.lock_token}")
    print(f"Delivery Count: {broker_props.delivery_count}")

consumer.close()

Event Processing with Acknowledgment

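from azure.eventgrid.models import ReleaseDelay

# Process and acknowledge events. `consumer` is an open EventGridConsumerClient
# and process_order_event is a placeholder for your own handler.
events = consumer.receive(max_events=5)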
processed_tokens = []
failed_tokens = []

for event_detail in events:
    try:
        # Process the event
        result = process_order_event(event_detail.event)
        if result.success:
            processed_tokens.append(event_detail.broker_properties.lock_token)
        else:
            failed_tokens.append(event_detail.broker_properties.lock_token)
    except Exception as e:
        print(f"Processing failed: {e}")
        failed_tokens.append(event_detail.broker_properties.lock_token)

# Acknowledge successfully processed events
if processed_tokens:
    ack_result = consumer.acknowledge(lock_tokens=processed_tokens)
    print(f"Acknowledged: {len(ack_result.succeeded_lock_tokens)}")
    print(f"Failed to acknowledge: {len(ack_result.failed_lock_tokens)}")

# Release failed events for retry
if failed_tokens:
    release_result = consumer.release(
        lock_tokens=failed_tokens,
        release_delay=ReleaseDelay.TEN_SECONDS
    )
    print(f"Released: {len(release_result.succeeded_lock_tokens)}")

Error Handling and Retries

from azure.eventgrid.models import ReleaseDelay


class ProcessingError(Exception):
    """Application-defined error for failures worth retrying (placeholder)."""


events = consumer.receive(max_events=10)

for event_detail in events:
    cloud_event = event_detail.event
    lock_token = event_detail.broker_properties.lock_token
    delivery_count = event_detail.broker_properties.delivery_count
    
    try:
        # Attempt processing
        process_event(cloud_event)
        
        # Acknowledge successful processing
        consumer.acknowledge(lock_tokens=[lock_token])
        
    except ProcessingError as e:
        if delivery_count < 3:
            # Retry with delay
            consumer.release(
                lock_tokens=[lock_token],
                release_delay=ReleaseDelay.ONE_MINUTE
            )
            print(f"Released for retry (attempt {delivery_count + 1})")
        else:
            # Max retries exceeded, reject to dead letter
            consumer.reject(lock_tokens=[lock_token])
            print(f"Rejected after {delivery_count} attempts")
            
    except Exception as e:
        # Unexpected error, reject immediately
        consumer.reject(lock_tokens=[lock_token])
        print(f"Rejected due to unexpected error: {e}")

Long-Running Processing with Lock Renewal

import time
from azure.eventgrid.models import ReleaseDelay

events = consumer.receive(max_events=1)

for event_detail in events:
    lock_token = event_detail.broker_properties.lock_token
    
    try:
        # Start long-running processing
        for step in range(10):  # Simulate 10 processing steps
            # Renew lock every 30 seconds to prevent timeout
            if step > 0 and step % 3 == 0:
                renew_result = consumer.renew_locks(lock_tokens=[lock_token])
                if not renew_result.succeeded_lock_tokens:
                    print("Failed to renew lock, releasing event")
                    consumer.release(lock_tokens=[lock_token])
                    break
                    
            # Process step (simulate 10 seconds per step)
            time.sleep(10)
            print(f"Completed step {step + 1}")
        else:
            # All steps completed successfully
            consumer.acknowledge(lock_tokens=[lock_token])
            print("Processing completed successfully")
            
    except Exception as e:
        # Processing failed, release for retry
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_MINUTES
        )
        print(f"Processing failed, released for retry: {e}")

Context Manager Usage

# Automatic resource cleanup
with EventGridConsumerClient(
    endpoint="https://namespace.region.eventgrid.azure.net",
    credential=AzureKeyCredential("key"),
    namespace_topic="topic",
    subscription="sub"
) as consumer:
    
    events = consumer.receive(max_events=5)
    
    # Process events
    for event_detail in events:
        process_event(event_detail.event)
        
    # Acknowledge all events
    lock_tokens = [e.broker_properties.lock_token for e in events]
    consumer.acknowledge(lock_tokens=lock_tokens)
    
    # Client automatically closed on exit

Batch Processing Pattern

import time

from azure.eventgrid.models import ReleaseDelay


def process_event_batch(consumer, batch_size=10):
    """Process events in batches with proper error handling."""
    
    events = consumer.receive(max_events=batch_size, max_wait_time=30)
    
    if not events:
        print("No events received")
        return
        
    print(f"Received {len(events)} events")
    
    # Group events by processing outcome
    success_tokens = []
    retry_tokens = []
    reject_tokens = []
    
    for event_detail in events:
        try:
            # Process individual event
            result = process_single_event(event_detail.event)
            
            if result == "success":
                success_tokens.append(event_detail.broker_properties.lock_token)
            elif result == "retry":
                retry_tokens.append(event_detail.broker_properties.lock_token)
            else:
                reject_tokens.append(event_detail.broker_properties.lock_token)
                
        except Exception as e:
            print(f"Unexpected error processing event: {e}")
            reject_tokens.append(event_detail.broker_properties.lock_token)
    
    # Handle each group of events
    if success_tokens:
        ack_result = consumer.acknowledge(lock_tokens=success_tokens)
        print(f"Acknowledged {len(ack_result.succeeded_lock_tokens)} events")
        
    if retry_tokens:
        release_result = consumer.release(
            lock_tokens=retry_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE
        )
        print(f"Released {len(release_result.succeeded_lock_tokens)} events for retry")
        
    if reject_tokens:
        reject_result = consumer.reject(lock_tokens=reject_tokens)
        print(f"Rejected {len(reject_result.succeeded_lock_tokens)} events")

# Usage
with EventGridConsumerClient(...) as consumer:
    while True:
        process_event_batch(consumer)
        time.sleep(5)  # Brief pause between batches

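The `while True` loop above never exits on its own. One possible way to make it stoppable is sketched below, using a `threading.Event` as the stop signal; `run_until_stopped` and its parameters are hypothetical names, and `process_batch` stands for any zero-argument callable, such as a lambda wrapping `process_event_batch(consumer)`.

```python
import threading
from typing import Callable


def run_until_stopped(
    process_batch: Callable[[], None],
    stop: threading.Event,
    pause_seconds: float = 5.0,
) -> int:
    """Call process_batch repeatedly until `stop` is set; return the batch count.

    Hypothetical helper. `stop.wait` doubles as the pause between batches and
    returns early as soon as the event is set.
    """
    batches = 0
    while not stop.is_set():
        process_batch()
        batches += 1
        stop.wait(pause_seconds)
    return batches
```

A signal handler or another thread can call `stop.set()` to end the loop after the current batch finishes.
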
Error Handling

Common Error Scenarios

from azure.core.exceptions import HttpResponseError, ClientAuthenticationError

try:
    events = consumer.receive(max_events=10)
    # Process events...
    lock_tokens = [e.broker_properties.lock_token for e in events]
    consumer.acknowledge(lock_tokens=lock_tokens)
    
except ClientAuthenticationError as e:
    print(f"Authentication failed: {e}")
    # Check credentials and permissions
    
except HttpResponseError as e:
    if e.status_code == 404:
        print("Topic or subscription not found")
    elif e.status_code == 400:
        print(f"Bad request: {e.message}")
        # Check lock tokens and parameters
    elif e.status_code == 409:
        print("Lock tokens expired or invalid")
        # Events may have been processed by another consumer
    else:
        print(f"HTTP error {e.status_code}: {e.message}")
        
except ValueError as e:
    print(f"Invalid parameters: {e}")
    # Check lock tokens format and values

Operation Result Handling

ack_result = consumer.acknowledge(lock_tokens=all_lock_tokens)

# Check for successful acknowledgments
if ack_result.succeeded_lock_tokens:
    print(f"Successfully acknowledged {len(ack_result.succeeded_lock_tokens)} events")

# Handle failed acknowledgments
if ack_result.failed_lock_tokens:
    print(f"Failed to acknowledge {len(ack_result.failed_lock_tokens)} events")
    for failed_token in ack_result.failed_lock_tokens:
        print(f"Failed token: {failed_token.lock_token}")
        print(f"Error: {failed_token.error}")
        
        # Decide whether to retry or reject based on error
        if "expired" in str(failed_token.error):
            print("Lock expired, event likely processed elsewhere")
        else:
            # Unexpected error, may need manual intervention
            print("Unexpected acknowledgment error")

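The same inspection applies to release, reject, and renew results, since each is described above as carrying succeeded and failed lock tokens. A small sketch of a shared helper follows; `summarize_result` is a hypothetical name, and it relies only on the `succeeded_lock_tokens`, `failed_lock_tokens`, `lock_token`, and `error` attributes used in this document.

```python
from typing import Any, Dict, List, Tuple


def summarize_result(result: Any) -> Tuple[List[str], Dict[str, str]]:
    """Split a settlement result into succeeded tokens and failed-token errors.

    Hypothetical helper: works on any object exposing `succeeded_lock_tokens`
    and `failed_lock_tokens`, where each failed entry has `lock_token` and `error`.
    """
    succeeded = list(result.succeeded_lock_tokens or [])
    failed = {f.lock_token: str(f.error) for f in (result.failed_lock_tokens or [])}
    return succeeded, failed
```

Returning the failures as a dictionary makes it straightforward to log them or to decide per token whether a retry is worthwhile.
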
Install with Tessl CLI

npx tessl i tessl/pypi-azure-eventgrid
