# Models and Data Types

The Azure Event Grid Python SDK provides comprehensive models for event handling, response management, and broker metadata. These models encapsulate the data structures needed for robust event processing in Event Grid Namespaces.

## Capabilities

### Event Reception Models

Models for received events and their associated broker metadata.

```python { .api }
class ReceiveDetails:
    """
    Container for a received Cloud Event with broker metadata.

    Attributes:
    - broker_properties: BrokerProperties - Event broker metadata
    - event: CloudEvent - The received Cloud Event
    """
    broker_properties: BrokerProperties
    event: CloudEvent
```

```python { .api }
class BrokerProperties:
    """
    Broker metadata associated with a received event.

    Attributes:
    - lock_token: str - Unique token for event operations (acknowledge, release, etc.)
    - delivery_count: int - Number of delivery attempts for this event
    """
    lock_token: str
    delivery_count: int
```
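
These two models always arrive together: each received item pairs the event with the broker metadata needed to settle it. A minimal sketch, assuming a `consumer` client like the one constructed in the Usage Examples below:

```python
# Assumes `consumer` is an EventGridConsumerClient, as constructed in the
# Usage Examples section below.
for detail in consumer.receive(max_events=5):
    token = detail.broker_properties.lock_token        # needed to acknowledge/release/reject
    attempts = detail.broker_properties.delivery_count # prior delivery attempts for this event
    print(f"{detail.event.type} (delivery #{attempts}): lock token {token}")
```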

### Operation Result Models

Response models for consumer operations, providing per-token success and failure details.

```python { .api }
class AcknowledgeResult:
    """
    Result of acknowledge operation.

    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to acknowledge
    - succeeded_lock_tokens: List[str] - Tokens that were successfully acknowledged
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]
```

```python { .api }
class ReleaseResult:
    """
    Result of release operation.

    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to release
    - succeeded_lock_tokens: List[str] - Tokens that were successfully released
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]
```

```python { .api }
class RejectResult:
    """
    Result of reject operation.

    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to reject
    - succeeded_lock_tokens: List[str] - Tokens that were successfully rejected
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]
```

```python { .api }
class RenewLocksResult:
    """
    Result of renew locks operation.

    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to renew
    - succeeded_lock_tokens: List[str] - Tokens that were successfully renewed
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]
```
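
All four result models share the same shape, so a single helper can report on any of them. A minimal sketch (the helper is illustrative, not part of the SDK):

```python
def report_result(operation: str, result) -> list:
    """Summarize an acknowledge/release/reject/renew-locks result and
    return the lock tokens that failed, for possible follow-up."""
    print(f"{operation}: {len(result.succeeded_lock_tokens)} succeeded, "
          f"{len(result.failed_lock_tokens)} failed")
    return [failed.lock_token for failed in result.failed_lock_tokens]
```

For example, `report_result("acknowledge", consumer.acknowledge(lock_tokens=tokens))` returns the tokens to consider releasing or rejecting.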

### Error Models

Models for handling operation failures and error details.

```python { .api }
class FailedLockToken:
    """
    Information about a failed lock token operation.

    Attributes:
    - lock_token: str - The lock token that failed
    - error: ODataV4Format - Detailed error information
    """
    lock_token: str
    error: ODataV4Format
```
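
`ODataV4Format` is the error type from `azure.core.exceptions`; it exposes `code` and `message` attributes, which lets you branch on failures without string matching. A short sketch, assuming `ack_result` came from a prior acknowledge call:

```python
# `ack_result` is assumed to come from a prior consumer.acknowledge(...) call.
for failed in ack_result.failed_lock_tokens:
    # ODataV4Format (from azure.core.exceptions) carries a code and a message.
    print(f"token {failed.lock_token}: [{failed.error.code}] {failed.error.message}")
```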

### Internal Service Models

Models used internally by the service for request/response handling.

```python { .api }
class ReceiveResult:
    """
    Internal container for receive operation response.

    Attributes:
    - details: List[ReceiveDetails] - Array of received events with metadata
    """
    details: List[ReceiveDetails]
```

```python { .api }
class PublishResult:
    """
    Internal result for publish operations (typically empty).
    """
    pass
```

```python { .api }
class CloudEvent:
    """
    Internal CloudEvent representation following the CloudEvents v1.0 specification.

    Attributes:
    - id: str - Event identifier
    - source: str - Context in which the event occurred
    - type: str - Type of event
    - specversion: str - CloudEvents specification version
    - data: Any - Event payload data
    - data_base64: bytes - Base64-encoded event data (alternative to data)
    - time: datetime - Event timestamp
    - dataschema: str - URI of the schema for the data
    - datacontenttype: str - Content type of the data
    - subject: str - Subject of the event in the context of the event producer
    """
    id: str
    source: str
    type: str
    specversion: str
    data: Any
    data_base64: bytes
    time: datetime
    dataschema: str
    datacontenttype: str
    subject: str
```
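
For publishing, the concrete class is typically `azure.core.messaging.CloudEvent`, which accepts the attributes above as keyword arguments; `source` and `type` are required, while `id`, `specversion`, and `time` are generated when omitted. A minimal sketch (the event type and payload are illustrative):

```python
from azure.core.messaging import CloudEvent

# source and type are required; id, specversion, and time default automatically.
event = CloudEvent(
    source="/orders/service",          # illustrative source
    type="com.example.order.created",  # illustrative event type
    data={"order_id": "12345", "total": 99.90},
    subject="orders/12345",
)
```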

### Enums

Predefined string constants for event release delays.

```python { .api }
from enum import Enum

class ReleaseDelay(str, Enum):
    """
    Predefined delay values for event release operations.

    Values:
    - NO_DELAY: "0" - Release immediately
    - TEN_SECONDS: "10" - Release after 10 seconds
    - ONE_MINUTE: "60" - Release after 60 seconds
    - TEN_MINUTES: "600" - Release after 600 seconds (10 minutes)
    - ONE_HOUR: "3600" - Release after 3600 seconds (1 hour)
    """
    NO_DELAY = "0"
    TEN_SECONDS = "10"
    ONE_MINUTE = "60"
    TEN_MINUTES = "600"
    ONE_HOUR = "3600"
```
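
Because `ReleaseDelay` derives from `str`, its members compare equal to their underlying string values and convert cleanly when a numeric delay is needed:

```python
delay = ReleaseDelay.ONE_MINUTE
assert delay == "60"   # str subclass: equal to its string value
seconds = int(delay)   # 60, when a numeric delay is required
```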
185
186
## Usage Examples
187
188
### Working with Received Events
189
190
```python
from azure.eventgrid import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential

consumer = EventGridConsumerClient(
    endpoint="https://namespace.region.eventgrid.azure.net",
    credential=AzureKeyCredential("key"),
    namespace_topic="orders",
    subscription="processor"
)

# Receive events
events = consumer.receive(max_events=5)

for event_detail in events:
    # Access broker properties
    broker_props = event_detail.broker_properties
    print(f"Lock Token: {broker_props.lock_token}")
    print(f"Delivery Count: {broker_props.delivery_count}")

    # Access the Cloud Event
    cloud_event = event_detail.event
    print(f"Event ID: {cloud_event.id}")
    print(f"Event Type: {cloud_event.type}")
    print(f"Event Source: {cloud_event.source}")
    print(f"Event Data: {cloud_event.data}")

    # Optional Cloud Event properties
    if cloud_event.subject:
        print(f"Subject: {cloud_event.subject}")
    if cloud_event.time:
        print(f"Event Time: {cloud_event.time}")
    if cloud_event.datacontenttype:
        print(f"Content Type: {cloud_event.datacontenttype}")

consumer.close()
```

### Handling Operation Results

```python
from azure.eventgrid.models import ReleaseDelay

# Collect lock tokens from received events
events = consumer.receive(max_events=10)
lock_tokens = [event.broker_properties.lock_token for event in events]

# Attempt to acknowledge events
ack_result = consumer.acknowledge(lock_tokens=lock_tokens)

# Check successful acknowledgments
print(f"Successfully acknowledged: {len(ack_result.succeeded_lock_tokens)}")
for token in ack_result.succeeded_lock_tokens:
    print(f"  Acknowledged: {token}")

# Handle failed acknowledgments
if ack_result.failed_lock_tokens:
    print(f"Failed to acknowledge: {len(ack_result.failed_lock_tokens)}")

    retry_tokens = []
    for failed_token in ack_result.failed_lock_tokens:
        print(f"  Failed token: {failed_token.lock_token}")
        print(f"  Error: {failed_token.error}")

        # Check error type to decide on retry strategy
        error_str = str(failed_token.error)
        if "expired" in error_str.lower():
            print("  Lock expired - event likely processed elsewhere")
        elif "not found" in error_str.lower():
            print("  Event not found - may have been deleted")
        else:
            print("  Unexpected error - retrying")
            retry_tokens.append(failed_token.lock_token)

    # Retry failed tokens with release
    if retry_tokens:
        release_result = consumer.release(
            lock_tokens=retry_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE
        )
        print(f"Released for retry: {len(release_result.succeeded_lock_tokens)}")
```

### Using Release Delays

```python
from azure.eventgrid.models import ReleaseDelay

# NOTE: process_event and the exception types below (TransientError,
# RateLimitError, ProcessingError) are placeholders for your own processing
# logic and error taxonomy - they are not part of the SDK.

# Different release delay strategies
events = consumer.receive(max_events=5)

for event_detail in events:
    lock_token = event_detail.broker_properties.lock_token
    delivery_count = event_detail.broker_properties.delivery_count

    try:
        # Attempt processing
        process_event(event_detail.event)
        consumer.acknowledge(lock_tokens=[lock_token])

    except TransientError:
        # Transient error - retry quickly
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_SECONDS
        )

    except RateLimitError:
        # Rate limited - wait longer
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_MINUTES
        )

    except ProcessingError:
        if delivery_count < 3:
            # Retry with exponential backoff based on delivery count
            delays = [ReleaseDelay.ONE_MINUTE, ReleaseDelay.TEN_MINUTES, ReleaseDelay.ONE_HOUR]
            delay = delays[min(delivery_count - 1, len(delays) - 1)]

            consumer.release(
                lock_tokens=[lock_token],
                release_delay=delay
            )
        else:
            # Max retries exceeded - reject
            consumer.reject(lock_tokens=[lock_token])

    except Exception:
        # Unexpected error - reject immediately
        consumer.reject(lock_tokens=[lock_token])
```

### Batch Result Processing

```python
from azure.eventgrid.models import ReleaseDelay

def process_batch_results(consumer, events):
    """Process a batch of events and handle all results."""
    # NOTE: process_event is a placeholder for your own logic; it is assumed
    # to return an object with success and transient_error attributes.

    # Group events by processing outcome
    success_tokens = []
    transient_failure_tokens = []
    permanent_failure_tokens = []

    for event_detail in events:
        try:
            result = process_event(event_detail.event)

            if result.success:
                success_tokens.append(event_detail.broker_properties.lock_token)
            elif result.transient_error:
                transient_failure_tokens.append(event_detail.broker_properties.lock_token)
            else:
                permanent_failure_tokens.append(event_detail.broker_properties.lock_token)

        except Exception:
            permanent_failure_tokens.append(event_detail.broker_properties.lock_token)

    # Execute each settlement operation (sequentially) and collect its result
    operations = []

    if success_tokens:
        operations.append(('acknowledge', consumer.acknowledge(lock_tokens=success_tokens)))

    if transient_failure_tokens:
        operations.append(('release', consumer.release(
            lock_tokens=transient_failure_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE
        )))

    if permanent_failure_tokens:
        operations.append(('reject', consumer.reject(lock_tokens=permanent_failure_tokens)))

    # Process all operation results
    for operation_name, result in operations:
        print(f"{operation_name.title()} Results:")
        print(f"  Succeeded: {len(result.succeeded_lock_tokens)}")

        if result.failed_lock_tokens:
            print(f"  Failed: {len(result.failed_lock_tokens)}")
            for failed_token in result.failed_lock_tokens:
                print(f"    Token: {failed_token.lock_token}")
                print(f"    Error: {failed_token.error}")

# Usage
events = consumer.receive(max_events=20)
if events:
    process_batch_results(consumer, events)
```

### Event Metadata Analysis

```python
from datetime import datetime, timezone
from collections import defaultdict

def analyze_event_batch(events):
    """Analyze received events for monitoring and debugging."""

    # Group events by various attributes
    by_type = defaultdict(int)
    by_source = defaultdict(int)
    by_delivery_count = defaultdict(int)

    oldest_event = None
    newest_event = None

    for event_detail in events:
        cloud_event = event_detail.event
        broker_props = event_detail.broker_properties

        # Count by type and source
        by_type[cloud_event.type] += 1
        by_source[cloud_event.source] += 1
        by_delivery_count[broker_props.delivery_count] += 1

        # Track event timing
        if cloud_event.time:
            if oldest_event is None or cloud_event.time < oldest_event:
                oldest_event = cloud_event.time
            if newest_event is None or cloud_event.time > newest_event:
                newest_event = cloud_event.time

    # Print the analysis
    print(f"Batch Analysis - {len(events)} events:")
    print(f"Event Types: {dict(by_type)}")
    print(f"Event Sources: {dict(by_source)}")
    print(f"Delivery Counts: {dict(by_delivery_count)}")

    if oldest_event and newest_event:
        age_span = newest_event - oldest_event
        print(f"Time Span: {age_span.total_seconds():.1f} seconds")

        # Check for old events (assumes event timestamps are timezone-aware)
        now = datetime.now(timezone.utc)
        oldest_age = (now - oldest_event).total_seconds()
        if oldest_age > 3600:  # Older than 1 hour
            print(f"WARNING: Oldest event is {oldest_age/3600:.1f} hours old")

    # Check for retry patterns
    high_retry_events = [e for e in events if e.broker_properties.delivery_count > 2]
    if high_retry_events:
        print(f"High retry events: {len(high_retry_events)}")
        for event_detail in high_retry_events:
            print(f"  {event_detail.event.type}: {event_detail.broker_properties.delivery_count} attempts")

# Usage
events = consumer.receive(max_events=50)
if events:
    analyze_event_batch(events)
```

### Custom Result Processing

```python
from azure.eventgrid.models import ReleaseDelay

class EventProcessor:
    """Custom event processor with detailed result tracking."""

    def __init__(self, consumer):
        self.consumer = consumer
        self.stats = {
            'processed': 0,
            'acknowledged': 0,
            'released': 0,
            'rejected': 0,
            'errors': 0
        }

    def process_batch(self, max_events=10):
        """Process a batch of events with detailed tracking."""

        events = self.consumer.receive(max_events=max_events)
        if not events:
            return

        self.stats['processed'] += len(events)

        # Process events
        results = []
        for event_detail in events:
            try:
                success = self.process_single_event(event_detail.event)
                results.append((event_detail.broker_properties.lock_token, success))
            except Exception as e:
                print(f"Processing error: {e}")
                results.append((event_detail.broker_properties.lock_token, False))
                self.stats['errors'] += 1

        # Group by outcome
        success_tokens = [token for token, success in results if success]
        failure_tokens = [token for token, success in results if not success]

        # Execute operations and track results
        if success_tokens:
            ack_result = self.consumer.acknowledge(lock_tokens=success_tokens)
            self.stats['acknowledged'] += len(ack_result.succeeded_lock_tokens)

            # Handle partial failures
            if ack_result.failed_lock_tokens:
                print(f"Failed to acknowledge {len(ack_result.failed_lock_tokens)} events")
                failure_tokens.extend([ft.lock_token for ft in ack_result.failed_lock_tokens])

        if failure_tokens:
            release_result = self.consumer.release(
                lock_tokens=failure_tokens,
                release_delay=ReleaseDelay.ONE_MINUTE
            )
            self.stats['released'] += len(release_result.succeeded_lock_tokens)

            # Handle release failures (reject as last resort)
            if release_result.failed_lock_tokens:
                reject_tokens = [ft.lock_token for ft in release_result.failed_lock_tokens]
                reject_result = self.consumer.reject(lock_tokens=reject_tokens)
                self.stats['rejected'] += len(reject_result.succeeded_lock_tokens)

    def process_single_event(self, cloud_event):
        """Process a single Cloud Event."""
        # Implement your processing logic here
        print(f"Processing {cloud_event.type} from {cloud_event.source}")
        return True  # Return success/failure

    def print_stats(self):
        """Print processing statistics."""
        print("Processing Statistics:")
        for key, value in self.stats.items():
            print(f"  {key.title()}: {value}")

# Usage
processor = EventProcessor(consumer)

# Process multiple batches
for _ in range(10):
    processor.process_batch(max_events=5)

processor.print_stats()
```