# Event Consumption

Event consumption in Azure Event Grid Namespaces lets you receive and manage events from pull-delivery subscriptions. The `EventGridConsumerClient` provides operations for receiving, acknowledging, releasing, and rejecting events, and for renewing event locks.

## Capabilities

### Consumer Client Creation

Creates a consumer client for receiving events from an Event Grid Namespace subscription.

```python { .api }
class EventGridConsumerClient:
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, TokenCredential],
        *,
        namespace_topic: str,
        subscription: str,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Create EventGrid consumer client.

        Parameters:
        - endpoint: Event Grid namespace endpoint URL
        - credential: Authentication credential
        - namespace_topic: Topic name to consume from
        - subscription: Subscription name to consume from
        - api_version: API version (default: "2024-06-01")
        """
```

### Event Reception

Receives a batch of Cloud Events from the subscription with configurable batch size and wait time.

```python { .api }
def receive(
    self,
    *,
    max_events: Optional[int] = None,
    max_wait_time: Optional[int] = None,
    **kwargs: Any
) -> List[ReceiveDetails]:
    """
    Receive batch of Cloud Events from subscription.

    Parameters:
    - max_events: Maximum number of events to receive (1-100; the service default is 1 when omitted)
    - max_wait_time: Maximum wait time in seconds (10-120; the service default is 60 when omitted)

    Returns:
    List[ReceiveDetails]: List of received events with broker properties

    Raises:
    - HttpResponseError: Server returned error response
    - ClientAuthenticationError: Authentication failed
    """
```

### Event Acknowledgment

Acknowledges successfully processed events, removing them from the subscription.

```python { .api }
def acknowledge(
    self,
    *,
    lock_tokens: List[str],
    **kwargs: Any
) -> AcknowledgeResult:
    """
    Acknowledge successfully processed events.

    Parameters:
    - lock_tokens: List of lock tokens from received events

    Returns:
    AcknowledgeResult: Result with succeeded and failed acknowledgments

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid or expired lock tokens
    """
```

### Event Release

Releases events back to the subscription for reprocessing, optionally with a delay.

```python { .api }
def release(
    self,
    *,
    lock_tokens: List[str],
    release_delay: Optional[Union[int, ReleaseDelay]] = None,
    **kwargs: Any
) -> ReleaseResult:
    """
    Release events back to subscription for reprocessing.

    Parameters:
    - lock_tokens: List of lock tokens from received events
    - release_delay: Delay before event becomes available (ReleaseDelay enum or seconds as int)

    Returns:
    ReleaseResult: Result with succeeded and failed releases

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid lock tokens or release delay
    """
```

### Event Rejection

Rejects events that cannot be processed, typically moving them to dead letter storage.

```python { .api }
def reject(
    self,
    *,
    lock_tokens: List[str],
    **kwargs: Any
) -> RejectResult:
    """
    Reject events that cannot be processed.

    Parameters:
    - lock_tokens: List of lock tokens from received events

    Returns:
    RejectResult: Result with succeeded and failed rejections

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid or expired lock tokens
    """
```

### Lock Renewal

Extends the lock duration on events so that processing can continue beyond the initial lock timeout.

```python { .api }
def renew_locks(
    self,
    *,
    lock_tokens: List[str],
    **kwargs: Any
) -> RenewLocksResult:
    """
    Renew locks on events to extend processing time.

    Parameters:
    - lock_tokens: List of lock tokens from received events

    Returns:
    RenewLocksResult: Result with succeeded and failed lock renewals

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid or expired lock tokens
    """
```

### Low-Level HTTP Operations

Direct HTTP request handling for advanced scenarios.

```python { .api }
def send_request(
    self,
    request: HttpRequest,
    *,
    stream: bool = False,
    **kwargs: Any
) -> HttpResponse:
    """
    Send raw HTTP request through the client pipeline.

    Parameters:
    - request: The HTTP request to send
    - stream: Whether to stream the response payload

    Returns:
    HttpResponse: Raw HTTP response
    """
```

### Resource Management

Context manager support and explicit resource cleanup.

```python { .api }
def close(self) -> None:
    """Close the client and cleanup resources."""

def __enter__(self) -> Self:
    """Context manager entry."""

def __exit__(self, *exc_details: Any) -> None:
    """Context manager exit with cleanup."""
```

## Usage Examples

### Basic Event Consumption

```python
from azure.eventgrid import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential

# Create consumer client
consumer = EventGridConsumerClient(
    endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
    credential=AzureKeyCredential("access_key"),
    namespace_topic="orders-topic",
    subscription="order-processor"
)

# Receive events
events = consumer.receive(max_events=10, max_wait_time=60)

for event_detail in events:
    cloud_event = event_detail.event
    broker_props = event_detail.broker_properties

    print(f"Event ID: {cloud_event.id}")
    print(f"Event Type: {cloud_event.type}")
    print(f"Event Source: {cloud_event.source}")
    print(f"Event Data: {cloud_event.data}")
    print(f"Lock Token: {broker_props.lock_token}")
    print(f"Delivery Count: {broker_props.delivery_count}")

consumer.close()
```

### Event Processing with Acknowledgment

```python
from azure.eventgrid.models import ReleaseDelay

# Assumes `consumer` is an open EventGridConsumerClient and that
# process_order_event is an application-defined function.

# Process and acknowledge events
events = consumer.receive(max_events=5)
processed_tokens = []
failed_tokens = []

for event_detail in events:
    try:
        # Process the event
        result = process_order_event(event_detail.event)
        if result.success:
            processed_tokens.append(event_detail.broker_properties.lock_token)
        else:
            failed_tokens.append(event_detail.broker_properties.lock_token)
    except Exception as e:
        print(f"Processing failed: {e}")
        failed_tokens.append(event_detail.broker_properties.lock_token)

# Acknowledge successfully processed events
if processed_tokens:
    ack_result = consumer.acknowledge(lock_tokens=processed_tokens)
    print(f"Acknowledged: {len(ack_result.succeeded_lock_tokens)}")
    print(f"Failed to acknowledge: {len(ack_result.failed_lock_tokens)}")

# Release failed events for retry
if failed_tokens:
    release_result = consumer.release(
        lock_tokens=failed_tokens,
        release_delay=ReleaseDelay.TEN_SECONDS
    )
    print(f"Released: {len(release_result.succeeded_lock_tokens)}")
```

### Error Handling and Retries

```python
from azure.eventgrid.models import ReleaseDelay
from azure.core.exceptions import HttpResponseError

# Assumes `consumer` is an open EventGridConsumerClient. process_event and
# ProcessingError are application-defined placeholders, not part of the SDK.
events = consumer.receive(max_events=10)

for event_detail in events:
    cloud_event = event_detail.event
    lock_token = event_detail.broker_properties.lock_token
    delivery_count = event_detail.broker_properties.delivery_count

    try:
        # Attempt processing
        process_event(cloud_event)

        # Acknowledge successful processing
        consumer.acknowledge(lock_tokens=[lock_token])

    except ProcessingError as e:
        if delivery_count < 3:
            # Retry with delay
            consumer.release(
                lock_tokens=[lock_token],
                release_delay=ReleaseDelay.ONE_MINUTE
            )
            print(f"Released for retry (attempt {delivery_count + 1})")
        else:
            # Max retries exceeded, reject to dead letter
            consumer.reject(lock_tokens=[lock_token])
            print(f"Rejected after {delivery_count} attempts")

    except Exception as e:
        # Unexpected error, reject immediately
        consumer.reject(lock_tokens=[lock_token])
        print(f"Rejected due to unexpected error: {e}")
```
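
The branching in the retry example can be pulled out into a small pure function, which makes the policy easy to unit-test without a live subscription. The following is a minimal sketch under stated assumptions: `Settlement` and `choose_settlement` are hypothetical names that are not part of the SDK, and the default of three attempts simply mirrors the `delivery_count < 3` check above.

```python
from enum import Enum


class Settlement(Enum):
    """Hypothetical labels for the three settlement calls on the consumer."""

    ACKNOWLEDGE = "acknowledge"
    RELEASE = "release"
    REJECT = "reject"


def choose_settlement(
    succeeded: bool,
    retryable: bool,
    delivery_count: int,
    max_attempts: int = 3,
) -> Settlement:
    """Pick a settlement action for one event.

    - Successful processing is acknowledged.
    - A retryable failure is released while attempts remain.
    - Everything else is rejected.
    """
    if succeeded:
        return Settlement.ACKNOWLEDGE
    if retryable and delivery_count < max_attempts:
        return Settlement.RELEASE
    return Settlement.REJECT


if __name__ == "__main__":
    # A retryable failure on the first delivery is released for another try.
    print(choose_settlement(succeeded=False, retryable=True, delivery_count=1))
    # The same failure on the third delivery is rejected.
    print(choose_settlement(succeeded=False, retryable=True, delivery_count=3))
```

A caller would then map the returned value onto `consumer.acknowledge`, `consumer.release`, or `consumer.reject` with the event's lock token.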

### Long-Running Processing with Lock Renewal

```python
import time
from azure.eventgrid.models import ReleaseDelay

events = consumer.receive(max_events=1)

for event_detail in events:
    lock_token = event_detail.broker_properties.lock_token

    try:
        # Start long-running processing
        for step in range(10):  # Simulate 10 processing steps
            # Renew lock every 30 seconds to prevent timeout
            if step > 0 and step % 3 == 0:
                renew_result = consumer.renew_locks(lock_tokens=[lock_token])
                if not renew_result.succeeded_lock_tokens:
                    print("Failed to renew lock, releasing event")
                    consumer.release(lock_tokens=[lock_token])
                    break

            # Process step (simulate 10 seconds per step)
            time.sleep(10)
            print(f"Completed step {step + 1}")
        else:
            # All steps completed successfully
            consumer.acknowledge(lock_tokens=[lock_token])
            print("Processing completed successfully")

    except Exception as e:
        # Processing failed, release for retry
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_MINUTES
        )
        print(f"Processing failed, released for retry: {e}")
```
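
The example above renews on a fixed step count, which only works when each step takes a predictable amount of time. An alternative is to decide by elapsed time relative to the lock duration. The sketch below assumes the caller already knows the subscription's lock duration (for example from its own configuration); `renewal_interval` and `should_renew` are hypothetical helper names, and the safety factor of 0.5 is an illustrative choice rather than a documented recommendation.

```python
def renewal_interval(lock_duration_seconds: float, safety_factor: float = 0.5) -> float:
    """Return how long to wait between renewals.

    The interval is a fraction of the lock duration so that a renewal is
    attempted well before the lock would expire.
    """
    if lock_duration_seconds <= 0:
        raise ValueError("lock_duration_seconds must be positive")
    if not 0 < safety_factor < 1:
        raise ValueError("safety_factor must be between 0 and 1 (exclusive)")
    return lock_duration_seconds * safety_factor


def should_renew(
    seconds_since_last_renewal: float,
    lock_duration_seconds: float,
    safety_factor: float = 0.5,
) -> bool:
    """Return True once enough time has passed that the lock should be renewed."""
    return seconds_since_last_renewal >= renewal_interval(
        lock_duration_seconds, safety_factor
    )


if __name__ == "__main__":
    # With an assumed 60-second lock, renew once 30 seconds have elapsed.
    print(should_renew(29, 60))
    print(should_renew(30, 60))
```

Inside a processing loop, the elapsed time could be tracked with `time.monotonic()` and reset after each successful `renew_locks` call.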

### Context Manager Usage

```python
from azure.eventgrid import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential

# Automatic resource cleanup
with EventGridConsumerClient(
    endpoint="https://namespace.region.eventgrid.azure.net",
    credential=AzureKeyCredential("key"),
    namespace_topic="topic",
    subscription="sub"
) as consumer:

    events = consumer.receive(max_events=5)

    # Process events (process_event is an application-defined placeholder)
    for event_detail in events:
        process_event(event_detail.event)

    # Acknowledge all events
    lock_tokens = [e.broker_properties.lock_token for e in events]
    consumer.acknowledge(lock_tokens=lock_tokens)

# Client automatically closed on exit
```

### Batch Processing Pattern

```python
import time

from azure.eventgrid import EventGridConsumerClient
from azure.eventgrid.models import ReleaseDelay


def process_event_batch(consumer, batch_size=10):
    """Process events in batches with proper error handling."""

    events = consumer.receive(max_events=batch_size, max_wait_time=30)

    if not events:
        print("No events received")
        return

    print(f"Received {len(events)} events")

    # Group events by processing outcome
    success_tokens = []
    retry_tokens = []
    reject_tokens = []

    for event_detail in events:
        try:
            # Process individual event (process_single_event is application-defined)
            result = process_single_event(event_detail.event)

            if result == "success":
                success_tokens.append(event_detail.broker_properties.lock_token)
            elif result == "retry":
                retry_tokens.append(event_detail.broker_properties.lock_token)
            else:
                reject_tokens.append(event_detail.broker_properties.lock_token)

        except Exception as e:
            print(f"Unexpected error processing event: {e}")
            reject_tokens.append(event_detail.broker_properties.lock_token)

    # Handle each group of events
    if success_tokens:
        ack_result = consumer.acknowledge(lock_tokens=success_tokens)
        print(f"Acknowledged {len(ack_result.succeeded_lock_tokens)} events")

    if retry_tokens:
        release_result = consumer.release(
            lock_tokens=retry_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE
        )
        print(f"Released {len(release_result.succeeded_lock_tokens)} events for retry")

    if reject_tokens:
        reject_result = consumer.reject(lock_tokens=reject_tokens)
        print(f"Rejected {len(reject_result.succeeded_lock_tokens)} events")

# Usage
with EventGridConsumerClient(...) as consumer:
    while True:
        process_event_batch(consumer)
        time.sleep(5)  # Brief pause between batches
```
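
The usage loop above pauses a fixed five seconds between batches. One possible refinement, shown here only as a sketch, is to poll again immediately while events keep arriving and to back off exponentially while the subscription is empty. `next_poll_delay` is a hypothetical helper, and the base and cap values are illustrative assumptions, not service guidance.

```python
def next_poll_delay(
    consecutive_empty_polls: int,
    base_seconds: float = 1.0,
    cap_seconds: float = 30.0,
) -> float:
    """Return how long to sleep before the next receive call.

    No delay after a poll that returned events; otherwise the delay doubles
    with each consecutive empty poll, up to cap_seconds.
    """
    if consecutive_empty_polls <= 0:
        return 0.0
    return min(cap_seconds, base_seconds * 2 ** (consecutive_empty_polls - 1))


if __name__ == "__main__":
    # Delays for zero through five consecutive empty polls.
    print([next_poll_delay(n) for n in range(6)])
```

To use it, `process_event_batch` would need to report whether it received any events so the caller can maintain the empty-poll counter.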

## Error Handling

### Common Error Scenarios

```python
from azure.core.exceptions import HttpResponseError, ClientAuthenticationError

try:
    events = consumer.receive(max_events=10)
    # Process events...
    lock_tokens = [e.broker_properties.lock_token for e in events]
    consumer.acknowledge(lock_tokens=lock_tokens)

except ClientAuthenticationError as e:
    print(f"Authentication failed: {e}")
    # Check credentials and permissions

except HttpResponseError as e:
    if e.status_code == 404:
        print("Topic or subscription not found")
    elif e.status_code == 400:
        print(f"Bad request: {e.message}")
        # Check lock tokens and parameters
    elif e.status_code == 409:
        print("Lock tokens expired or invalid")
        # Events may have been processed by another consumer
    else:
        print(f"HTTP error {e.status_code}: {e.message}")

except ValueError as e:
    print(f"Invalid parameters: {e}")
    # Check lock tokens format and values
```

### Operation Result Handling

```python
# Handle partial success in batch operations
# (all_lock_tokens is the list of lock tokens collected from received events)
ack_result = consumer.acknowledge(lock_tokens=all_lock_tokens)

# Check for successful acknowledgments
if ack_result.succeeded_lock_tokens:
    print(f"Successfully acknowledged {len(ack_result.succeeded_lock_tokens)} events")

# Handle failed acknowledgments
if ack_result.failed_lock_tokens:
    print(f"Failed to acknowledge {len(ack_result.failed_lock_tokens)} events")
    for failed_token in ack_result.failed_lock_tokens:
        print(f"Failed token: {failed_token.lock_token}")
        print(f"Error: {failed_token.error}")

        # Decide whether to retry or reject based on error
        if "expired" in str(failed_token.error):
            print("Lock expired, event likely processed elsewhere")
        else:
            # Unexpected error, may need manual intervention
            print("Unexpected acknowledgment error")
```
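
The per-token inspection above can be factored into a helper that sorts failed tokens by likely cause. The sketch below is duck-typed: it only assumes the result object exposes `failed_lock_tokens` whose items have `lock_token` and `error` attributes, as used in the example above. `split_failed_tokens` is a hypothetical name, the `SimpleNamespace` objects are stand-ins for a real result, and matching on the word "expired" is the same heuristic as above rather than a documented error contract.

```python
from types import SimpleNamespace
from typing import Any, List, Tuple


def split_failed_tokens(result: Any) -> Tuple[List[str], List[str]]:
    """Split failed lock tokens into (expired, other) by inspecting the error text."""
    expired: List[str] = []
    other: List[str] = []
    for failed in result.failed_lock_tokens:
        # Heuristic: treat any error mentioning "expired" as a lost lock.
        if "expired" in str(failed.error).lower():
            expired.append(failed.lock_token)
        else:
            other.append(failed.lock_token)
    return expired, other


# Stand-in for a real acknowledge/release/reject result, for illustration only.
fake_result = SimpleNamespace(
    succeeded_lock_tokens=["t1"],
    failed_lock_tokens=[
        SimpleNamespace(lock_token="t2", error="Lock token expired"),
        SimpleNamespace(lock_token="t3", error="Internal server error"),
    ],
)

expired, other = split_failed_tokens(fake_result)
print(f"Expired: {expired}")
print(f"Other failures: {other}")
```

Expired tokens generally need no further action from this consumer, while the remaining failures are candidates for logging or manual follow-up.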