Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Azure Event Grid Python SDK provides comprehensive backward compatibility for Event Grid Basic (legacy) scenarios. This includes EventGrid schema events, system event names, SAS token generation, and legacy publisher clients.
Traditional EventGrid event format for Event Grid Basic topics and domains.
class EventGridEvent:
def __init__(
self,
subject: str,
event_type: str,
data: Any,
data_version: str,
*,
topic: Optional[str] = None,
metadata_version: Optional[str] = None,
id: Optional[str] = None,
event_time: Optional[datetime] = None
) -> None:
"""
Create EventGrid schema event.
Parameters:
- subject: Subject of the event in the context of the event producer (required)
- event_type: One of the registered event types for this event source (required)
- data: Event data specific to the resource provider (required)
- data_version: The schema version of the data object (required)
- topic: The resource path to the event source (optional, set by service)
- metadata_version: The schema version of the event metadata (optional)
- id: An identifier for the event (optional, auto-generated if not provided)
- event_time: The time the event was generated (optional, auto-generated if not provided)
"""
@classmethod
def from_json(cls, event: Union[str, dict]) -> "EventGridEvent":
"""
Create EventGridEvent from JSON string or dictionary.
Parameters:
- event: JSON string or dictionary representing the event
Returns:
EventGridEvent: Parsed event object
"""
Comprehensive enumeration of Azure system event types for filtering and handling built-in Azure service events.
class SystemEventNames(str, Enum):
    """
    Enumeration of Azure system event names.

    Each member's value is the event type string that appears in an event's
    ``eventType`` field. Because the enum mixes in ``str``, members compare
    equal to those plain strings.

    Contains 100+ system event types including:
    - EventGridSubscriptionValidationEventName
    - ResourceWriteSuccessEventName
    - ResourceWriteFailureEventName
    - ResourceDeleteSuccessEventName
    - IoTHubDeviceConnectedEventName
    - IoTHubDeviceDisconnectedEventName
    - StorageBlobCreatedEventName
    - StorageBlobDeletedEventName
    - ServiceBusActiveMessagesAvailableWithNoListenersEventName
    - And many more...
    """

    # Event Grid events
    EventGridSubscriptionValidationEventName = "Microsoft.EventGrid.SubscriptionValidationEvent"
    EventGridSubscriptionDeletedEventName = "Microsoft.EventGrid.SubscriptionDeletedEvent"

    # Resource Manager events
    ResourceWriteSuccessEventName = "Microsoft.Resources.ResourceWriteSuccess"
    ResourceWriteFailureEventName = "Microsoft.Resources.ResourceWriteFailure"
    ResourceWriteCancelEventName = "Microsoft.Resources.ResourceWriteCancel"
    ResourceDeleteSuccessEventName = "Microsoft.Resources.ResourceDeleteSuccess"
    ResourceDeleteFailureEventName = "Microsoft.Resources.ResourceDeleteFailure"
    ResourceDeleteCancelEventName = "Microsoft.Resources.ResourceDeleteCancel"

    # Storage events
    StorageBlobCreatedEventName = "Microsoft.Storage.BlobCreated"
    StorageBlobDeletedEventName = "Microsoft.Storage.BlobDeleted"
    StorageBlobRenamedEventName = "Microsoft.Storage.BlobRenamed"
    StorageDirectoryCreatedEventName = "Microsoft.Storage.DirectoryCreated"
    StorageDirectoryDeletedEventName = "Microsoft.Storage.DirectoryDeleted"
    StorageDirectoryRenamedEventName = "Microsoft.Storage.DirectoryRenamed"

    # IoT Hub events
    IoTHubDeviceConnectedEventName = "Microsoft.Devices.DeviceConnected"
    IoTHubDeviceDisconnectedEventName = "Microsoft.Devices.DeviceDisconnected"
    IoTHubDeviceCreatedEventName = "Microsoft.Devices.DeviceCreated"
    IoTHubDeviceDeletedEventName = "Microsoft.Devices.DeviceDeleted"
    IoTHubDeviceTelemetryEventName = "Microsoft.Devices.DeviceTelemetry"
# And 90+ more system event names...
Generate Shared Access Signature tokens for Event Grid Basic authentication.
def generate_sas(
endpoint: str,
shared_access_key: str,
expiration_date_utc: datetime,
*,
api_version: Optional[str] = None
) -> str:
"""
Generate Shared Access Signature for Event Grid authentication.
Parameters:
- endpoint: Event Grid topic endpoint URL
- shared_access_key: Shared access key for the topic
- expiration_date_utc: UTC datetime when the token expires
- api_version: API version for the signature (optional)
Returns:
str: SAS token for authentication
Raises:
- ValueError: Invalid endpoint or key format
- TypeError: Invalid expiration date type
"""
Asynchronous Event Grid Basic publisher for backward compatibility.
# Available in azure.eventgrid._legacy.aio
class EventGridPublisherClient:
def __init__(
self,
endpoint: str,
credential: Union[AzureKeyCredential, AzureSasCredential],
**kwargs: Any
) -> None: ...
async def send(
self,
events: Union[EventGridEvent, List[EventGridEvent]],
**kwargs: Any
) -> None:
"""
Asynchronously send EventGrid schema events to Event Grid Basic.
Parameters:
- events: Single event or list of EventGrid events
"""
from azure.eventgrid import EventGridPublisherClient, EventGridEvent
from azure.core.credentials import AzureKeyCredential
from datetime import datetime
# Create publisher for Event Grid Basic topic
publisher = EventGridPublisherClient(
endpoint="https://my-topic.westus.eventgrid.azure.net/api/events",
credential=AzureKeyCredential("access_key")
# Note: No namespace_topic parameter for Event Grid Basic
)
# Create EventGrid schema event
event = EventGridEvent(
subject="orders/order-12345",
event_type="Contoso.Orders.OrderCreated",
data={
"order_id": "12345",
"customer_id": "customer-67890",
"product_ids": ["product-1", "product-2"],
"total_amount": 149.99,
"currency": "USD"
},
data_version="1.0"
)
# Send single event
publisher.send(event)
# Send multiple events
events = [
EventGridEvent(
subject="orders/order-12346",
event_type="Contoso.Orders.OrderCreated",
data={"order_id": "12346", "total": 99.99},
data_version="1.0"
),
EventGridEvent(
subject="orders/order-12347",
event_type="Contoso.Orders.OrderCreated",
data={"order_id": "12347", "total": 199.99},
data_version="1.0"
)
]
publisher.send(events)
publisher.close()
from azure.eventgrid import EventGridEvent
from datetime import datetime, timezone
# Event with all optional properties
detailed_event = EventGridEvent(
subject="products/electronics/laptops/laptop-abc123",
event_type="Contoso.Inventory.ProductUpdated",
data={
"product_id": "laptop-abc123",
"name": "Gaming Laptop Pro",
"category": "electronics/laptops",
"price": 1299.99,
"inventory_count": 15,
"updated_fields": ["price", "inventory_count"]
},
data_version="2.1",
id="event-unique-id-789",
event_time=datetime.now(timezone.utc),
topic="contoso-inventory", # Usually set by the service
metadata_version="1"
)
publisher.send(detailed_event)
from azure.eventgrid import SystemEventNames
# Filter events by system event types
def handle_system_event(event_type, event_data):
    """Handle different types of Azure system events.

    Prints a one-line summary for the system event types this example
    recognises, and an "unknown" message for everything else.

    Parameters:
    - event_type: Event type of the received event; compared against
      SystemEventNames members
    - event_data: Dictionary holding the event's data payload; None is
      treated as an empty payload
    """
    # A missing payload should not crash the handler with AttributeError.
    event_data = event_data or {}

    if event_type == SystemEventNames.StorageBlobCreatedEventName:
        blob_url = event_data.get('url')
        print(f"New blob created: {blob_url}")
    elif event_type == SystemEventNames.ResourceWriteSuccessEventName:
        resource_uri = event_data.get('resourceUri')
        operation = event_data.get('operationName')
        print(f"Resource operation successful: {operation} on {resource_uri}")
    elif event_type == SystemEventNames.IoTHubDeviceConnectedEventName:
        device_id = event_data.get('deviceId')
        print(f"IoT device connected: {device_id}")
    elif event_type == SystemEventNames.EventGridSubscriptionValidationEventName:
        validation_code = event_data.get('validationCode')
        print(f"Subscription validation required: {validation_code}")
        # Handle validation logic here. The payload may also carry a
        # 'validationUrl' entry; read it with event_data.get('validationUrl')
        # if the handshake needs it.
    else:
        print(f"Unknown system event: {event_type}")
# Example system event handling
system_events = [
{
"eventType": SystemEventNames.StorageBlobCreatedEventName,
"data": {
"url": "https://mystorageaccount.blob.core.windows.net/container/newfile.txt",
"contentType": "text/plain",
"contentLength": 1024
}
}
]
for event in system_events:
handle_system_event(event["eventType"], event["data"])
from azure.eventgrid import generate_sas
from azure.core.credentials import AzureSasCredential
from datetime import datetime, timedelta
# Generate SAS token
endpoint = "https://my-topic.westus.eventgrid.azure.net/api/events"
access_key = "your_shared_access_key_here"
expiration = datetime.utcnow() + timedelta(hours=2)
sas_token = generate_sas(
endpoint=endpoint,
shared_access_key=access_key,
expiration_date_utc=expiration,
api_version="2018-01-01"
)
print(f"Generated SAS token: {sas_token}")
# Use SAS token for authentication
publisher = EventGridPublisherClient(
endpoint=endpoint,
credential=AzureSasCredential(sas_token)
)
# Create and send event
event = EventGridEvent(
subject="auth/sas-example",
event_type="Example.SasAuth",
data={"message": "Authenticated with SAS token"},
data_version="1.0"
)
publisher.send(event)
publisher.close()
import json
from azure.eventgrid import EventGridEvent
# JSON string representation
json_event = """
{
"id": "event-123",
"subject": "myapp/users/user456",
"eventType": "MyApp.User.ProfileUpdated",
"eventTime": "2024-01-15T10:30:00Z",
"data": {
"userId": "user456",
"updatedFields": ["email", "phoneNumber"],
"previousEmail": "old@example.com",
"newEmail": "new@example.com"
},
"dataVersion": "1.0"
}
"""
# Create event from JSON string
event = EventGridEvent.from_json(json_event)
publisher.send(event)
# Create event from dictionary
event_dict = {
"subject": "orders/order-789",
"eventType": "Contoso.Orders.OrderShipped",
"data": {
"orderId": "789",
"trackingNumber": "TRACK123456",
"estimatedDelivery": "2024-01-20"
},
"dataVersion": "1.0"
}
event = EventGridEvent.from_json(event_dict)
publisher.send(event)
import asyncio
from azure.eventgrid._legacy.aio import EventGridPublisherClient
from azure.eventgrid import EventGridEvent
from azure.core.credentials import AzureKeyCredential
async def async_legacy_publishing():
    """Example of async Event Grid Basic publishing.

    Opens the async legacy publisher as a context manager, builds five
    EventGrid schema events and sends them as a single batch.
    """
    # Imported locally because this snippet's own imports do not bring
    # datetime into scope.
    from datetime import datetime, timezone

    # Create async legacy publisher
    async with EventGridPublisherClient(
        endpoint="https://my-topic.westus.eventgrid.azure.net/api/events",
        credential=AzureKeyCredential("access_key")
    ) as publisher:
        # Create events; timestamps are timezone-aware (UTC), matching the
        # detailed-event example elsewhere on this page.
        events = [
            EventGridEvent(
                subject=f"async/event-{i}",
                event_type="AsyncExample.EventCreated",
                data={"index": i, "timestamp": datetime.now(timezone.utc).isoformat()},
                data_version="1.0"
            )
            for i in range(5)
        ]

        # Send events asynchronously
        await publisher.send(events)
        print(f"Sent {len(events)} events asynchronously")
# Run async function
asyncio.run(async_legacy_publishing())
from azure.eventgrid import EventGridEvent, SystemEventNames
def validate_eventgrid_event(event_data):
    """Validate EventGrid event structure and required fields.

    Parameters:
    - event_data: Dictionary using the EventGrid JSON keys
      (subject, eventType, data, dataVersion)

    Returns:
    bool: True when every check passes

    Raises:
    - ValueError: A required field is missing, eventType is not a dotted
      string, or dataVersion is empty
    """
    required_fields = ('subject', 'eventType', 'data', 'dataVersion')
    for field in required_fields:
        if field not in event_data:
            raise ValueError(f"Missing required field: {field}")

    # Validate event type format. The isinstance check keeps a non-string
    # value on the ValueError path instead of leaking a TypeError from the
    # "in" test.
    event_type = event_data['eventType']
    if not isinstance(event_type, str) or '.' not in event_type:
        raise ValueError("eventType should follow format: Publisher.Object.Action")

    # Subject is deliberately validated for presence only: it is usually a
    # resource path, but no particular format is enforced.

    # Validate data version
    if not event_data['dataVersion']:
        raise ValueError("dataVersion is required")

    return True
# Example validation
event_data = {
"subject": "/orders/order-123",
"eventType": "Contoso.Orders.OrderProcessed",
"data": {"orderId": "123", "status": "completed"},
"dataVersion": "1.0"
}
try:
validate_eventgrid_event(event_data)
event = EventGridEvent.from_json(event_data)
print("Event validation successful")
except ValueError as e:
print(f"Event validation failed: {e}")
from azure.eventgrid import EventGridPublisherClient, EventGridEvent
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
# Event Grid Basic (Legacy)
basic_publisher = EventGridPublisherClient(
endpoint="https://topic.region.eventgrid.azure.net/api/events",
credential=AzureKeyCredential("key")
# No namespace_topic specified = Event Grid Basic
)
eventgrid_event = EventGridEvent(
subject="legacy/example",
event_type="Legacy.EventType",
data={"key": "value"},
data_version="1.0"
)
basic_publisher.send(eventgrid_event)
# Event Grid Namespaces (Modern)
namespace_publisher = EventGridPublisherClient(
endpoint="https://namespace.region.eventgrid.azure.net",
credential=AzureKeyCredential("key"),
namespace_topic="my-topic" # Specifying namespace_topic = Event Grid Namespaces
)
cloud_event = CloudEvent(
source="modern/example",
type="Modern.EventType",
data={"key": "value"}
)
namespace_publisher.send(cloud_event)
# Clean up
basic_publisher.close()
namespace_publisher.close()
from azure.core.exceptions import HttpResponseError, ClientAuthenticationError
from azure.eventgrid import generate_sas
from datetime import datetime, timedelta
def robust_legacy_publishing(endpoint, access_key, events):
    """Robust Event Grid Basic publishing with error handling.

    Authenticates with a freshly generated one-hour SAS token and sends the
    batch, retrying up to three times. A 401 response regenerates the token,
    a 413 response splits the batch in half and publishes each half
    recursively, and any other HTTP error is retried after an exponential
    backoff.

    Parameters:
    - endpoint: Event Grid topic endpoint URL
    - access_key: Shared access key used to generate SAS tokens
    - events: List of EventGrid events to send

    Raises:
    - HttpResponseError: The last attempt failed, or a single-event batch
      was rejected with 413 and cannot be split further
    - ValueError: Re-raised after logging when parameters are invalid
    """
    # Imported locally: no snippet on this page imports time.
    import time

    def _new_publisher():
        # Build a publisher authenticated with a freshly generated SAS token.
        expiration = datetime.utcnow() + timedelta(hours=1)
        sas_token = generate_sas(endpoint, access_key, expiration)
        return EventGridPublisherClient(
            endpoint=endpoint,
            credential=AzureSasCredential(sas_token)
        )

    max_retries = 3
    publisher = None
    try:
        # Create publisher with SAS authentication
        publisher = _new_publisher()

        # Send events with retry logic
        for attempt in range(max_retries):
            try:
                publisher.send(events)
                print(f"Successfully sent {len(events)} events")
                break
            except HttpResponseError as e:
                if e.status_code == 413:
                    # Payload too large - split batch. A single event cannot
                    # be split; recursing on it would never terminate.
                    if len(events) <= 1:
                        print(f"Failed after {attempt + 1} attempts: {e}")
                        raise
                    print("Payload too large, splitting batch...")
                    mid = len(events) // 2
                    robust_legacy_publishing(endpoint, access_key, events[:mid])
                    robust_legacy_publishing(endpoint, access_key, events[mid:])
                    return

                # Checked before the 401 branch so that an authentication
                # failure on the final attempt is reported, not dropped.
                if attempt == max_retries - 1:
                    print(f"Failed after {max_retries} attempts: {e}")
                    raise

                if e.status_code == 401:
                    # Authentication failed - regenerate token
                    print("Authentication failed, regenerating SAS token...")
                    replacement = _new_publisher()
                    publisher.close()  # release the client being replaced
                    publisher = replacement
                else:
                    print(f"Attempt {attempt + 1} failed, retrying...")
                    time.sleep(2 ** attempt)  # Exponential backoff
    except ValueError as e:
        print(f"Invalid parameters: {e}")
        raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
    finally:
        # Close the client on every exit path: success, split, or failure.
        if publisher is not None:
            publisher.close()
# Usage
events = [
EventGridEvent(
subject="robust/example",
event_type="Example.RobustPublishing",
data={"attempt": i},
data_version="1.0"
)
for i in range(10)
]
robust_legacy_publishing(
endpoint="https://topic.region.eventgrid.azure.net/api/events",
access_key="your_access_key",
events=events
)
Install with Tessl CLI
npx tessl i tessl/pypi-azure-eventgrid