Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Event publishing in Azure Event Grid enables sending events to both Event Grid Basic topics/domains and Event Grid Namespace topics. The EventGridPublisherClient provides a unified interface that automatically detects the target environment and handles appropriate event formatting.
Creates a publisher client for sending events to Event Grid endpoints.
class EventGridPublisherClient:
def __init__(
self,
endpoint: str,
credential: Union[AzureKeyCredential, AzureSasCredential, TokenCredential],
*,
namespace_topic: Optional[str] = None,
api_version: Optional[str] = None,
**kwargs: Any
) -> None:
"""
Create EventGrid publisher client.
Parameters:
- endpoint: Event Grid endpoint URL
- credential: Authentication credential
- namespace_topic: Topic name for namespaces (None for Event Grid Basic)
- api_version: API version (defaults: "2024-06-01" for namespaces, "2018-01-01" for basic)
"""Sends events to Event Grid with automatic content type detection and schema validation.
def send(
self,
events: Union[CloudEvent, EventGridEvent, dict, List[Union[CloudEvent, EventGridEvent, dict]]],
*,
channel_name: Optional[str] = None,
content_type: Optional[str] = None,
**kwargs: Any
) -> None:
"""
Send events to Event Grid.
Parameters:
- events: Single event or list of events to send
- channel_name: Channel name for multi-channel publishing (namespaces only)
- content_type: Override content type for the request
Raises:
- HttpResponseError: Server returned error response
- ClientAuthenticationError: Authentication failed
- ValueError: Invalid event format or missing required fields
"""Direct HTTP request handling for advanced scenarios and custom event formats.
def send_request(
self,
request: HttpRequest,
*,
stream: bool = False,
**kwargs: Any
) -> HttpResponse:
"""
Send raw HTTP request through the client pipeline.
Parameters:
- request: The HTTP request to send
- stream: Whether to stream the response payload
Returns:
HttpResponse: Raw HTTP response
"""Context manager support and explicit resource cleanup.
def close(self) -> None:
"""Close the client and cleanup resources."""
def __enter__(self) -> Self:
"""Context manager entry."""
def __exit__(self, *exc_details: Any) -> None:
"""Context manager exit with cleanup."""from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
import json
# Create publisher for namespace topic
publisher = EventGridPublisherClient(
endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
credential=AzureKeyCredential("access_key"),
namespace_topic="my-topic"
)
# Send CloudEvent
cloud_event = CloudEvent(
source="myapp/orders",
type="Order.Created",
data={
"order_id": "12345",
"customer_id": "customer-456",
"total": 99.99
}
)
publisher.send(cloud_event)
# Send multiple events
events = [
CloudEvent(source="app", type="Event.Type1", data={"key": "value1"}),
CloudEvent(source="app", type="Event.Type2", data={"key": "value2"})
]
publisher.send(events)
# Send raw dictionary (converted to CloudEvent automatically)
raw_event = {
"specversion": "1.0",
"type": "Custom.Event",
"source": "myapp",
"id": "event-123",
"data": {"message": "Hello World"}
}
publisher.send(raw_event)
# Channel-specific publishing (if namespace supports channels)
publisher.send(cloud_event, channel_name="orders-channel")
publisher.close()from azure.eventgrid import EventGridPublisherClient, EventGridEvent
from azure.core.credentials import AzureKeyCredential
# Create publisher for basic topic (no namespace_topic specified)
publisher = EventGridPublisherClient(
endpoint="https://my-topic.westus.eventgrid.azure.net/api/events",
credential=AzureKeyCredential("access_key")
)
# Send EventGrid schema event
event = EventGridEvent(
subject="orders/order-123",
event_type="Order.Completed",
data={
"order_id": "123",
"status": "completed",
"total": 49.99
},
data_version="1.0"
)
publisher.send(event)
# Send multiple EventGrid events
events = [
EventGridEvent(subject="orders/1", event_type="Order.Created", data={}, data_version="1.0"),
EventGridEvent(subject="orders/2", event_type="Order.Updated", data={}, data_version="1.0")
]
publisher.send(events)
publisher.close()# Automatic resource cleanup
with EventGridPublisherClient(endpoint, credential, namespace_topic="topic") as publisher:
publisher.send(events)
# Client automatically closed on exitfrom azure.eventgrid import generate_sas
from azure.core.credentials import AzureSasCredential
from datetime import datetime, timedelta
# Generate SAS token
endpoint = "https://my-topic.westus.eventgrid.azure.net/api/events"
access_key = "your_access_key"
expiration = datetime.utcnow() + timedelta(hours=1)
sas_token = generate_sas(endpoint, access_key, expiration)
# Use SAS token for authentication
publisher = EventGridPublisherClient(
endpoint=endpoint,
credential=AzureSasCredential(sas_token)
)from azure.core.exceptions import HttpResponseError, ClientAuthenticationError
try:
publisher.send(events)
except ClientAuthenticationError as e:
print(f"Authentication failed: {e}")
except HttpResponseError as e:
print(f"HTTP error {e.status_code}: {e.message}")
if e.status_code == 413:
print("Event batch too large, try sending fewer events")
elif e.status_code == 400:
print("Invalid event format or missing required fields")
except ValueError as e:
print(f"Invalid event data: {e}")CloudEvents v1.0 specification with required and optional fields:
# Required fields
cloud_event = CloudEvent(
source="myapp/component", # Context in which event occurred
type="MyApp.Event.Happened", # Event type identifier
data={"key": "value"} # Event payload
)
# With optional fields
cloud_event = CloudEvent(
source="myapp/orders",
type="Order.StatusChanged",
data={
"order_id": "12345",
"old_status": "pending",
"new_status": "confirmed"
},
subject="orders/12345", # Event subject
time=datetime.utcnow(), # Event timestamp
datacontenttype="application/json" # Content type of data
)Traditional EventGrid event format:
eventgrid_event = EventGridEvent(
subject="products/product-123", # Required: Event subject
event_type="Product.PriceChanged", # Required: Event type
data={ # Required: Event data
"product_id": "123",
"old_price": 19.99,
"new_price": 24.99
},
data_version="2.0", # Required: Data schema version
topic="products", # Optional: Topic (auto-set by service)
id="event-456", # Optional: Event ID (auto-generated)
event_time=datetime.utcnow() # Optional: Event time (auto-generated)
)Dictionaries are automatically converted based on target:
# For namespaces (converted to CloudEvent)
namespace_dict = {
"specversion": "1.0", # Required for CloudEvent
"type": "MyApp.Event", # Required
"source": "myapp", # Required
"data": {"message": "hello"} # Event payload
}
# For basic topics (converted to EventGrid event)
basic_dict = {
"eventType": "Custom.Event", # Maps to event_type
"subject": "app/component", # Maps to subject
"data": {"key": "value"}, # Event data
"dataVersion": "1.0" # Maps to data_version
}Install with Tessl CLI
npx tessl i tessl/pypi-azure-eventgrid