CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-eventgrid

Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

legacy.mddocs/

Legacy Support

Azure Event Grid Python SDK provides comprehensive backward compatibility for Event Grid Basic (legacy) scenarios. This includes EventGrid schema events, system event names, SAS token generation, and legacy publisher clients.

Capabilities

EventGrid Schema Events

Traditional EventGrid event format for Event Grid Basic topics and domains.

class EventGridEvent:
    def __init__(
        self,
        subject: str,
        event_type: str,
        data: Any,
        data_version: str,
        *,
        topic: Optional[str] = None,
        metadata_version: Optional[str] = None,
        id: Optional[str] = None,
        event_time: Optional[datetime] = None
    ) -> None:
        """
        Create EventGrid schema event.

        Parameters:
        - subject: Subject of the event in the context of the event producer (required)
        - event_type: One of the registered event types for this event source (required)  
        - data: Event data specific to the resource provider (required)
        - data_version: The schema version of the data object (required)
        - topic: The resource path to the event source (optional, set by service)
        - metadata_version: The schema version of the event metadata (optional)
        - id: An identifier for the event (optional, auto-generated if not provided)
        - event_time: The time the event was generated (optional, auto-generated if not provided)
        """

    @classmethod
    def from_json(cls, event: Union[str, dict]) -> "EventGridEvent":
        """
        Create EventGridEvent from JSON string or dictionary.

        Parameters:
        - event: JSON string or dictionary representing the event

        Returns:
        EventGridEvent: Parsed event object
        """

System Event Names

Comprehensive enumeration of Azure system event types for filtering and handling built-in Azure service events.

class SystemEventNames(str, Enum):
    """
    Enumeration of Azure system event names.
    
    Contains 100+ system event types including:
    - EventGridSubscriptionValidationEventName
    - ResourceWriteSuccessEventName  
    - ResourceWriteFailureEventName
    - ResourceDeleteSuccessEventName
    - IoTHubDeviceConnectedEventName
    - IoTHubDeviceDisconnectedEventName
    - StorageBlobCreatedEventName
    - StorageBlobDeletedEventName
    - ServiceBusActiveMessagesAvailableWithNoListenersEventName
    - And many more...
    """
    
    # Event Grid events
    EventGridSubscriptionValidationEventName = "Microsoft.EventGrid.SubscriptionValidationEvent"
    EventGridSubscriptionDeletedEventName = "Microsoft.EventGrid.SubscriptionDeletedEvent"
    
    # Resource Manager events  
    ResourceWriteSuccessEventName = "Microsoft.Resources.ResourceWriteSuccess"
    ResourceWriteFailureEventName = "Microsoft.Resources.ResourceWriteFailure"
    ResourceWriteCancelEventName = "Microsoft.Resources.ResourceWriteCancel"
    ResourceDeleteSuccessEventName = "Microsoft.Resources.ResourceDeleteSuccess"
    ResourceDeleteFailureEventName = "Microsoft.Resources.ResourceDeleteFailure"
    ResourceDeleteCancelEventName = "Microsoft.Resources.ResourceDeleteCancel"
    
    # Storage events
    StorageBlobCreatedEventName = "Microsoft.Storage.BlobCreated"
    StorageBlobDeletedEventName = "Microsoft.Storage.BlobDeleted"
    StorageBlobRenamedEventName = "Microsoft.Storage.BlobRenamed"
    StorageDirectoryCreatedEventName = "Microsoft.Storage.DirectoryCreated"
    StorageDirectoryDeletedEventName = "Microsoft.Storage.DirectoryDeleted"
    StorageDirectoryRenamedEventName = "Microsoft.Storage.DirectoryRenamed"
    
    # IoT Hub events
    IoTHubDeviceConnectedEventName = "Microsoft.Devices.DeviceConnected"
    IoTHubDeviceDisconnectedEventName = "Microsoft.Devices.DeviceDisconnected"
    IoTHubDeviceCreatedEventName = "Microsoft.Devices.DeviceCreated"
    IoTHubDeviceDeletedEventName = "Microsoft.Devices.DeviceDeleted"
    IoTHubDeviceTelemetryEventName = "Microsoft.Devices.DeviceTelemetry"
    
    # And 90+ more system event names...

SAS Token Generation

Generate Shared Access Signature tokens for Event Grid Basic authentication.

def generate_sas(
    endpoint: str,
    shared_access_key: str,
    expiration_date_utc: datetime,
    *,
    api_version: Optional[str] = None
) -> str:
    """
    Generate Shared Access Signature for Event Grid authentication.

    Parameters:
    - endpoint: Event Grid topic endpoint URL
    - shared_access_key: Shared access key for the topic
    - expiration_date_utc: UTC datetime when the token expires
    - api_version: API version for the signature (optional)

    Returns:
    str: SAS token for authentication

    Raises:
    - ValueError: Invalid endpoint or key format
    - TypeError: Invalid expiration date type
    """

Legacy Async Publisher

Asynchronous Event Grid Basic publisher for backward compatibility.

# Available in azure.eventgrid._legacy.aio
class EventGridPublisherClient:
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, AzureSasCredential],
        **kwargs: Any
    ) -> None: ...

    async def send(
        self,
        events: Union[EventGridEvent, List[EventGridEvent]],
        **kwargs: Any
    ) -> None:
        """
        Asynchronously send EventGrid schema events to Event Grid Basic.

        Parameters:
        - events: Single event or list of EventGrid events
        """

Usage Examples

Event Grid Basic Publishing

from azure.eventgrid import EventGridPublisherClient, EventGridEvent
from azure.core.credentials import AzureKeyCredential
from datetime import datetime

# Create publisher for Event Grid Basic topic
publisher = EventGridPublisherClient(
    endpoint="https://my-topic.westus.eventgrid.azure.net/api/events",
    credential=AzureKeyCredential("access_key")
    # Note: No namespace_topic parameter for Event Grid Basic
)

# Create EventGrid schema event
event = EventGridEvent(
    subject="orders/order-12345",
    event_type="Contoso.Orders.OrderCreated",
    data={
        "order_id": "12345",
        "customer_id": "customer-67890",
        "product_ids": ["product-1", "product-2"],
        "total_amount": 149.99,
        "currency": "USD"
    },
    data_version="1.0"
)

# Send single event
publisher.send(event)

# Send multiple events
events = [
    EventGridEvent(
        subject="orders/order-12346",
        event_type="Contoso.Orders.OrderCreated", 
        data={"order_id": "12346", "total": 99.99},
        data_version="1.0"
    ),
    EventGridEvent(
        subject="orders/order-12347",
        event_type="Contoso.Orders.OrderCreated",
        data={"order_id": "12347", "total": 199.99}, 
        data_version="1.0"
    )
]

publisher.send(events)
publisher.close()

Creating Events with All Properties

from azure.eventgrid import EventGridEvent
from datetime import datetime, timezone

# Event with all optional properties
detailed_event = EventGridEvent(
    subject="products/electronics/laptops/laptop-abc123",
    event_type="Contoso.Inventory.ProductUpdated",
    data={
        "product_id": "laptop-abc123",
        "name": "Gaming Laptop Pro",
        "category": "electronics/laptops",
        "price": 1299.99,
        "inventory_count": 15,
        "updated_fields": ["price", "inventory_count"]
    },
    data_version="2.1",
    id="event-unique-id-789",
    event_time=datetime.now(timezone.utc),
    topic="contoso-inventory",  # Usually set by the service
    metadata_version="1"
)

publisher.send(detailed_event)

Working with System Events

from azure.eventgrid import SystemEventNames

# Filter events by system event types
def handle_system_event(event_type, event_data):
    """Handle different types of Azure system events."""
    
    if event_type == SystemEventNames.StorageBlobCreatedEventName:
        blob_url = event_data.get('url')
        print(f"New blob created: {blob_url}")
        
    elif event_type == SystemEventNames.ResourceWriteSuccessEventName:
        resource_uri = event_data.get('resourceUri')
        operation = event_data.get('operationName')
        print(f"Resource operation successful: {operation} on {resource_uri}")
        
    elif event_type == SystemEventNames.IoTHubDeviceConnectedEventName:
        device_id = event_data.get('deviceId')
        print(f"IoT device connected: {device_id}")
        
    elif event_type == SystemEventNames.EventGridSubscriptionValidationEventName:
        validation_code = event_data.get('validationCode')
        validation_url = event_data.get('validationUrl')
        print(f"Subscription validation required: {validation_code}")
        # Handle validation logic
        
    else:
        print(f"Unknown system event: {event_type}")

# Example system event handling
system_events = [
    {
        "eventType": SystemEventNames.StorageBlobCreatedEventName,
        "data": {
            "url": "https://mystorageaccount.blob.core.windows.net/container/newfile.txt",
            "contentType": "text/plain",
            "contentLength": 1024
        }
    }
]

for event in system_events:
    handle_system_event(event["eventType"], event["data"])

SAS Authentication

from azure.eventgrid import generate_sas
from azure.core.credentials import AzureSasCredential
from datetime import datetime, timedelta

# Generate SAS token
endpoint = "https://my-topic.westus.eventgrid.azure.net/api/events"
access_key = "your_shared_access_key_here"
expiration = datetime.utcnow() + timedelta(hours=2)

sas_token = generate_sas(
    endpoint=endpoint,
    shared_access_key=access_key,
    expiration_date_utc=expiration,
    api_version="2018-01-01"
)

print(f"Generated SAS token: {sas_token}")

# Use SAS token for authentication
publisher = EventGridPublisherClient(
    endpoint=endpoint,
    credential=AzureSasCredential(sas_token)
)

# Create and send event
event = EventGridEvent(
    subject="auth/sas-example",
    event_type="Example.SasAuth",
    data={"message": "Authenticated with SAS token"},
    data_version="1.0"
)

publisher.send(event)
publisher.close()

Creating Events from JSON

import json
from azure.eventgrid import EventGridEvent

# JSON string representation
json_event = """
{
    "id": "event-123",
    "subject": "myapp/users/user456",  
    "eventType": "MyApp.User.ProfileUpdated",
    "eventTime": "2024-01-15T10:30:00Z",
    "data": {
        "userId": "user456",
        "updatedFields": ["email", "phoneNumber"],
        "previousEmail": "old@example.com",
        "newEmail": "new@example.com"
    },
    "dataVersion": "1.0"
}
"""

# Create event from JSON string
event = EventGridEvent.from_json(json_event)
publisher.send(event)

# Create event from dictionary
event_dict = {
    "subject": "orders/order-789",
    "eventType": "Contoso.Orders.OrderShipped", 
    "data": {
        "orderId": "789",
        "trackingNumber": "TRACK123456",
        "estimatedDelivery": "2024-01-20"
    },
    "dataVersion": "1.0"
}

event = EventGridEvent.from_json(event_dict)
publisher.send(event)

Legacy Async Publishing

import asyncio
from azure.eventgrid._legacy.aio import EventGridPublisherClient
from azure.eventgrid import EventGridEvent
from azure.core.credentials import AzureKeyCredential

async def async_legacy_publishing():
    """Example of async Event Grid Basic publishing."""
    
    # Create async legacy publisher
    async with EventGridPublisherClient(
        endpoint="https://my-topic.westus.eventgrid.azure.net/api/events",
        credential=AzureKeyCredential("access_key")
    ) as publisher:
        
        # Create events
        events = [
            EventGridEvent(
                subject=f"async/event-{i}",
                event_type="AsyncExample.EventCreated",
                data={"index": i, "timestamp": datetime.now().isoformat()},
                data_version="1.0"
            )
            for i in range(5)
        ]
        
        # Send events asynchronously
        await publisher.send(events)
        print(f"Sent {len(events)} events asynchronously")

# Run async function
asyncio.run(async_legacy_publishing())

Event Validation Patterns

from azure.eventgrid import EventGridEvent, SystemEventNames

def validate_eventgrid_event(event_data):
    """Validate EventGrid event structure and required fields."""
    
    required_fields = ['subject', 'eventType', 'data', 'dataVersion']
    
    for field in required_fields:
        if field not in event_data:
            raise ValueError(f"Missing required field: {field}")
    
    # Validate event type format
    event_type = event_data['eventType']
    if not event_type or '.' not in event_type:
        raise ValueError("eventType should follow format: Publisher.Object.Action")
    
    # Validate subject format
    subject = event_data['subject']
    if not subject or not subject.startswith('/'):
        # Subject should be a resource path
        pass  # Flexible validation
    
    # Validate data version
    data_version = event_data['dataVersion']
    if not data_version:
        raise ValueError("dataVersion is required")
    
    return True

# Example validation
event_data = {
    "subject": "/orders/order-123",
    "eventType": "Contoso.Orders.OrderProcessed",
    "data": {"orderId": "123", "status": "completed"},
    "dataVersion": "1.0"
}

try:
    validate_eventgrid_event(event_data)
    event = EventGridEvent.from_json(event_data)
    print("Event validation successful")
except ValueError as e:
    print(f"Event validation failed: {e}")

Event Grid Basic vs Namespaces Comparison

from azure.eventgrid import EventGridPublisherClient, EventGridEvent
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent

# Event Grid Basic (Legacy)
basic_publisher = EventGridPublisherClient(
    endpoint="https://topic.region.eventgrid.azure.net/api/events",
    credential=AzureKeyCredential("key")
    # No namespace_topic specified = Event Grid Basic
)

eventgrid_event = EventGridEvent(
    subject="legacy/example",
    event_type="Legacy.EventType",
    data={"key": "value"},
    data_version="1.0"
)

basic_publisher.send(eventgrid_event)

# Event Grid Namespaces (Modern)  
namespace_publisher = EventGridPublisherClient(
    endpoint="https://namespace.region.eventgrid.azure.net",
    credential=AzureKeyCredential("key"),
    namespace_topic="my-topic"  # Specifying namespace_topic = Event Grid Namespaces
)

cloud_event = CloudEvent(
    source="modern/example",
    type="Modern.EventType", 
    data={"key": "value"}
)

namespace_publisher.send(cloud_event)

# Clean up
basic_publisher.close()
namespace_publisher.close()

Error Handling for Legacy Operations

from azure.core.exceptions import HttpResponseError, ClientAuthenticationError
from azure.eventgrid import generate_sas
from datetime import datetime, timedelta

def robust_legacy_publishing(endpoint, access_key, events):
    """Robust Event Grid Basic publishing with error handling."""
    
    try:
        # Generate SAS token with reasonable expiration
        expiration = datetime.utcnow() + timedelta(hours=1)
        sas_token = generate_sas(endpoint, access_key, expiration)
        
        # Create publisher with SAS authentication
        publisher = EventGridPublisherClient(
            endpoint=endpoint,
            credential=AzureSasCredential(sas_token)
        )
        
        # Send events with retry logic
        max_retries = 3
        for attempt in range(max_retries):
            try:
                publisher.send(events)
                print(f"Successfully sent {len(events)} events")
                break
                
            except HttpResponseError as e:
                if e.status_code == 401:
                    # Authentication failed - regenerate token
                    print("Authentication failed, regenerating SAS token...")
                    new_expiration = datetime.utcnow() + timedelta(hours=1)
                    new_sas_token = generate_sas(endpoint, access_key, new_expiration)
                    publisher = EventGridPublisherClient(
                        endpoint=endpoint,
                        credential=AzureSasCredential(new_sas_token)
                    )
                    
                elif e.status_code == 413:
                    # Payload too large - split batch
                    print("Payload too large, splitting batch...")
                    mid = len(events) // 2
                    robust_legacy_publishing(endpoint, access_key, events[:mid])
                    robust_legacy_publishing(endpoint, access_key, events[mid:])
                    return
                    
                elif attempt == max_retries - 1:
                    print(f"Failed after {max_retries} attempts: {e}")
                    raise
                else:
                    print(f"Attempt {attempt + 1} failed, retrying...")
                    time.sleep(2 ** attempt)  # Exponential backoff
                    
        publisher.close()
        
    except ValueError as e:
        print(f"Invalid parameters: {e}")
        raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

# Usage
events = [
    EventGridEvent(
        subject="robust/example",
        event_type="Example.RobustPublishing",
        data={"attempt": i},
        data_version="1.0"
    )
    for i in range(10)
]

robust_legacy_publishing(
    endpoint="https://topic.region.eventgrid.azure.net/api/events",
    access_key="your_access_key",
    events=events
)

Install with Tessl CLI

npx tessl i tessl/pypi-azure-eventgrid

docs

async-operations.md

consumer.md

index.md

legacy.md

models.md

publisher.md

tile.json