
tessl/pypi-azure-eventgrid

Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces


Event Publishing

The EventGridPublisherClient sends events to Event Grid Basic topics and domains as well as Event Grid Namespace topics through a single interface. The target is chosen at construction time: passing namespace_topic selects Event Grid Namespaces, and omitting it selects Event Grid Basic. The client then formats events to match that target.

Capabilities

Publisher Client Creation

Creates a publisher client for sending events to Event Grid endpoints.

class EventGridPublisherClient:
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, AzureSasCredential, TokenCredential],
        *,
        namespace_topic: Optional[str] = None,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Create EventGrid publisher client.

        Parameters:
        - endpoint: Event Grid endpoint URL
        - credential: Authentication credential
        - namespace_topic: Topic name for namespaces (None for Event Grid Basic)
        - api_version: API version (defaults: "2024-06-01" for namespaces, "2018-01-01" for basic)
        """
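
The default-version rule in the docstring above can be written out as a small helper. This is only an illustrative sketch of the documented defaults; `default_api_version` is a hypothetical name and not part of the library.

```python
from typing import Optional

# Defaults as stated in the constructor docstring above.
NAMESPACE_API_VERSION = "2024-06-01"
BASIC_API_VERSION = "2018-01-01"


def default_api_version(namespace_topic: Optional[str] = None) -> str:
    """Hypothetical helper: return the documented default API version.

    A namespace_topic means the client targets Event Grid Namespaces;
    None means it targets Event Grid Basic.
    """
    if namespace_topic is not None:
        return NAMESPACE_API_VERSION
    return BASIC_API_VERSION


print(default_api_version("my-topic"))
print(default_api_version())
```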

Event Publishing

Sends events to Event Grid with automatic content type detection and schema validation.

def send(
    self,
    events: Union[CloudEvent, EventGridEvent, dict, List[Union[CloudEvent, EventGridEvent, dict]]],
    *,
    channel_name: Optional[str] = None,
    content_type: Optional[str] = None,
    **kwargs: Any
) -> None:
    """
    Send events to Event Grid.

    Parameters:
    - events: Single event or list of events to send
    - channel_name: Channel name for partner-topic publishing (Event Grid Basic only)
    - content_type: Override content type for the request
    
    Raises:
    - HttpResponseError: Server returned error response
    - ClientAuthenticationError: Authentication failed
    - ValueError: Invalid event format or missing required fields
    """

Low-Level HTTP Operations

Direct HTTP request handling for advanced scenarios and custom event formats.

def send_request(
    self,
    request: HttpRequest,
    *,
    stream: bool = False,
    **kwargs: Any
) -> HttpResponse:
    """
    Send raw HTTP request through the client pipeline.

    Parameters:
    - request: The HTTP request to send
    - stream: Whether to stream the response payload

    Returns:
    HttpResponse: Raw HTTP response
    """
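
On the raw-request path the caller is responsible for serializing the body. The sketch below builds a CloudEvents batch payload using only the standard library. `build_cloudevents_batch` is a hypothetical helper, and the choice of the `application/cloudevents-batch+json` content type is an assumption that should be checked against the service documentation for the target endpoint.

```python
import json
from typing import Any, Dict, List, Tuple

# Assumed content type for a JSON array of CloudEvents.
BATCH_CONTENT_TYPE = "application/cloudevents-batch+json; charset=utf-8"


def build_cloudevents_batch(
    events: List[Dict[str, Any]],
) -> Tuple[bytes, Dict[str, str]]:
    """Hypothetical helper: serialize CloudEvent dicts to a request body and headers."""
    body = json.dumps(events, separators=(",", ":")).encode("utf-8")
    headers = {"Content-Type": BATCH_CONTENT_TYPE}
    return body, headers


body, headers = build_cloudevents_batch(
    [
        {
            "specversion": "1.0",
            "type": "Custom.Event",
            "source": "myapp",
            "id": "event-123",
            "data": {"message": "Hello World"},
        }
    ]
)
print(headers["Content-Type"], len(body))
```

The resulting bytes and headers could then be attached to an `HttpRequest` and passed to `send_request`. The request URL and method depend on the target service and are left out here.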

Resource Management

Context manager support and explicit resource cleanup.

def close(self) -> None:
    """Close the client and cleanup resources."""

def __enter__(self) -> Self:
    """Context manager entry."""

def __exit__(self, *exc_details: Any) -> None:
    """Context manager exit with cleanup."""
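
To make the cleanup contract concrete, the following sketch compares explicit `close()` in a `try/finally` with the `with` form. `FakePublisher` is a hypothetical stand-in that only mimics the three methods listed above; it is not the real client and sends nothing over the network.

```python
from typing import Any, List


class FakePublisher:
    """Hypothetical stand-in exposing close/__enter__/__exit__ like the client."""

    def __init__(self) -> None:
        self.closed = False
        self.sent: List[Any] = []

    def send(self, events: Any) -> None:
        if self.closed:
            raise RuntimeError("client is closed")
        self.sent.append(events)

    def close(self) -> None:
        self.closed = True

    def __enter__(self) -> "FakePublisher":
        return self

    def __exit__(self, *exc_details: Any) -> None:
        self.close()


# Explicit cleanup: close() runs even if send() raises.
manual = FakePublisher()
try:
    manual.send({"type": "Demo"})
finally:
    manual.close()

# Equivalent with-statement form: __exit__ calls close().
with FakePublisher() as managed:
    managed.send({"type": "Demo"})

print(manual.closed, managed.closed)
```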

Usage Examples

Event Grid Namespaces

from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent

# Create publisher for namespace topic
publisher = EventGridPublisherClient(
    endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
    credential=AzureKeyCredential("access_key"),
    namespace_topic="my-topic"
)

# Send CloudEvent
cloud_event = CloudEvent(
    source="myapp/orders",
    type="Order.Created",
    data={
        "order_id": "12345",
        "customer_id": "customer-456",
        "total": 99.99
    }
)

publisher.send(cloud_event)

# Send multiple events
events = [
    CloudEvent(source="app", type="Event.Type1", data={"key": "value1"}),
    CloudEvent(source="app", type="Event.Type2", data={"key": "value2"})
]

publisher.send(events)

# Send raw dictionary (converted to CloudEvent automatically)
raw_event = {
    "specversion": "1.0",
    "type": "Custom.Event",
    "source": "myapp",
    "id": "event-123",
    "data": {"message": "Hello World"}
}

publisher.send(raw_event)

# Note: channel_name targets partner-topic channels on Event Grid Basic;
# do not pass it when publishing to a namespace topic

publisher.close()

Event Grid Basic (Legacy)

from azure.eventgrid import EventGridPublisherClient, EventGridEvent
from azure.core.credentials import AzureKeyCredential

# Create publisher for basic topic (no namespace_topic specified)
publisher = EventGridPublisherClient(
    endpoint="https://my-topic.westus.eventgrid.azure.net/api/events",
    credential=AzureKeyCredential("access_key")
)

# Send EventGrid schema event
event = EventGridEvent(
    subject="orders/order-123",
    event_type="Order.Completed",
    data={
        "order_id": "123",
        "status": "completed",
        "total": 49.99
    },
    data_version="1.0"
)

publisher.send(event)

# Send multiple EventGrid events
events = [
    EventGridEvent(subject="orders/1", event_type="Order.Created", data={}, data_version="1.0"),
    EventGridEvent(subject="orders/2", event_type="Order.Updated", data={}, data_version="1.0")
]

publisher.send(events)

publisher.close()

Context Manager Usage

# Automatic resource cleanup (endpoint, credential, and events defined as in the examples above)
with EventGridPublisherClient(endpoint, credential, namespace_topic="topic") as publisher:
    publisher.send(events)
    # Client automatically closed on exit

SAS Authentication (Event Grid Basic Only)

from azure.eventgrid import EventGridPublisherClient, generate_sas
from azure.core.credentials import AzureSasCredential
from datetime import datetime, timedelta, timezone

# Generate SAS token
endpoint = "https://my-topic.westus.eventgrid.azure.net/api/events"
access_key = "your_access_key"
# Timezone-aware UTC expiry (datetime.utcnow() is deprecated)
expiration = datetime.now(timezone.utc) + timedelta(hours=1)

sas_token = generate_sas(endpoint, access_key, expiration)

# Use SAS token for authentication
publisher = EventGridPublisherClient(
    endpoint=endpoint,
    credential=AzureSasCredential(sas_token)
)
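
Because a SAS token expires, long-running publishers need some way to decide when to mint a new one. A minimal sketch of such a check follows; `needs_refresh` and the five-minute safety margin are assumptions made for illustration, not library behavior.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def needs_refresh(
    expiration: datetime,
    margin: timedelta = timedelta(minutes=5),
    now: Optional[datetime] = None,
) -> bool:
    """Hypothetical helper: True once the token is within `margin` of expiring.

    `expiration` and `now` are expected to be timezone-aware UTC datetimes.
    """
    current = now if now is not None else datetime.now(timezone.utc)
    return current >= expiration - margin


expiry = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(needs_refresh(expiry, now=datetime(2024, 1, 1, 11, 0, tzinfo=timezone.utc)))
print(needs_refresh(expiry, now=datetime(2024, 1, 1, 11, 56, tzinfo=timezone.utc)))
```

When the check returns True, a fresh token can be generated with `generate_sas` and applied to the existing credential with `AzureSasCredential.update`, so the client does not have to be rebuilt.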

Error Handling

from azure.core.exceptions import HttpResponseError, ClientAuthenticationError

try:
    publisher.send(events)
except ClientAuthenticationError as e:
    print(f"Authentication failed: {e}")
except HttpResponseError as e:
    print(f"HTTP error {e.status_code}: {e.message}")
    if e.status_code == 413:
        print("Event batch too large, try sending fewer events")
    elif e.status_code == 400:
        print("Invalid event format or missing required fields")
except ValueError as e:
    print(f"Invalid event data: {e}")
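
One way to act on the 413 case above is to split the list into smaller batches and send each one separately. The helper below is a sketch under that assumption; `chunked` is a hypothetical name and the batch size of 2 is arbitrary.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(events: Sequence[T], size: int) -> Iterator[List[T]]:
    """Hypothetical helper: yield consecutive batches of at most `size` events."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(events), size):
        yield list(events[start:start + size])


batches = list(chunked(["e1", "e2", "e3", "e4", "e5"], 2))
print(batches)  # [['e1', 'e2'], ['e3', 'e4'], ['e5']]
```

Each batch would then be passed to `publisher.send(batch)` in turn. A suitable size depends on the payloads and on the service limits, which are not stated here.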

Event Formats

CloudEvent Schema (Event Grid Namespaces)

CloudEvents v1.0 specification with required and optional fields:

from datetime import datetime, timezone
from azure.core.messaging import CloudEvent

# Minimal event: source and type are required, data is optional
cloud_event = CloudEvent(
    source="myapp/component",    # Context in which event occurred
    type="MyApp.Event.Happened", # Event type identifier
    data={"key": "value"}        # Event payload
)

# With optional fields
cloud_event = CloudEvent(
    source="myapp/orders",
    type="Order.StatusChanged",
    data={
        "order_id": "12345",
        "old_status": "pending",
        "new_status": "confirmed"
    },
    subject="orders/12345",            # Event subject
    time=datetime.now(timezone.utc),   # Event timestamp (timezone-aware UTC)
    datacontenttype="application/json" # Content type of data
)
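
The same fields can also be expressed as a plain dictionary using the CloudEvents 1.0 attribute names. The sketch below builds one with a generated `id` and a UTC timestamp; `new_cloud_event_dict` is a hypothetical helper, and setting `datacontenttype` whenever `data` is present is an assumption that the payload is JSON.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def new_cloud_event_dict(
    source: str,
    type_: str,
    data: Optional[Any] = None,
    subject: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build a CloudEvents 1.0 dict with id and time filled in."""
    event: Dict[str, Any] = {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),                         # unique event ID
        "source": source,
        "type": type_,
        "time": datetime.now(timezone.utc).isoformat(),  # ISO 8601 UTC timestamp
    }
    if data is not None:
        event["data"] = data
        event["datacontenttype"] = "application/json"   # assumes JSON payloads
    if subject is not None:
        event["subject"] = subject
    return event


event = new_cloud_event_dict(
    "myapp/orders",
    "Order.StatusChanged",
    data={"order_id": "12345", "new_status": "confirmed"},
    subject="orders/12345",
)
print(sorted(event))
```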

EventGrid Schema (Event Grid Basic)

Traditional EventGrid event format:

from datetime import datetime, timezone
from azure.eventgrid import EventGridEvent

eventgrid_event = EventGridEvent(
    subject="products/product-123",       # Required: Event subject
    event_type="Product.PriceChanged",    # Required: Event type
    data={                                # Required: Event data
        "product_id": "123",
        "old_price": 19.99,
        "new_price": 24.99
    },
    data_version="2.0",                   # Required: Data schema version
    topic="products",                     # Optional: Topic (auto-set by service)
    id="event-456",                       # Optional: Event ID (auto-generated)
    event_time=datetime.now(timezone.utc) # Optional: Event time (auto-generated)
)

Raw Dictionary Events

Dictionaries are automatically converted based on target:

# For namespaces (converted to CloudEvent)
namespace_dict = {
    "specversion": "1.0",          # Required for CloudEvent
    "type": "MyApp.Event",         # Required
    "source": "myapp",             # Required
    "id": "event-123",             # Required by CloudEvents 1.0
    "data": {"message": "hello"}   # Event payload
}

# For basic topics (converted to EventGrid event)
basic_dict = {
    "eventType": "Custom.Event",   # Maps to event_type
    "subject": "app/component",    # Maps to subject
    "data": {"key": "value"},      # Event data
    "dataVersion": "1.0"           # Maps to data_version
}
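
Since raw dictionaries carry no type information, it can help to check their keys before sending. The following is a sketch of such a pre-flight check, not the library's own validation. `missing_keys` is a hypothetical helper; the key lists come from the examples above plus `id`, which the CloudEvents 1.0 specification requires.

```python
from typing import Any, Dict, List

# Keys taken from the examples above; "id" is required by CloudEvents 1.0.
CLOUD_EVENT_KEYS = ("specversion", "type", "source", "id")
EVENTGRID_KEYS = ("eventType", "subject", "data", "dataVersion")


def missing_keys(event: Dict[str, Any], namespace: bool) -> List[str]:
    """Hypothetical helper: list the expected keys that are absent from `event`.

    namespace=True checks CloudEvent keys; False checks EventGrid schema keys.
    """
    required = CLOUD_EVENT_KEYS if namespace else EVENTGRID_KEYS
    return [key for key in required if key not in event]


print(missing_keys({"specversion": "1.0", "type": "MyApp.Event"}, namespace=True))
print(missing_keys({"eventType": "Custom.Event", "data": {}}, namespace=False))
```

An empty result means the dictionary has every key this sketch checks for; the service may still reject it for other reasons.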

