Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces
npx @tessl/cli install tessl/pypi-azure-eventgrid@4.22.0

Microsoft Azure Event Grid Client Library for Python provides comprehensive support for both Event Grid Basic (legacy) and Event Grid Namespaces. This library enables event-driven applications to publish events to Event Grid topics and consume events from Event Grid Namespace subscriptions using reliable, scalable event routing and processing capabilities.
pip install azure-eventgrid

from azure.eventgrid import EventGridPublisherClient, EventGridConsumerClient

For async operations:
from azure.eventgrid.aio import EventGridPublisherClient, EventGridConsumerClient

For legacy Event Grid Basic support:
from azure.eventgrid import EventGridEvent, SystemEventNames, generate_sas

For models and data types:
from azure.eventgrid.models import (
AcknowledgeResult, ReleaseResult, RejectResult, RenewLocksResult,
ReceiveDetails, BrokerProperties, ReleaseDelay
)

from azure.eventgrid import EventGridPublisherClient, EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
# Publisher client for sending events
publisher = EventGridPublisherClient(
endpoint="https://namespace.region.eventgrid.azure.net",
credential=AzureKeyCredential("access_key"),
namespace_topic="my-topic"
)
# Create and send events
cloud_event = CloudEvent(
source="myapp/products",
type="Product.Created",
data={
"product_id": "12345",
"name": "New Product",
"price": 29.99
}
)
publisher.send([cloud_event])
publisher.close()
# Consumer client for receiving events
consumer = EventGridConsumerClient(
endpoint="https://namespace.region.eventgrid.azure.net",
credential=AzureKeyCredential("access_key"),
namespace_topic="my-topic",
subscription="my-subscription"
)
# Receive and process events
events = consumer.receive(max_events=10, max_wait_time=60)
for event_detail in events:
print(f"Event: {event_detail.event.type}")
print(f"Data: {event_detail.event.data}")
# Process the event, then acknowledge
# consumer.acknowledge(lock_tokens=[event_detail.broker_properties.lock_token])
consumer.close()

from azure.eventgrid import EventGridPublisherClient, EventGridEvent
from azure.core.credentials import AzureKeyCredential
# Publisher client for Event Grid Basic
publisher = EventGridPublisherClient(
endpoint="https://my-topic.region.eventgrid.azure.net/api/events",
credential=AzureKeyCredential("access_key")
# No namespace_topic specified for Basic
)
# Create EventGrid schema event
event = EventGridEvent(
subject="products/product-123",
event_type="Product.Created",
data={
"product_id": "123",
"name": "Example Product"
},
data_version="1.0"
)
publisher.send([event])
publisher.close()

Azure Event Grid supports two distinct models: Event Grid Basic (legacy) and Event Grid Namespaces.
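The Python SDK provides unified client APIs for both models: EventGridPublisherClient publishes to Event Grid Basic topics/domains and Event Grid Namespace topics, and EventGridConsumerClient receives events from Event Grid Namespace subscriptions.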
Send events to Event Grid Basic topics/domains or Event Grid Namespace topics. Supports CloudEvent schema (namespaces), EventGrid schema (basic), and raw dictionaries with automatic content type detection.
def send(
events: Union[CloudEvent, EventGridEvent, dict, List[Union[CloudEvent, EventGridEvent, dict]]],
*,
channel_name: Optional[str] = None,
content_type: Optional[str] = None,
**kwargs
) -> None: ...

Receive and manage events from Event Grid Namespace subscriptions with operations for acknowledging, releasing, rejecting, and renewing locks on events.
def receive(
*,
max_events: Optional[int] = None,
max_wait_time: Optional[int] = None,
**kwargs
) -> List[ReceiveDetails]: ...
def acknowledge(*, lock_tokens: List[str], **kwargs) -> AcknowledgeResult: ...
def release(*, lock_tokens: List[str], release_delay: Optional[Union[int, ReleaseDelay]] = None, **kwargs) -> ReleaseResult: ...
def reject(*, lock_tokens: List[str], **kwargs) -> RejectResult: ...
def renew_locks(*, lock_tokens: List[str], **kwargs) -> RenewLocksResult: ...

Asynchronous versions of all publisher and consumer operations for non-blocking event processing and high-throughput scenarios.
async def send(
events: Union[CloudEvent, EventGridEvent, dict, List[Union[CloudEvent, EventGridEvent, dict]]],
*,
channel_name: Optional[str] = None,
content_type: Optional[str] = None,
**kwargs
) -> None: ...
async def receive(
*,
max_events: Optional[int] = None,
max_wait_time: Optional[int] = None,
**kwargs
) -> List[ReceiveDetails]: ...

Response models, event metadata, broker properties, and enums for comprehensive event handling and status reporting.
class ReceiveDetails:
broker_properties: BrokerProperties
event: CloudEvent
class BrokerProperties:
lock_token: str
delivery_count: int
class AcknowledgeResult:
failed_lock_tokens: List[FailedLockToken]
succeeded_lock_tokens: List[str]

Event Grid Basic support with EventGrid schema events, system event names, and SAS token generation for backward compatibility.
class EventGridEvent:
def __init__(
self,
subject: str,
event_type: str,
data: Any,
data_version: str,
*,
topic: Optional[str] = None,
metadata_version: Optional[str] = None,
id: Optional[str] = None,
event_time: Optional[datetime] = None
) -> None: ...
def generate_sas(
endpoint: str,
shared_access_key: str,
expiration_date_utc: datetime,
*,
api_version: Optional[str] = None
) -> str: ...

from typing import List, Optional, Union, Any
from datetime import datetime
from azure.core.credentials import AzureKeyCredential, AzureSasCredential, TokenCredential
from azure.core.credentials_async import AsyncTokenCredential
from azure.core.messaging import CloudEvent
from azure.core.rest import HttpRequest, HttpResponse
# Authentication types
Credential = Union[AzureKeyCredential, AzureSasCredential, TokenCredential]
AsyncCredential = Union[AzureKeyCredential, AsyncTokenCredential]
# Event types for publishing
EventType = Union[CloudEvent, EventGridEvent, dict]
EventList = List[EventType]
SendableEvents = Union[EventType, EventList]