Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The Azure Event Grid Python SDK provides full asynchronous support for both publisher and consumer operations. The async clients, imported from `azure.eventgrid.aio`, expose the same APIs as their synchronous counterparts but use async/await for non-blocking operations.
Asynchronous event publishing for high-throughput scenarios and non-blocking operations.
class EventGridPublisherClient:
    """Signature outline of the asynchronous Event Grid publisher client.

    This is an API summary: method bodies are elided. The class defines
    ``__aenter__``/``__aexit__``, so instances can be used with ``async with``.

    Constructor parameters:
    - endpoint: Endpoint URL passed as the first argument.
    - credential: Key, SAS, or async token credential.
    - namespace_topic: Optional topic name (keyword-only).
    - api_version: Optional service API version override (keyword-only).
    """

    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, AzureSasCredential, AsyncTokenCredential],
        *,
        namespace_topic: Optional[str] = None,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None: ...

    async def send(
        self,
        events: Union[CloudEvent, EventGridEvent, dict, List[Union[CloudEvent, EventGridEvent, dict]]],
        *,
        channel_name: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Asynchronously send events to Event Grid.

        Parameters:
        - events: Single event or list of events to send
        - channel_name: Channel name for multi-channel publishing (namespaces only)
        - content_type: Override content type for the request
        """

    async def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> AsyncHttpResponse: ...

    async def close(self) -> None:
        """Asynchronously close the client and cleanup resources."""

    async def __aenter__(self) -> Self: ...

    async def __aexit__(self, *exc_details: Any) -> None: ...


# Asynchronous event consumption with all management operations for Event Grid Namespaces.
class EventGridConsumerClient:
    """Signature outline of the asynchronous Event Grid consumer client.

    This is an API summary: method bodies are elided. The class defines
    ``__aenter__``/``__aexit__``, so instances can be used with ``async with``.

    Constructor parameters:
    - endpoint: Endpoint URL passed as the first argument.
    - credential: Key or async token credential.
    - namespace_topic: Topic name (required, keyword-only).
    - subscription: Event subscription name (required, keyword-only).
    - api_version: Optional service API version override (keyword-only).
    """

    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, AsyncTokenCredential],
        *,
        namespace_topic: str,
        subscription: str,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None: ...

    async def receive(
        self,
        *,
        max_events: Optional[int] = None,
        max_wait_time: Optional[int] = None,
        **kwargs: Any
    ) -> List[ReceiveDetails]:
        """
        Asynchronously receive batch of Cloud Events from subscription.

        Parameters:
        - max_events: Maximum number of events to receive
        - max_wait_time: Maximum wait time in seconds

        Returns:
        List[ReceiveDetails]: List of received events with broker properties
        """

    async def acknowledge(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> AcknowledgeResult:
        """Asynchronously acknowledge successfully processed events."""

    async def release(
        self,
        *,
        lock_tokens: List[str],
        release_delay: Optional[Union[str, ReleaseDelay]] = None,
        **kwargs: Any
    ) -> ReleaseResult:
        """Asynchronously release events back to subscription for reprocessing."""

    async def reject(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> RejectResult:
        """Asynchronously reject events that cannot be processed."""

    async def renew_locks(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> RenewLocksResult:
        """Asynchronously renew locks on events to extend processing time."""

    async def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> AsyncHttpResponse: ...

    async def close(self) -> None: ...

    async def __aenter__(self) -> Self: ...

    async def __aexit__(self, *exc_details: Any) -> None: ...


import asyncio
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
async def publish_events():
    """Asynchronously publish events to Event Grid Namespace.

    Opens a publisher for the "orders-topic" namespace topic, sends five
    ``Order.Created`` CloudEvents in a single ``send`` call, and prints how
    many were published. The client is closed when the ``async with`` exits.
    """
    # Create async publisher client
    async with EventGridPublisherClient(
        endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
        credential=AzureKeyCredential("access_key"),
        namespace_topic="orders-topic"
    ) as publisher:
        # Create events
        events = [
            CloudEvent(
                source="orders-service",
                type="Order.Created",
                data={"order_id": f"order-{i}", "total": i * 10.99}
            )
            for i in range(1, 6)
        ]

        # Send events asynchronously
        await publisher.send(events)
        print(f"Published {len(events)} events")
# Run the async function
asyncio.run(publish_events())

import asyncio
from azure.eventgrid.aio import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid.models import ReleaseDelay
async def consume_events():
    """Asynchronously consume and process events.

    Receives up to 10 events from the "order-processor" subscription,
    processes them concurrently with ``process_event_async``, then
    acknowledges the ones that succeeded and releases the rest with a
    one-minute delay.
    """
    async with EventGridConsumerClient(
        endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
        credential=AzureKeyCredential("access_key"),
        namespace_topic="orders-topic",
        subscription="order-processor"
    ) as consumer:
        # Receive events
        events = await consumer.receive(max_events=10, max_wait_time=30)
        if not events:
            print("No events received")
            return
        print(f"Received {len(events)} events")

        # Process events concurrently; return_exceptions=True keeps one
        # failing event from discarding the outcomes of the others.
        results = await asyncio.gather(
            *(process_event_async(event_detail) for event_detail in events),
            return_exceptions=True
        )

        # Group results by outcome. gather() preserves input order, so each
        # result lines up with the event that produced it.
        success_tokens = []
        failed_tokens = []
        for event_detail, result in zip(events, results):
            lock_token = event_detail.broker_properties.lock_token
            if isinstance(result, Exception):
                print(f"Processing failed: {result}")
                failed_tokens.append(lock_token)
            elif result:
                success_tokens.append(lock_token)
            else:
                failed_tokens.append(lock_token)

        # Acknowledge successful events
        if success_tokens:
            ack_result = await consumer.acknowledge(lock_tokens=success_tokens)
            print(f"Acknowledged {len(ack_result.succeeded_lock_tokens)} events")

        # Release failed events
        if failed_tokens:
            release_result = await consumer.release(
                lock_tokens=failed_tokens,
                release_delay=ReleaseDelay.ONE_MINUTE
            )
            print(f"Released {len(release_result.succeeded_lock_tokens)} events")
async def process_event_async(event_detail):
    """Simulate async event processing.

    Parameters:
    - event_detail: Object whose ``event`` attribute exposes ``type``,
      ``source`` and ``data`` (a received-event wrapper, not the bare event).

    Returns:
    bool: True when processing succeeded; False when the event data contains
    the text "error" (case-insensitive) or any exception was raised.
    """
    try:
        # Simulate async work
        await asyncio.sleep(0.1)

        # Process the event
        cloud_event = event_detail.event
        print(f"Processing event: {cloud_event.type} from {cloud_event.source}")

        # Simulate processing logic
        if "error" in str(cloud_event.data).lower():
            return False  # Processing failed
        return True  # Processing succeeded
    except Exception as e:
        # Deliberate catch-all: callers treat False as "release for retry".
        print(f"Unexpected error: {e}")
        return False
# Run the consumer
asyncio.run(consume_events())

import asyncio
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
async def publish_to_multiple_topics():
    """Publish events to multiple topics concurrently.

    Creates one publisher per topic ("topic1".."topic3"), sends five events
    to each in parallel, and closes every publisher in a ``finally`` block so
    the clients are released even if publishing fails.
    """
    credential = AzureKeyCredential("access_key")
    endpoint = "https://my-namespace.westus-1.eventgrid.azure.net"

    # Create multiple publisher clients
    publishers = [
        EventGridPublisherClient(endpoint, credential, namespace_topic="topic1"),
        EventGridPublisherClient(endpoint, credential, namespace_topic="topic2"),
        EventGridPublisherClient(endpoint, credential, namespace_topic="topic3")
    ]

    try:
        # Create events for each topic
        events_per_topic = [
            [CloudEvent(source="app", type="Type1", data={"topic": "topic1", "id": i}) for i in range(5)],
            [CloudEvent(source="app", type="Type2", data={"topic": "topic2", "id": i}) for i in range(5)],
            [CloudEvent(source="app", type="Type3", data={"topic": "topic3", "id": i}) for i in range(5)]
        ]

        # Publish to all topics concurrently and wait for completion
        await asyncio.gather(
            *(
                publisher.send(events)
                for publisher, events in zip(publishers, events_per_topic)
            )
        )
        print("Published events to all topics")
    finally:
        # Close all publishers
        await asyncio.gather(*(publisher.close() for publisher in publishers))
asyncio.run(publish_to_multiple_topics())

import asyncio
from azure.eventgrid.aio import EventGridConsumerClient, EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
class EventProcessor:
    """Async event processing pipeline.

    Reads events from the "input-topic" topic (subscription "processor"),
    transforms each one, and publishes the results to "output-topic".
    """

    def __init__(self, endpoint, credential):
        """Store the endpoint and credential used to build both clients."""
        self.endpoint = endpoint
        self.credential = credential

    async def run_pipeline(self):
        """Run continuous event processing pipeline.

        Loops until interrupted or cancelled. Only events whose transformation
        succeeded are acknowledged; events that failed are released back to
        the subscription so they are not silently lost.
        """
        # Input consumer
        input_consumer = EventGridConsumerClient(
            self.endpoint, self.credential,
            namespace_topic="input-topic",
            subscription="processor"
        )

        # Output publisher
        output_publisher = EventGridPublisherClient(
            self.endpoint, self.credential,
            namespace_topic="output-topic"
        )

        async with input_consumer, output_publisher:
            while True:
                try:
                    # Receive events
                    events = await input_consumer.receive(max_events=5, max_wait_time=10)
                    if not events:
                        await asyncio.sleep(1)
                        continue

                    # Process events concurrently; outcomes line up with events
                    outcomes = await self._transform_all(events)

                    processed_events = []
                    done_tokens = []
                    failed_tokens = []
                    for event_detail, outcome in zip(events, outcomes):
                        lock_token = event_detail.broker_properties.lock_token
                        if outcome is None:
                            failed_tokens.append(lock_token)
                        else:
                            processed_events.append(outcome)
                            done_tokens.append(lock_token)

                    # Publish processed events
                    if processed_events:
                        await output_publisher.send(processed_events)

                    # Acknowledge only the inputs that produced an output
                    if done_tokens:
                        await input_consumer.acknowledge(lock_tokens=done_tokens)
                    print(f"Processed {len(done_tokens)} events")

                    # Hand failed inputs back instead of acknowledging them
                    if failed_tokens:
                        await input_consumer.release(lock_tokens=failed_tokens)
                        print(f"Released {len(failed_tokens)} events")
                except KeyboardInterrupt:
                    print("Shutting down pipeline...")
                    break
                except asyncio.CancelledError:
                    # Cancellation must propagate; never retry on it.
                    print("Shutting down pipeline...")
                    raise
                except Exception as e:
                    print(f"Pipeline error: {e}")
                    await asyncio.sleep(5)  # Brief pause before retry

    async def _transform_all(self, events):
        """Transform events concurrently; return one outcome per input event.

        The returned list has the same length and order as ``events``. An
        entry is the transformed event, or None when the transformation
        raised or returned a falsy value.
        """
        results = await asyncio.gather(
            *(self.transform_event(event_detail) for event_detail in events),
            return_exceptions=True
        )
        # BaseException (not just Exception) so a captured CancelledError is
        # never mistaken for a transformed event.
        return [
            None if isinstance(result, BaseException) or not result else result
            for result in results
        ]

    async def process_events(self, events):
        """Process events concurrently.

        Returns:
        list: The successfully transformed events, with failures filtered out.
        """
        outcomes = await self._transform_all(events)
        return [outcome for outcome in outcomes if outcome is not None]

    async def transform_event(self, event_detail):
        """Transform a single event.

        Returns:
        The new CloudEvent, or None if the transformation raised.
        """
        try:
            # Simulate async transformation
            await asyncio.sleep(0.1)
            input_event = event_detail.event

            # Create transformed event
            output_event = CloudEvent(
                source="event-processor",
                type=f"Processed.{input_event.type}",
                data={
                    "original_data": input_event.data,
                    # Event-loop clock reading, not a wall-clock timestamp.
                    "processed_at": asyncio.get_running_loop().time(),
                    "processor_id": "async-processor-1"
                }
            )
            return output_event
        except Exception as e:
            print(f"Transformation failed: {e}")
            return None
# Run the pipeline
processor = EventProcessor(
endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
credential=AzureKeyCredential("access_key")
)
asyncio.run(processor.run_pipeline())

import asyncio
from azure.eventgrid.aio import EventGridPublisherClient, EventGridConsumerClient
async def context_manager_examples():
    """Demonstrate async context manager usage patterns.

    Shows a single client managed by ``async with``, then a publisher and a
    consumer managed together through ``contextlib.AsyncExitStack``.
    """
    from contextlib import AsyncExitStack

    credential = AzureKeyCredential("access_key")
    endpoint = "https://my-namespace.westus-1.eventgrid.azure.net"

    # Single client context manager
    async with EventGridPublisherClient(
        endpoint, credential, namespace_topic="topic"
    ) as publisher:
        await publisher.send([CloudEvent(source="app", type="Test", data={})])

    # Multiple clients with AsyncExitStack
    async with AsyncExitStack() as stack:
        # Enter multiple async context managers
        publisher = await stack.enter_async_context(
            EventGridPublisherClient(endpoint, credential, namespace_topic="output")
        )
        consumer = await stack.enter_async_context(
            EventGridConsumerClient(
                endpoint, credential,
                namespace_topic="input",
                subscription="processor"
            )
        )

        # Use both clients
        events = await consumer.receive(max_events=5)
        if events:
            # Process and forward events
            processed = [transform_event(e) for e in events]
            await publisher.send(processed)

            # Acknowledge input events
            tokens = [e.broker_properties.lock_token for e in events]
            await consumer.acknowledge(lock_tokens=tokens)
    # All clients automatically closed on exit
def transform_event(event_detail):
    """Simple event transformation.

    Parameters:
    - event_detail: Object whose ``event`` attribute exposes ``type`` and
      ``data``.

    Returns:
    CloudEvent: New event from source "transformer" whose type is the
    original type prefixed with "Transformed." and whose data wraps the
    original data under the "original" key.
    """
    return CloudEvent(
        source="transformer",
        type=f"Transformed.{event_detail.event.type}",
        data={"original": event_detail.event.data}
    )
asyncio.run(context_manager_examples())

import asyncio
from azure.core.exceptions import HttpResponseError, ClientAuthenticationError
async def _process_with_timeout(event_detail, timeout=5.0):
    """Process one received event; return True on success, False to retry it."""
    try:
        # Pass the whole receive-details object: process_event_async reads
        # its ``event`` attribute itself.
        result = await asyncio.wait_for(
            process_event_async(event_detail),
            timeout=timeout
        )
        return bool(result)
    except asyncio.TimeoutError:
        print("Event processing timed out")
    except Exception as e:
        print(f"Event processing failed: {e}")
    return False


async def robust_async_processing():
    """Demonstrate robust error handling in async operations.

    Receives batches in a loop, processes each event with its own timeout,
    acknowledges successes and releases failures for retry. The loop stops on
    authentication failures and on HTTP errors that are not server errors
    (status >= 500); other errors are logged and the loop continues.
    """
    # ``...`` is a placeholder for the real client arguments.
    async with EventGridConsumerClient(...) as consumer:
        while True:
            try:
                # Receive events with timeout
                events = await asyncio.wait_for(
                    consumer.receive(max_events=10),
                    timeout=30.0
                )
                if not events:
                    continue

                # Process with individual error handling
                success_tokens = []
                retry_tokens = []
                for event_detail in events:
                    lock_token = event_detail.broker_properties.lock_token
                    if await _process_with_timeout(event_detail):
                        success_tokens.append(lock_token)
                    else:
                        retry_tokens.append(lock_token)

                # Handle results concurrently
                tasks = []
                if success_tokens:
                    tasks.append(consumer.acknowledge(lock_tokens=success_tokens))
                if retry_tokens:
                    tasks.append(consumer.release(
                        lock_tokens=retry_tokens,
                        release_delay=ReleaseDelay.ONE_MINUTE
                    ))
                if tasks:
                    results = await asyncio.gather(*tasks, return_exceptions=True)
                    for result in results:
                        if isinstance(result, Exception):
                            print(f"Operation failed: {result}")
            except asyncio.TimeoutError:
                print("Receive operation timed out, continuing...")
            except ClientAuthenticationError as e:
                print(f"Authentication failed: {e}")
                break  # Cannot continue without valid auth
            except HttpResponseError as e:
                print(f"HTTP error: {e.status_code} - {e.message}")
                # status_code may be None; comparing None >= 500 would raise
                # TypeError from inside this handler.
                if e.status_code is not None and e.status_code >= 500:
                    # Server error, wait and retry
                    await asyncio.sleep(5)
                else:
                    # Client error, may need intervention
                    break
            except Exception as e:
                print(f"Unexpected error: {e}")
                await asyncio.sleep(1)
asyncio.run(robust_async_processing())

import asyncio
from asyncio import Semaphore
async def controlled_concurrent_processing(consumer, max_concurrent=10):
    """Process events with controlled concurrency.

    Parameters:
    - consumer: Client exposing an awaitable ``receive(max_events=...)``.
    - max_concurrent: Maximum number of events processed at the same time.

    Returns:
    list: One entry per received event, in receive order: the value returned
    by ``process_event_async`` or the exception it raised.
    """
    semaphore = Semaphore(max_concurrent)

    async def process_with_semaphore(event_detail):
        # The semaphore caps how many events are in flight at once.
        async with semaphore:
            return await process_event_async(event_detail)

    events = await consumer.receive(max_events=50)

    # Process with limited concurrency
    tasks = [process_with_semaphore(event) for event in events]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Handle results...
    return results


async def optimized_batch_processing(consumer, batch_size=20):
    """Optimized async batch processing.

    Runs until cancelled: receives up to ``batch_size`` events, processes
    them in concurrent chunks of five, and acknowledges or releases each
    chunk as soon as it finishes.

    Parameters:
    - consumer: Client exposing awaitable ``receive``, ``acknowledge`` and
      ``release`` methods.
    - batch_size: Maximum number of events requested per receive call.
    """
    chunk_size = 5  # Loop-invariant, so set once outside the loop
    while True:
        # Receive larger batches
        events = await consumer.receive(max_events=batch_size, max_wait_time=10)
        if not events:
            await asyncio.sleep(0.1)  # Brief pause
            continue

        # Process in smaller concurrent chunks
        for start in range(0, len(events), chunk_size):
            chunk = events[start:start + chunk_size]

            # Process chunk concurrently
            tasks = [process_event_async(event) for event in chunk]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Handle chunk results immediately; results line up with chunk
            success_tokens = []
            failed_tokens = []
            for event_detail, result in zip(chunk, results):
                lock_token = event_detail.broker_properties.lock_token
                if isinstance(result, Exception) or not result:
                    failed_tokens.append(lock_token)
                else:
                    success_tokens.append(lock_token)

            # Process results concurrently
            ops = []
            if success_tokens:
                ops.append(consumer.acknowledge(lock_tokens=success_tokens))
            if failed_tokens:
                ops.append(consumer.release(lock_tokens=failed_tokens))
            if ops:
                await asyncio.gather(*ops, return_exceptions=True)


# Install with Tessl CLI
npx tessl i tessl/pypi-azure-eventgrid