Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Event consumption in Azure Event Grid Namespaces enables receiving and managing events from subscriptions with pull delivery. The EventGridConsumerClient provides operations for receiving, acknowledging, releasing, rejecting, and renewing locks on events.
Creates a consumer client for receiving events from Event Grid Namespace subscriptions.
class EventGridConsumerClient:
def __init__(
self,
endpoint: str,
credential: Union[AzureKeyCredential, TokenCredential],
*,
namespace_topic: str,
subscription: str,
api_version: Optional[str] = None,
**kwargs: Any
) -> None:
"""
Create EventGrid consumer client.
Parameters:
- endpoint: Event Grid namespace endpoint URL
- credential: Authentication credential
- namespace_topic: Topic name to consume from
- subscription: Subscription name to consume from
- api_version: API version (default: "2024-06-01")
"""Receives a batch of Cloud Events from the subscription with configurable batch size and wait time.
def receive(
self,
*,
max_events: Optional[int] = None,
max_wait_time: Optional[int] = None,
**kwargs: Any
) -> List[ReceiveDetails]:
"""
Receive batch of Cloud Events from subscription.
Parameters:
- max_events: Maximum number of events to receive (1-100, service default applies)
- max_wait_time: Maximum wait time in seconds (1-300, service default applies)
Returns:
List[ReceiveDetails]: List of received events with broker properties
Raises:
- HttpResponseError: Server returned error response
- ClientAuthenticationError: Authentication failed
"""Acknowledges successfully processed events, removing them from the subscription.
def acknowledge(
self,
*,
lock_tokens: List[str],
**kwargs: Any
) -> AcknowledgeResult:
"""
Acknowledge successfully processed events.
Parameters:
- lock_tokens: List of lock tokens from received events
Returns:
AcknowledgeResult: Result with succeeded and failed acknowledgments
Raises:
- HttpResponseError: Server returned error response
- ValueError: Invalid or expired lock tokens
"""Releases events back to the subscription for reprocessing, optionally with a delay.
def release(
self,
*,
lock_tokens: List[str],
release_delay: Optional[Union[int, ReleaseDelay]] = None,
**kwargs: Any
) -> ReleaseResult:
"""
Release events back to subscription for reprocessing.
Parameters:
- lock_tokens: List of lock tokens from received events
- release_delay: Delay before event becomes available (ReleaseDelay enum or seconds as int)
Returns:
ReleaseResult: Result with succeeded and failed releases
Raises:
- HttpResponseError: Server returned error response
- ValueError: Invalid lock tokens or release delay
"""Rejects events that cannot be processed, typically moving them to dead letter storage.
def reject(
self,
*,
lock_tokens: List[str],
**kwargs: Any
) -> RejectResult:
"""
Reject events that cannot be processed.
Parameters:
- lock_tokens: List of lock tokens from received events
Returns:
RejectResult: Result with succeeded and failed rejections
Raises:
- HttpResponseError: Server returned error response
- ValueError: Invalid or expired lock tokens
"""Extends the lock duration on events to continue processing beyond the initial lock timeout.
def renew_locks(
self,
*,
lock_tokens: List[str],
**kwargs: Any
) -> RenewLocksResult:
"""
Renew locks on events to extend processing time.
Parameters:
- lock_tokens: List of lock tokens from received events
Returns:
RenewLocksResult: Result with succeeded and failed lock renewals
Raises:
- HttpResponseError: Server returned error response
- ValueError: Invalid or expired lock tokens
"""Direct HTTP request handling for advanced scenarios.
def send_request(
self,
request: HttpRequest,
*,
stream: bool = False,
**kwargs: Any
) -> HttpResponse:
"""
Send raw HTTP request through the client pipeline.
Parameters:
- request: The HTTP request to send
- stream: Whether to stream the response payload
Returns:
HttpResponse: Raw HTTP response
"""Context manager support and explicit resource cleanup.
def close(self) -> None:
"""Close the client and cleanup resources."""
def __enter__(self) -> Self:
"""Context manager entry."""
def __exit__(self, *exc_details: Any) -> None:
"""Context manager exit with cleanup."""from azure.eventgrid import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential
# Create consumer client
consumer = EventGridConsumerClient(
endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
credential=AzureKeyCredential("access_key"),
namespace_topic="orders-topic",
subscription="order-processor"
)
# Receive events
events = consumer.receive(max_events=10, max_wait_time=60)
for event_detail in events:
cloud_event = event_detail.event
broker_props = event_detail.broker_properties
print(f"Event ID: {cloud_event.id}")
print(f"Event Type: {cloud_event.type}")
print(f"Event Source: {cloud_event.source}")
print(f"Event Data: {cloud_event.data}")
print(f"Lock Token: {broker_props.lock_token}")
print(f"Delivery Count: {broker_props.delivery_count}")
consumer.close()# Process and acknowledge events
events = consumer.receive(max_events=5)
processed_tokens = []
failed_tokens = []
for event_detail in events:
try:
# Process the event
result = process_order_event(event_detail.event)
if result.success:
processed_tokens.append(event_detail.broker_properties.lock_token)
else:
failed_tokens.append(event_detail.broker_properties.lock_token)
except Exception as e:
print(f"Processing failed: {e}")
failed_tokens.append(event_detail.broker_properties.lock_token)
# Acknowledge successfully processed events
if processed_tokens:
ack_result = consumer.acknowledge(lock_tokens=processed_tokens)
print(f"Acknowledged: {len(ack_result.succeeded_lock_tokens)}")
print(f"Failed to acknowledge: {len(ack_result.failed_lock_tokens)}")
# Release failed events for retry
if failed_tokens:
release_result = consumer.release(
lock_tokens=failed_tokens,
release_delay=ReleaseDelay.TEN_SECONDS
)
print(f"Released: {len(release_result.succeeded_lock_tokens)}")from azure.eventgrid.models import ReleaseDelay
from azure.core.exceptions import HttpResponseError
events = consumer.receive(max_events=10)
for event_detail in events:
cloud_event = event_detail.event
lock_token = event_detail.broker_properties.lock_token
delivery_count = event_detail.broker_properties.delivery_count
try:
# Attempt processing
process_event(cloud_event)
# Acknowledge successful processing
consumer.acknowledge(lock_tokens=[lock_token])
except ProcessingError as e:
if delivery_count < 3:
# Retry with delay
consumer.release(
lock_tokens=[lock_token],
release_delay=ReleaseDelay.ONE_MINUTE
)
print(f"Released for retry (attempt {delivery_count + 1})")
else:
# Max retries exceeded, reject to dead letter
consumer.reject(lock_tokens=[lock_token])
print(f"Rejected after {delivery_count} attempts")
except Exception as e:
# Unexpected error, reject immediately
consumer.reject(lock_tokens=[lock_token])
print(f"Rejected due to unexpected error: {e}")import time
from azure.eventgrid.models import ReleaseDelay
events = consumer.receive(max_events=1)
for event_detail in events:
lock_token = event_detail.broker_properties.lock_token
try:
# Start long-running processing
for step in range(10): # Simulate 10 processing steps
# Renew lock every 30 seconds to prevent timeout
if step > 0 and step % 3 == 0:
renew_result = consumer.renew_locks(lock_tokens=[lock_token])
if not renew_result.succeeded_lock_tokens:
print("Failed to renew lock, releasing event")
consumer.release(lock_tokens=[lock_token])
break
# Process step (simulate 10 seconds per step)
time.sleep(10)
print(f"Completed step {step + 1}")
else:
# All steps completed successfully
consumer.acknowledge(lock_tokens=[lock_token])
print("Processing completed successfully")
except Exception as e:
# Processing failed, release for retry
consumer.release(
lock_tokens=[lock_token],
release_delay=ReleaseDelay.TEN_MINUTES
)
print(f"Processing failed, released for retry: {e}")# Automatic resource cleanup
with EventGridConsumerClient(
endpoint="https://namespace.region.eventgrid.azure.net",
credential=AzureKeyCredential("key"),
namespace_topic="topic",
subscription="sub"
) as consumer:
events = consumer.receive(max_events=5)
# Process events
for event_detail in events:
process_event(event_detail.event)
# Acknowledge all events
lock_tokens = [e.broker_properties.lock_token for e in events]
consumer.acknowledge(lock_tokens=lock_tokens)
# Client automatically closed on exitdef process_event_batch(consumer, batch_size=10):
"""Process events in batches with proper error handling."""
events = consumer.receive(max_events=batch_size, max_wait_time=30)
if not events:
print("No events received")
return
print(f"Received {len(events)} events")
# Group events by processing outcome
success_tokens = []
retry_tokens = []
reject_tokens = []
for event_detail in events:
try:
# Process individual event
result = process_single_event(event_detail.event)
if result == "success":
success_tokens.append(event_detail.broker_properties.lock_token)
elif result == "retry":
retry_tokens.append(event_detail.broker_properties.lock_token)
else:
reject_tokens.append(event_detail.broker_properties.lock_token)
except Exception as e:
print(f"Unexpected error processing event: {e}")
reject_tokens.append(event_detail.broker_properties.lock_token)
# Handle each group of events
if success_tokens:
ack_result = consumer.acknowledge(lock_tokens=success_tokens)
print(f"Acknowledged {len(ack_result.succeeded_lock_tokens)} events")
if retry_tokens:
release_result = consumer.release(
lock_tokens=retry_tokens,
release_delay=ReleaseDelay.ONE_MINUTE
)
print(f"Released {len(release_result.succeeded_lock_tokens)} events for retry")
if reject_tokens:
reject_result = consumer.reject(lock_tokens=reject_tokens)
print(f"Rejected {len(reject_result.succeeded_lock_tokens)} events")
# Usage
with EventGridConsumerClient(...) as consumer:
while True:
process_event_batch(consumer)
time.sleep(5) # Brief pause between batchesfrom azure.core.exceptions import HttpResponseError, ClientAuthenticationError
try:
events = consumer.receive(max_events=10)
# Process events...
consumer.acknowledge(lock_tokens=lock_tokens)
except ClientAuthenticationError as e:
print(f"Authentication failed: {e}")
# Check credentials and permissions
except HttpResponseError as e:
if e.status_code == 404:
print("Topic or subscription not found")
elif e.status_code == 400:
print(f"Bad request: {e.message}")
# Check lock tokens and parameters
elif e.status_code == 409:
print("Lock tokens expired or invalid")
# Events may have been processed by another consumer
else:
print(f"HTTP error {e.status_code}: {e.message}")
except ValueError as e:
print(f"Invalid parameters: {e}")
# Check lock tokens format and valuesack_result = consumer.acknowledge(lock_tokens=all_lock_tokens)
# Check for successful acknowledgments
if ack_result.succeeded_lock_tokens:
print(f"Successfully acknowledged {len(ack_result.succeeded_lock_tokens)} events")
# Handle failed acknowledgments
if ack_result.failed_lock_tokens:
print(f"Failed to acknowledge {len(ack_result.failed_lock_tokens)} events")
for failed_token in ack_result.failed_lock_tokens:
print(f"Failed token: {failed_token.lock_token}")
print(f"Error: {failed_token.error}")
# Decide whether to retry or reject based on error
if "expired" in str(failed_token.error):
print("Lock expired, event likely processed elsewhere")
else:
# Unexpected error, may need manual intervention
print("Unexpected acknowledgment error")Install with Tessl CLI
npx tessl i tessl/pypi-azure-eventgrid