Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Microsoft Azure Event Grid Client Library for Python provides comprehensive support for both Event Grid Basic (legacy) and Event Grid Namespaces. This library enables event-driven applications to publish events to Event Grid topics and consume events from Event Grid Namespace subscriptions using reliable, scalable event routing and processing capabilities.
pip install azure-eventgrid

from azure.eventgrid import EventGridPublisherClient, EventGridConsumerClient

For async operations:
from azure.eventgrid.aio import EventGridPublisherClient, EventGridConsumerClient

For legacy Event Grid Basic support:
from azure.eventgrid import EventGridEvent, SystemEventNames, generate_sas

For models and data types:
from azure.eventgrid.models import (
AcknowledgeResult, ReleaseResult, RejectResult, RenewLocksResult,
ReceiveDetails, BrokerProperties, ReleaseDelay
)

from azure.eventgrid import EventGridPublisherClient, EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
# Publisher client for sending events
publisher = EventGridPublisherClient(
endpoint="https://namespace.region.eventgrid.azure.net",
credential=AzureKeyCredential("access_key"),
namespace_topic="my-topic"
)
# Create and send events
cloud_event = CloudEvent(
source="myapp/products",
type="Product.Created",
data={
"product_id": "12345",
"name": "New Product",
"price": 29.99
}
)
publisher.send([cloud_event])
publisher.close()
# Consumer client for receiving events
consumer = EventGridConsumerClient(
endpoint="https://namespace.region.eventgrid.azure.net",
credential=AzureKeyCredential("access_key"),
namespace_topic="my-topic",
subscription="my-subscription"
)
# Receive and process events
events = consumer.receive(max_events=10, max_wait_time=60)
for event_detail in events:
print(f"Event: {event_detail.event.type}")
print(f"Data: {event_detail.event.data}")
# Process the event, then acknowledge
# consumer.acknowledge(lock_tokens=[event_detail.broker_properties.lock_token])
consumer.close()

from azure.eventgrid import EventGridPublisherClient, EventGridEvent
from azure.core.credentials import AzureKeyCredential
# Publisher client for Event Grid Basic
publisher = EventGridPublisherClient(
endpoint="https://my-topic.region.eventgrid.azure.net/api/events",
credential=AzureKeyCredential("access_key")
# No namespace_topic specified for Basic
)
# Create EventGrid schema event
event = EventGridEvent(
subject="products/product-123",
event_type="Product.Created",
data={
"product_id": "123",
"name": "Example Product"
},
data_version="1.0"
)
publisher.send([event])
publisher.close()

Azure Event Grid supports two distinct models:
The Python SDK provides unified client APIs for both models:
Send events to Event Grid Basic topics/domains or Event Grid Namespace topics. Supports CloudEvent schema (namespaces), EventGrid schema (basic), and raw dictionaries with automatic content type detection.
def send(
events: Union[CloudEvent, EventGridEvent, dict, List[Union[CloudEvent, EventGridEvent, dict]]],
*,
channel_name: Optional[str] = None,
content_type: Optional[str] = None,
**kwargs
) -> None: ...

Receive and manage events from Event Grid Namespace subscriptions with operations for acknowledging, releasing, rejecting, and renewing locks on events.
def receive(
*,
max_events: Optional[int] = None,
max_wait_time: Optional[int] = None,
**kwargs
) -> List[ReceiveDetails]: ...
def acknowledge(*, lock_tokens: List[str], **kwargs) -> AcknowledgeResult: ...
def release(*, lock_tokens: List[str], release_delay: Optional[Union[int, ReleaseDelay]] = None, **kwargs) -> ReleaseResult: ...
def reject(*, lock_tokens: List[str], **kwargs) -> RejectResult: ...
def renew_locks(*, lock_tokens: List[str], **kwargs) -> RenewLocksResult: ...

Asynchronous versions of all publisher and consumer operations for non-blocking event processing and high-throughput scenarios.
async def send(
events: Union[CloudEvent, EventGridEvent, dict, List[Union[CloudEvent, EventGridEvent, dict]]],
*,
channel_name: Optional[str] = None,
content_type: Optional[str] = None,
**kwargs
) -> None: ...
async def receive(
*,
max_events: Optional[int] = None,
max_wait_time: Optional[int] = None,
**kwargs
) -> List[ReceiveDetails]: ...

Response models, event metadata, broker properties, and enums for comprehensive event handling and status reporting.
class ReceiveDetails:
broker_properties: BrokerProperties
event: CloudEvent
class BrokerProperties:
lock_token: str
delivery_count: int
class AcknowledgeResult:
failed_lock_tokens: List[FailedLockToken]
succeeded_lock_tokens: List[str]

Event Grid Basic support with EventGrid schema events, system event names, and SAS token generation for backward compatibility.
class EventGridEvent:
def __init__(
self,
subject: str,
event_type: str,
data: Any,
data_version: str,
*,
topic: Optional[str] = None,
metadata_version: Optional[str] = None,
id: Optional[str] = None,
event_time: Optional[datetime] = None
) -> None: ...
def generate_sas(
endpoint: str,
shared_access_key: str,
expiration_date_utc: datetime,
*,
api_version: Optional[str] = None
) -> str: ...

from typing import List, Optional, Union, Any
from datetime import datetime
from azure.core.credentials import AzureKeyCredential, AzureSasCredential, TokenCredential
from azure.core.credentials_async import AsyncTokenCredential
from azure.core.messaging import CloudEvent
from azure.core.rest import HttpRequest, HttpResponse
# Authentication types
Credential = Union[AzureKeyCredential, AzureSasCredential, TokenCredential]
AsyncCredential = Union[AzureKeyCredential, AsyncTokenCredential]
# Event types for publishing
EventType = Union[CloudEvent, EventGridEvent, dict]
EventList = List[EventType]
SendableEvents = Union[EventType, EventList]

Install with Tessl CLI
npx tessl i tessl/pypi-azure-eventgrid