0
# Async Operations
1
2
The Azure Event Grid Python SDK provides full asynchronous support for both publisher and consumer operations. The async clients are imported from `azure.eventgrid.aio` and expose the same API as their synchronous counterparts, except that every network operation is a coroutine that must be awaited with `async`/`await`.
3
4
## Capabilities
5
6
### Async Publisher Client
7
8
Asynchronous event publishing for high-throughput scenarios and non-blocking operations.
9
10
```python { .api }
11
class EventGridPublisherClient:
    """Asynchronous client for publishing events to Event Grid."""

    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, AzureSasCredential, AsyncTokenCredential],
        *,
        namespace_topic: Optional[str] = None,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None: ...

    async def send(
        self,
        events: Union[CloudEvent, EventGridEvent, dict, List[Union[CloudEvent, EventGridEvent, dict]]],
        *,
        channel_name: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Asynchronously send events to Event Grid.

        Parameters:
        - events: Single event or list of events to send
        - channel_name: Partner channel to publish to (partner topics on
          Event Grid Basic resources; not used with namespace topics)
        - content_type: Override content type for the request
        """

    async def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> AsyncHttpResponse: ...

    async def close(self) -> None:
        """Asynchronously close the client and cleanup resources."""

    async def __aenter__(self) -> Self: ...
    async def __aexit__(self, *exc_details: Any) -> None: ...
52
```
53
54
### Async Consumer Client
55
56
Asynchronous event consumption for Event Grid Namespaces, including all event settlement operations (acknowledge, release, reject, and renew locks).
57
58
```python { .api }
59
class EventGridConsumerClient:
    """Asynchronous client for receiving and settling events from a namespace topic subscription."""

    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, AsyncTokenCredential],
        *,
        namespace_topic: str,
        subscription: str,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None: ...

    async def receive(
        self,
        *,
        max_events: Optional[int] = None,
        max_wait_time: Optional[int] = None,
        **kwargs: Any
    ) -> List[ReceiveDetails]:
        """
        Asynchronously receive batch of Cloud Events from subscription.

        Parameters:
        - max_events: Maximum number of events to receive
        - max_wait_time: Maximum wait time in seconds

        Returns:
        List[ReceiveDetails]: List of received events with broker properties
        """

    async def acknowledge(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> AcknowledgeResult:
        """
        Asynchronously acknowledge successfully processed events.

        Parameters:
        - lock_tokens: Lock tokens of the received events to acknowledge
        """

    async def release(
        self,
        *,
        lock_tokens: List[str],
        release_delay: Optional[Union[str, ReleaseDelay]] = None,
        **kwargs: Any
    ) -> ReleaseResult:
        """
        Asynchronously release events back to subscription for reprocessing.

        Parameters:
        - lock_tokens: Lock tokens of the received events to release
        - release_delay: Optional delay before the released events are redelivered
        """

    async def reject(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> RejectResult:
        """
        Asynchronously reject events that cannot be processed.

        Parameters:
        - lock_tokens: Lock tokens of the received events to reject
        """

    async def renew_locks(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> RenewLocksResult:
        """
        Asynchronously renew locks on events to extend processing time.

        Parameters:
        - lock_tokens: Lock tokens of the received events whose locks to renew
        """

    async def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> AsyncHttpResponse: ...

    async def close(self) -> None: ...
    async def __aenter__(self) -> Self: ...
    async def __aexit__(self, *exc_details: Any) -> None: ...
133
```
134
135
## Usage Examples
136
137
### Async Event Publishing
138
139
```python
140
import asyncio
141
from azure.eventgrid.aio import EventGridPublisherClient
142
from azure.core.credentials import AzureKeyCredential
143
from azure.core.messaging import CloudEvent
144
145
async def publish_events():
    """Asynchronously publish a batch of CloudEvents to an Event Grid namespace topic."""

    # `async with` closes the client even if send() raises
    async with EventGridPublisherClient(
        endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
        credential=AzureKeyCredential("access_key"),
        namespace_topic="orders-topic"
    ) as publisher:

        # Create events
        events = [
            CloudEvent(
                source="orders-service",
                type="Order.Created",
                # round() keeps float noise (e.g. 32.970000000000006) out of the payload
                data={"order_id": f"order-{i}", "total": round(i * 10.99, 2)}
            )
            for i in range(1, 6)
        ]

        # Send the whole batch in a single awaited call
        await publisher.send(events)
        print(f"Published {len(events)} events")
168
169
# Run the async function
170
asyncio.run(publish_events())
171
```
172
173
### Async Event Consumption
174
175
```python
176
import asyncio
177
from azure.eventgrid.aio import EventGridConsumerClient
178
from azure.core.credentials import AzureKeyCredential
179
from azure.eventgrid.models import ReleaseDelay
180
181
async def consume_events():
    """Receive a batch of events, process them concurrently, then settle each one.

    Events whose processing returned a truthy value are acknowledged; events
    whose processing raised or returned a falsy value are released with a
    one-minute delay.
    """

    async with EventGridConsumerClient(
        endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
        credential=AzureKeyCredential("access_key"),
        namespace_topic="orders-topic",
        subscription="order-processor"
    ) as consumer:

        # Receive events
        events = await consumer.receive(max_events=10, max_wait_time=30)

        if not events:
            print("No events received")
            return

        print(f"Received {len(events)} events")

        # Process events concurrently; with return_exceptions=True a failure
        # is returned as a result instead of aborting the whole batch
        results = await asyncio.gather(
            *(process_event_async(event_detail) for event_detail in events),
            return_exceptions=True
        )

        # Group lock tokens by outcome
        success_tokens = []
        failed_tokens = []

        # gather() preserves input order, so results line up with events
        for event_detail, result in zip(events, results):
            lock_token = event_detail.broker_properties.lock_token

            # BaseException also covers CancelledError, which must not be
            # mistaken for a truthy "success" result
            if isinstance(result, BaseException):
                print(f"Processing failed: {result}")
                failed_tokens.append(lock_token)
            elif result:
                success_tokens.append(lock_token)
            else:
                failed_tokens.append(lock_token)

        # Acknowledge successful events
        if success_tokens:
            ack_result = await consumer.acknowledge(lock_tokens=success_tokens)
            print(f"Acknowledged {len(ack_result.succeeded_lock_tokens)} events")

        # Release failed events
        if failed_tokens:
            release_result = await consumer.release(
                lock_tokens=failed_tokens,
                release_delay=ReleaseDelay.ONE_MINUTE
            )
            print(f"Released {len(release_result.succeeded_lock_tokens)} events")
236
237
async def process_event_async(event_detail):
    """Simulate async processing of one received event.

    Parameters:
    - event_detail: Received item whose ``event`` attribute exposes
      ``type``, ``source`` and ``data``

    Returns:
    bool: True when processing succeeded; False when the event data contains
    the text "error" (case-insensitive) or any unexpected exception occurred
    """
    try:
        # Simulate async work
        await asyncio.sleep(0.1)

        # Process the event
        cloud_event = event_detail.event
        print(f"Processing event: {cloud_event.type} from {cloud_event.source}")

        # Simulate processing logic: payloads mentioning "error" fail
        if "error" in str(cloud_event.data).lower():
            return False  # Processing failed

        return True  # Processing succeeded

    except Exception as e:
        # Deliberate catch-all: the caller settles the event based on the
        # boolean result, so a failure is reported as False rather than raised
        print(f"Unexpected error: {e}")
        return False
256
257
# Run the consumer
258
asyncio.run(consume_events())
259
```
260
261
### Concurrent Publishing
262
263
```python
264
import asyncio
265
from azure.eventgrid.aio import EventGridPublisherClient
266
from azure.core.credentials import AzureKeyCredential
267
from azure.core.messaging import CloudEvent
268
269
async def publish_to_multiple_topics():
    """Publish events to multiple topics concurrently."""

    credential = AzureKeyCredential("access_key")
    endpoint = "https://my-namespace.westus-1.eventgrid.azure.net"
    topics = ["topic1", "topic2", "topic3"]

    # One publisher client per topic
    publishers = [
        EventGridPublisherClient(endpoint, credential, namespace_topic=topic)
        for topic in topics
    ]

    try:
        # Create events for each topic (Type1 -> topic1, Type2 -> topic2, ...)
        events_per_topic = [
            [
                CloudEvent(source="app", type=f"Type{n}", data={"topic": topic, "id": i})
                for i in range(5)
            ]
            for n, topic in enumerate(topics, start=1)
        ]

        # Publish to all topics concurrently and wait for every send
        await asyncio.gather(
            *(
                publisher.send(events)
                for publisher, events in zip(publishers, events_per_topic)
            )
        )
        print("Published events to all topics")

    finally:
        # Close all publishers. return_exceptions=True so a failing close()
        # cannot mask an exception raised by the sends above.
        await asyncio.gather(
            *(publisher.close() for publisher in publishers),
            return_exceptions=True
        )
304
305
asyncio.run(publish_to_multiple_topics())
306
```
307
308
### Event Processing Pipeline
309
310
```python
311
import asyncio
from datetime import datetime, timezone

from azure.eventgrid.aio import EventGridConsumerClient, EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent


class EventProcessor:
    """Async event processing pipeline.

    Receives events from "input-topic", transforms them concurrently, and
    publishes the transformed events to "output-topic".
    """

    def __init__(self, endpoint, credential):
        self.endpoint = endpoint
        self.credential = credential

    async def run_pipeline(self):
        """Run continuous event processing pipeline until cancelled."""

        # Input consumer
        input_consumer = EventGridConsumerClient(
            self.endpoint, self.credential,
            namespace_topic="input-topic",
            subscription="processor"
        )

        # Output publisher
        output_publisher = EventGridPublisherClient(
            self.endpoint, self.credential,
            namespace_topic="output-topic"
        )

        async with input_consumer, output_publisher:
            while True:
                try:
                    # Receive events
                    events = await input_consumer.receive(max_events=5, max_wait_time=10)

                    if not events:
                        await asyncio.sleep(1)
                        continue

                    # Transform concurrently; results are aligned with events
                    results = await self._transform_all(events)

                    processed_events = []
                    done_tokens = []
                    retry_tokens = []
                    for event_detail, result in zip(events, results):
                        lock_token = event_detail.broker_properties.lock_token
                        if isinstance(result, BaseException) or not result:
                            retry_tokens.append(lock_token)
                        else:
                            processed_events.append(result)
                            done_tokens.append(lock_token)

                    # Publish first, then acknowledge only the inputs whose
                    # output was actually sent
                    if processed_events:
                        await output_publisher.send(processed_events)
                        await input_consumer.acknowledge(lock_tokens=done_tokens)

                    # Failed transformations go back to the subscription
                    # instead of being acknowledged (and lost)
                    if retry_tokens:
                        await input_consumer.release(lock_tokens=retry_tokens)

                    print(f"Processed {len(done_tokens)} of {len(events)} events")

                except asyncio.CancelledError:
                    # Under asyncio.run(), Ctrl+C cancels this task on
                    # Python 3.11+; re-raise so cancellation completes and the
                    # `async with` block closes both clients
                    print("Shutting down pipeline...")
                    raise
                except KeyboardInterrupt:
                    print("Shutting down pipeline...")
                    break
                except Exception as e:
                    print(f"Pipeline error: {e}")
                    await asyncio.sleep(5)  # Brief pause before retry

    async def _transform_all(self, events):
        """Transform all events concurrently; the result list is aligned with ``events``."""
        return await asyncio.gather(
            *(self.transform_event(event_detail) for event_detail in events),
            return_exceptions=True
        )

    async def process_events(self, events):
        """Process events concurrently and return only the successful transformations."""
        results = await self._transform_all(events)
        return [
            result
            for result in results
            if not isinstance(result, BaseException) and result
        ]

    async def transform_event(self, event_detail):
        """Transform a single event; returns the new CloudEvent, or None on failure."""
        try:
            # Simulate async transformation
            await asyncio.sleep(0.1)

            input_event = event_detail.event

            # Create transformed event
            output_event = CloudEvent(
                source="event-processor",
                type=f"Processed.{input_event.type}",
                data={
                    "original_data": input_event.data,
                    # Wall-clock UTC timestamp; the event loop's time() is a
                    # monotonic counter and is not meaningful outside this process
                    "processed_at": datetime.now(timezone.utc).isoformat(),
                    "processor_id": "async-processor-1"
                }
            )

            return output_event

        except Exception as e:
            print(f"Transformation failed: {e}")
            return None


# Run the pipeline
processor = EventProcessor(
    endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
    credential=AzureKeyCredential("access_key")
)

try:
    asyncio.run(processor.run_pipeline())
except KeyboardInterrupt:
    # asyncio.run() re-raises Ctrl+C after the pipeline task has been cancelled
    print("Pipeline stopped")
414
```
415
416
### Async Context Manager Patterns
417
418
```python
419
import asyncio
from contextlib import AsyncExitStack

from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid.aio import EventGridPublisherClient, EventGridConsumerClient


async def context_manager_examples():
    """Demonstrate async context manager usage patterns."""

    credential = AzureKeyCredential("access_key")
    endpoint = "https://my-namespace.westus-1.eventgrid.azure.net"

    # Single client context manager
    async with EventGridPublisherClient(
        endpoint, credential, namespace_topic="topic"
    ) as publisher:
        await publisher.send([CloudEvent(source="app", type="Test", data={})])

    # Multiple clients with AsyncExitStack
    async with AsyncExitStack() as stack:
        # Enter multiple async context managers; the stack exits them in
        # reverse order when the block ends
        publisher = await stack.enter_async_context(
            EventGridPublisherClient(endpoint, credential, namespace_topic="output")
        )
        consumer = await stack.enter_async_context(
            EventGridConsumerClient(
                endpoint, credential,
                namespace_topic="input",
                subscription="processor"
            )
        )

        # Use both clients
        events = await consumer.receive(max_events=5)
        if events:
            # Process and forward events
            processed = [transform_event(e) for e in events]
            await publisher.send(processed)

            # Acknowledge input events
            tokens = [e.broker_properties.lock_token for e in events]
            await consumer.acknowledge(lock_tokens=tokens)

    # All clients automatically closed on exit


def transform_event(event_detail):
    """Simple event transformation: wrap the received event's data in a new CloudEvent."""
    return CloudEvent(
        source="transformer",
        type=f"Transformed.{event_detail.event.type}",
        data={"original": event_detail.event.data}
    )


asyncio.run(context_manager_examples())
472
```
473
474
### Error Handling with Async Operations
475
476
```python
477
import asyncio

from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError, ClientAuthenticationError
from azure.eventgrid.aio import EventGridConsumerClient
from azure.eventgrid.models import ReleaseDelay

# NOTE: process_event_async(event_detail) is the helper defined in the
# "Async Event Consumption" example.


async def robust_async_processing():
    """Demonstrate robust error handling in async operations.

    Loops until an authentication error or a non-retryable HTTP error occurs.
    """

    async with EventGridConsumerClient(
        endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
        credential=AzureKeyCredential("access_key"),
        namespace_topic="orders-topic",
        subscription="order-processor"
    ) as consumer:
        while True:
            try:
                # Receive events with timeout. The service-side wait
                # (max_wait_time) is kept shorter than the client-side
                # timeout so an empty poll returns normally instead of
                # being cancelled by wait_for().
                events = await asyncio.wait_for(
                    consumer.receive(max_events=10, max_wait_time=20),
                    timeout=30.0
                )

                if not events:
                    continue

                # Process with individual error handling
                success_tokens = []
                retry_tokens = []

                for event_detail in events:
                    lock_token = event_detail.broker_properties.lock_token
                    try:
                        # Process with timeout
                        succeeded = await asyncio.wait_for(
                            process_event_async(event_detail),
                            timeout=5.0
                        )
                    except asyncio.TimeoutError:
                        print("Event processing timed out")
                        succeeded = False
                    except Exception as e:
                        print(f"Event processing failed: {e}")
                        succeeded = False

                    if succeeded:
                        success_tokens.append(lock_token)
                    else:
                        retry_tokens.append(lock_token)

                # Handle results concurrently
                tasks = []

                if success_tokens:
                    tasks.append(consumer.acknowledge(lock_tokens=success_tokens))

                if retry_tokens:
                    tasks.append(consumer.release(
                        lock_tokens=retry_tokens,
                        release_delay=ReleaseDelay.ONE_MINUTE
                    ))

                if tasks:
                    results = await asyncio.gather(*tasks, return_exceptions=True)
                    for result in results:
                        if isinstance(result, BaseException):
                            print(f"Operation failed: {result}")

            except asyncio.TimeoutError:
                print("Receive operation timed out, continuing...")

            # Must precede HttpResponseError: it is a subclass of it
            except ClientAuthenticationError as e:
                print(f"Authentication failed: {e}")
                break  # Cannot continue without valid auth

            except HttpResponseError as e:
                print(f"HTTP error: {e.status_code} - {e.message}")
                # status_code is None when no response was received, so
                # guard before the numeric comparison
                status = e.status_code
                if status is None or status >= 500 or status == 429:
                    # No response, server error, or throttling: wait and retry
                    await asyncio.sleep(5)
                else:
                    # Client error, may need intervention
                    break

            except Exception as e:
                print(f"Unexpected error: {e}")
                await asyncio.sleep(1)


asyncio.run(robust_async_processing())
559
```
560
561
## Performance Considerations
562
563
### Concurrency Control
564
565
```python
566
import asyncio
567
from asyncio import Semaphore
568
569
async def controlled_concurrent_processing(consumer, max_concurrent=10):
    """Process events with controlled concurrency.

    Parameters:
    - consumer: Async consumer client used to receive the events
    - max_concurrent: Maximum number of events processed at the same time

    Returns:
    list: One entry per received event, in receive order: the value returned
    by process_event_async, or the exception it raised
    """

    semaphore = Semaphore(max_concurrent)

    async def process_with_semaphore(event_detail):
        # At most max_concurrent coroutines get past this point at once
        async with semaphore:
            return await process_event_async(event_detail)

    events = await consumer.receive(max_events=50)

    # Process with limited concurrency
    results = await asyncio.gather(
        *(process_with_semaphore(event_detail) for event_detail in events),
        return_exceptions=True
    )

    # Handle results...
    return results
585
```
586
587
### Batch Processing Optimization
588
589
```python
590
import asyncio


async def optimized_batch_processing(consumer, batch_size=20):
    """Optimized async batch processing.

    Receives up to ``batch_size`` events per poll, processes them in
    concurrent chunks of five, and settles each chunk as soon as it finishes.
    Runs until cancelled.

    Parameters:
    - consumer: Async consumer client used to receive and settle events
    - batch_size: Maximum number of events requested per receive call
    """
    chunk_size = 5

    while True:
        # Receive larger batches
        events = await consumer.receive(max_events=batch_size, max_wait_time=10)

        if not events:
            await asyncio.sleep(0.1)  # Brief pause
            continue

        # Process in smaller concurrent chunks
        for start in range(0, len(events), chunk_size):
            chunk = events[start:start + chunk_size]

            # Process chunk concurrently
            results = await asyncio.gather(
                *(process_event_async(event_detail) for event_detail in chunk),
                return_exceptions=True
            )

            # Handle chunk results immediately
            success_tokens = []
            failed_tokens = []

            # gather() preserves input order, so results line up with chunk
            for event_detail, result in zip(chunk, results):
                lock_token = event_detail.broker_properties.lock_token
                if isinstance(result, BaseException) or not result:
                    failed_tokens.append(lock_token)
                else:
                    success_tokens.append(lock_token)

            # Settle results concurrently
            ops = []
            if success_tokens:
                ops.append(consumer.acknowledge(lock_tokens=success_tokens))
            if failed_tokens:
                ops.append(consumer.release(lock_tokens=failed_tokens))

            if ops:
                outcomes = await asyncio.gather(*ops, return_exceptions=True)
                # Report settlement failures instead of discarding them
                for outcome in outcomes:
                    if isinstance(outcome, BaseException):
                        print(f"Settlement failed: {outcome}")
630
```