CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-eventgrid

Microsoft Azure Event Grid Client Library for Python with publisher and consumer clients for both Event Grid Basic and Event Grid Namespaces

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

async-operations.mddocs/

Async Operations

Azure Event Grid Python SDK provides full asynchronous support for both publisher and consumer operations. The async clients have identical APIs to their synchronous counterparts but use async/await for non-blocking operations.

Capabilities

Async Publisher Client

Asynchronous event publishing for high-throughput scenarios and non-blocking operations.

class EventGridPublisherClient:
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, AzureSasCredential, AsyncTokenCredential],
        *,
        namespace_topic: Optional[str] = None,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None: ...

    async def send(
        self,
        events: Union[CloudEvent, EventGridEvent, dict, List[Union[CloudEvent, EventGridEvent, dict]]],
        *,
        channel_name: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Asynchronously send events to Event Grid.

        Parameters:
        - events: Single event or list of events to send
        - channel_name: Channel name for multi-channel publishing (namespaces only)
        - content_type: Override content type for the request
        """

    async def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> AsyncHttpResponse: ...

    async def close(self) -> None:
        """Asynchronously close the client and cleanup resources."""

    async def __aenter__(self) -> Self: ...
    async def __aexit__(self, *exc_details: Any) -> None: ...

Async Consumer Client

Asynchronous event consumption with all management operations for Event Grid Namespaces.

class EventGridConsumerClient:
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, AsyncTokenCredential],
        *,
        namespace_topic: str,
        subscription: str,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None: ...

    async def receive(
        self,
        *,
        max_events: Optional[int] = None,
        max_wait_time: Optional[int] = None,
        **kwargs: Any
    ) -> List[ReceiveDetails]:
        """
        Asynchronously receive batch of Cloud Events from subscription.

        Parameters:
        - max_events: Maximum number of events to receive
        - max_wait_time: Maximum wait time in seconds

        Returns:
        List[ReceiveDetails]: List of received events with broker properties
        """

    async def acknowledge(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> AcknowledgeResult:
        """Asynchronously acknowledge successfully processed events."""

    async def release(
        self,
        *,
        lock_tokens: List[str],
        release_delay: Optional[Union[str, ReleaseDelay]] = None,
        **kwargs: Any
    ) -> ReleaseResult:
        """Asynchronously release events back to subscription for reprocessing."""

    async def reject(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> RejectResult:
        """Asynchronously reject events that cannot be processed."""

    async def renew_locks(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> RenewLocksResult:
        """Asynchronously renew locks on events to extend processing time."""

    async def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> AsyncHttpResponse: ...

    async def close(self) -> None: ...
    async def __aenter__(self) -> Self: ...
    async def __aexit__(self, *exc_details: Any) -> None: ...

Usage Examples

Async Event Publishing

import asyncio
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent

async def publish_events():
    """Asynchronously publish events to Event Grid Namespace."""
    
    # Create async publisher client
    async with EventGridPublisherClient(
        endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
        credential=AzureKeyCredential("access_key"),
        namespace_topic="orders-topic"
    ) as publisher:
        
        # Create events
        events = [
            CloudEvent(
                source="orders-service",
                type="Order.Created",
                data={"order_id": f"order-{i}", "total": i * 10.99}
            )
            for i in range(1, 6)
        ]
        
        # Send events asynchronously
        await publisher.send(events)
        print(f"Published {len(events)} events")

# Run the async function
asyncio.run(publish_events())

Async Event Consumption

import asyncio
from azure.eventgrid.aio import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid.models import ReleaseDelay

async def consume_events():
    """Asynchronously consume and process events."""
    
    async with EventGridConsumerClient(
        endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
        credential=AzureKeyCredential("access_key"),
        namespace_topic="orders-topic",
        subscription="order-processor"
    ) as consumer:
        
        # Receive events
        events = await consumer.receive(max_events=10, max_wait_time=30)
        
        if not events:
            print("No events received")
            return
        
        print(f"Received {len(events)} events")
        
        # Process events concurrently
        tasks = []
        for event_detail in events:
            task = process_event_async(event_detail)
            tasks.append(task)
        
        # Wait for all processing to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Group results by outcome
        success_tokens = []
        failed_tokens = []
        
        for i, result in enumerate(results):
            lock_token = events[i].broker_properties.lock_token
            
            if isinstance(result, Exception):
                print(f"Processing failed: {result}")
                failed_tokens.append(lock_token)
            elif result:
                success_tokens.append(lock_token)
            else:
                failed_tokens.append(lock_token)
        
        # Acknowledge successful events
        if success_tokens:
            ack_result = await consumer.acknowledge(lock_tokens=success_tokens)
            print(f"Acknowledged {len(ack_result.succeeded_lock_tokens)} events")
        
        # Release failed events
        if failed_tokens:
            release_result = await consumer.release(
                lock_tokens=failed_tokens,
                release_delay=ReleaseDelay.ONE_MINUTE
            )
            print(f"Released {len(release_result.succeeded_lock_tokens)} events")

async def process_event_async(event_detail):
    """Simulate async event processing."""
    try:
        # Simulate async work
        await asyncio.sleep(0.1)
        
        # Process the event
        cloud_event = event_detail.event
        print(f"Processing event: {cloud_event.type} from {cloud_event.source}")
        
        # Simulate processing logic
        if "error" in str(cloud_event.data).lower():
            return False  # Processing failed
        
        return True  # Processing succeeded
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        return False

# Run the consumer
asyncio.run(consume_events())

Concurrent Publishing

import asyncio
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent

async def publish_to_multiple_topics():
    """Publish events to multiple topics concurrently."""
    
    credential = AzureKeyCredential("access_key")
    endpoint = "https://my-namespace.westus-1.eventgrid.azure.net"
    
    # Create multiple publisher clients
    publishers = [
        EventGridPublisherClient(endpoint, credential, namespace_topic="topic1"),
        EventGridPublisherClient(endpoint, credential, namespace_topic="topic2"),
        EventGridPublisherClient(endpoint, credential, namespace_topic="topic3")
    ]
    
    try:
        # Create events for each topic
        events_per_topic = [
            [CloudEvent(source="app", type="Type1", data={"topic": "topic1", "id": i}) for i in range(5)],
            [CloudEvent(source="app", type="Type2", data={"topic": "topic2", "id": i}) for i in range(5)],
            [CloudEvent(source="app", type="Type3", data={"topic": "topic3", "id": i}) for i in range(5)]
        ]
        
        # Publish to all topics concurrently
        tasks = []
        for publisher, events in zip(publishers, events_per_topic):
            task = publisher.send(events)
            tasks.append(task)
        
        # Wait for all publishing to complete
        await asyncio.gather(*tasks)
        print("Published events to all topics")
        
    finally:
        # Close all publishers
        close_tasks = [publisher.close() for publisher in publishers]
        await asyncio.gather(*close_tasks)

asyncio.run(publish_to_multiple_topics())

Event Processing Pipeline

import asyncio
from azure.eventgrid.aio import EventGridConsumerClient, EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent

class EventProcessor:
    """Async event processing pipeline."""
    
    def __init__(self, endpoint, credential):
        self.endpoint = endpoint
        self.credential = credential
        
    async def run_pipeline(self):
        """Run continuous event processing pipeline."""
        
        # Input consumer
        input_consumer = EventGridConsumerClient(
            self.endpoint, self.credential,
            namespace_topic="input-topic",
            subscription="processor"
        )
        
        # Output publisher
        output_publisher = EventGridPublisherClient(
            self.endpoint, self.credential,
            namespace_topic="output-topic"
        )
        
        async with input_consumer, output_publisher:
            while True:
                try:
                    # Receive events
                    events = await input_consumer.receive(max_events=5, max_wait_time=10)
                    
                    if not events:
                        await asyncio.sleep(1)
                        continue
                    
                    # Process events concurrently
                    processed_events = await self.process_events(events)
                    
                    # Publish processed events
                    if processed_events:
                        await output_publisher.send(processed_events)
                    
                    # Acknowledge input events
                    lock_tokens = [e.broker_properties.lock_token for e in events]
                    await input_consumer.acknowledge(lock_tokens=lock_tokens)
                    
                    print(f"Processed {len(events)} events")
                    
                except KeyboardInterrupt:
                    print("Shutting down pipeline...")
                    break
                except Exception as e:
                    print(f"Pipeline error: {e}")
                    await asyncio.sleep(5)  # Brief pause before retry
    
    async def process_events(self, events):
        """Process events concurrently."""
        tasks = [self.transform_event(event_detail) for event_detail in events]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Filter out failed transformations
        processed_events = []
        for result in results:
            if not isinstance(result, Exception) and result:
                processed_events.append(result)
        
        return processed_events
    
    async def transform_event(self, event_detail):
        """Transform a single event."""
        try:
            # Simulate async transformation
            await asyncio.sleep(0.1)
            
            input_event = event_detail.event
            
            # Create transformed event
            output_event = CloudEvent(
                source="event-processor",
                type=f"Processed.{input_event.type}",
                data={
                    "original_data": input_event.data,
                    "processed_at": asyncio.get_event_loop().time(),
                    "processor_id": "async-processor-1"
                }
            )
            
            return output_event
            
        except Exception as e:
            print(f"Transformation failed: {e}")
            return None

# Run the pipeline
processor = EventProcessor(
    endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
    credential=AzureKeyCredential("access_key")
)

asyncio.run(processor.run_pipeline())

Async Context Manager Patterns

import asyncio
from azure.eventgrid.aio import EventGridPublisherClient, EventGridConsumerClient

async def context_manager_examples():
    """Demonstrate async context manager usage patterns."""
    
    credential = AzureKeyCredential("access_key")
    endpoint = "https://my-namespace.westus-1.eventgrid.azure.net"
    
    # Single client context manager
    async with EventGridPublisherClient(
        endpoint, credential, namespace_topic="topic"
    ) as publisher:
        await publisher.send([CloudEvent(source="app", type="Test", data={})])
    
    # Multiple clients with AsyncExitStack
    from contextlib import AsyncExitStack
    
    async with AsyncExitStack() as stack:
        # Enter multiple async context managers
        publisher = await stack.enter_async_context(
            EventGridPublisherClient(endpoint, credential, namespace_topic="output")
        )
        consumer = await stack.enter_async_context(
            EventGridConsumerClient(
                endpoint, credential, 
                namespace_topic="input", 
                subscription="processor"
            )
        )
        
        # Use both clients
        events = await consumer.receive(max_events=5)
        if events:
            # Process and forward events
            processed = [transform_event(e) for e in events]
            await publisher.send(processed)
            
            # Acknowledge input events
            tokens = [e.broker_properties.lock_token for e in events]
            await consumer.acknowledge(lock_tokens=tokens)
    
    # All clients automatically closed on exit

def transform_event(event_detail):
    """Simple event transformation."""
    return CloudEvent(
        source="transformer",
        type=f"Transformed.{event_detail.event.type}",
        data={"original": event_detail.event.data}
    )

asyncio.run(context_manager_examples())

Error Handling with Async Operations

import asyncio
from azure.core.exceptions import HttpResponseError, ClientAuthenticationError

async def robust_async_processing():
    """Demonstrate robust error handling in async operations."""
    
    async with EventGridConsumerClient(...) as consumer:
        while True:
            try:
                # Receive events with timeout
                events = await asyncio.wait_for(
                    consumer.receive(max_events=10), 
                    timeout=30.0
                )
                
                if not events:
                    continue
                
                # Process with individual error handling
                success_tokens = []
                retry_tokens = []
                
                for event_detail in events:
                    try:
                        # Process with timeout
                        result = await asyncio.wait_for(
                            process_event_async(event_detail.event),
                            timeout=5.0
                        )
                        
                        if result:
                            success_tokens.append(event_detail.broker_properties.lock_token)
                        else:
                            retry_tokens.append(event_detail.broker_properties.lock_token)
                            
                    except asyncio.TimeoutError:
                        print("Event processing timed out")
                        retry_tokens.append(event_detail.broker_properties.lock_token)
                        
                    except Exception as e:
                        print(f"Event processing failed: {e}")
                        retry_tokens.append(event_detail.broker_properties.lock_token)
                
                # Handle results concurrently
                tasks = []
                
                if success_tokens:
                    tasks.append(consumer.acknowledge(lock_tokens=success_tokens))
                
                if retry_tokens:
                    tasks.append(consumer.release(
                        lock_tokens=retry_tokens,
                        release_delay=ReleaseDelay.ONE_MINUTE
                    ))
                
                if tasks:
                    results = await asyncio.gather(*tasks, return_exceptions=True)
                    for result in results:
                        if isinstance(result, Exception):
                            print(f"Operation failed: {result}")
                
            except asyncio.TimeoutError:
                print("Receive operation timed out, continuing...")
                
            except ClientAuthenticationError as e:
                print(f"Authentication failed: {e}")
                break  # Cannot continue without valid auth
                
            except HttpResponseError as e:
                print(f"HTTP error: {e.status_code} - {e.message}")
                if e.status_code >= 500:
                    # Server error, wait and retry
                    await asyncio.sleep(5)
                else:
                    # Client error, may need intervention
                    break
                    
            except Exception as e:
                print(f"Unexpected error: {e}")
                await asyncio.sleep(1)

asyncio.run(robust_async_processing())

Performance Considerations

Concurrency Control

import asyncio
from asyncio import Semaphore

async def controlled_concurrent_processing(consumer, max_concurrent=10):
    """Process events with controlled concurrency."""
    
    semaphore = Semaphore(max_concurrent)
    
    async def process_with_semaphore(event_detail):
        async with semaphore:
            return await process_event_async(event_detail)
    
    events = await consumer.receive(max_events=50)
    
    # Process with limited concurrency
    tasks = [process_with_semaphore(event) for event in events]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Handle results...

Batch Processing Optimization

async def optimized_batch_processing(consumer, batch_size=20):
    """Optimized async batch processing."""
    
    while True:
        # Receive larger batches
        events = await consumer.receive(max_events=batch_size, max_wait_time=10)
        
        if not events:
            await asyncio.sleep(0.1)  # Brief pause
            continue
        
        # Process in smaller concurrent chunks
        chunk_size = 5
        for i in range(0, len(events), chunk_size):
            chunk = events[i:i + chunk_size]
            
            # Process chunk concurrently
            tasks = [process_event_async(event) for event in chunk]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Handle chunk results immediately
            success_tokens = []
            failed_tokens = []
            
            for j, result in enumerate(results):
                lock_token = chunk[j].broker_properties.lock_token
                if isinstance(result, Exception) or not result:
                    failed_tokens.append(lock_token)
                else:
                    success_tokens.append(lock_token)
            
            # Process results concurrently
            ops = []
            if success_tokens:
                ops.append(consumer.acknowledge(lock_tokens=success_tokens))
            if failed_tokens:
                ops.append(consumer.release(lock_tokens=failed_tokens))
            
            if ops:
                await asyncio.gather(*ops, return_exceptions=True)

Install with Tessl CLI

npx tessl i tessl/pypi-azure-eventgrid

docs

async-operations.md

consumer.md

index.md

legacy.md

models.md

publisher.md

tile.json