
# Models and Data Types

The Azure Event Grid Python SDK provides models for received events, operation results, and broker metadata. Together they cover the data structures a consumer needs to process events from Event Grid Namespaces.

## Capabilities

### Event Reception Models

Models for received events and their associated broker metadata.

```python { .api }
class ReceiveDetails:
    """
    Container for a received Cloud Event with broker metadata.

    Attributes:
    - broker_properties: BrokerProperties - Event broker metadata
    - event: CloudEvent - The received Cloud Event
    """
    broker_properties: BrokerProperties
    event: CloudEvent
```

```python { .api }
class BrokerProperties:
    """
    Broker metadata associated with a received event.

    Attributes:
    - lock_token: str - Unique token for event operations (acknowledge, release, etc.)
    - delivery_count: int - Number of delivery attempts for this event
    """
    lock_token: str
    delivery_count: int
```
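
When unit testing a handler it can be convenient to build objects with the same shape as the reception models without calling the service. The sketch below uses hypothetical stand-in dataclasses (`FakeBrokerProperties`, `FakeReceiveDetails`) rather than the SDK classes, and a hypothetical `is_redelivery` helper. It assumes the first delivery attempt reports `delivery_count == 1`, which is worth confirming against your own namespace.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeBrokerProperties:
    """Hypothetical stand-in mirroring the BrokerProperties attributes."""
    lock_token: str
    delivery_count: int


@dataclass
class FakeReceiveDetails:
    """Hypothetical stand-in mirroring the ReceiveDetails attributes."""
    broker_properties: FakeBrokerProperties
    event: Any


def is_redelivery(detail) -> bool:
    """Return True when the event has been delivered more than once.

    Assumption: the first delivery attempt has delivery_count == 1.
    """
    return detail.broker_properties.delivery_count > 1


first = FakeReceiveDetails(FakeBrokerProperties("token-a", 1), event={"id": "1"})
retry = FakeReceiveDetails(FakeBrokerProperties("token-b", 3), event={"id": "2"})
print(is_redelivery(first), is_redelivery(retry))
```

Because `is_redelivery` only reads attributes, the same function should also accept real `ReceiveDetails` objects.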

### Operation Result Models

Response models for consumer operations. Each result lists the lock tokens that succeeded and the ones that failed, with error details for the failures.

```python { .api }
class AcknowledgeResult:
    """
    Result of acknowledge operation.

    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to acknowledge
    - succeeded_lock_tokens: List[str] - Tokens that were successfully acknowledged
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]
```

```python { .api }
class ReleaseResult:
    """
    Result of release operation.

    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to release
    - succeeded_lock_tokens: List[str] - Tokens that were successfully released
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]
```

```python { .api }
class RejectResult:
    """
    Result of reject operation.

    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to reject
    - succeeded_lock_tokens: List[str] - Tokens that were successfully rejected
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]
```

```python { .api }
class RenewLocksResult:
    """
    Result of renew locks operation.

    Attributes:
    - failed_lock_tokens: List[FailedLockToken] - Tokens that failed to renew
    - succeeded_lock_tokens: List[str] - Tokens that were successfully renewed
    """
    failed_lock_tokens: List[FailedLockToken]
    succeeded_lock_tokens: List[str]
```
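
Since the four result models expose the same two attributes, one helper can summarize any of them. The following is a minimal sketch under that assumption: `FakeResult` and `FakeFailedLockToken` are hypothetical stand-ins for the SDK classes, and `summarize_result` is an illustrative name, not part of the SDK.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakeFailedLockToken:
    """Hypothetical stand-in mirroring FailedLockToken."""
    lock_token: str
    error: str


@dataclass
class FakeResult:
    """Hypothetical stand-in with the attributes shared by all result models."""
    failed_lock_tokens: List[FakeFailedLockToken] = field(default_factory=list)
    succeeded_lock_tokens: List[str] = field(default_factory=list)


def summarize_result(operation: str, result) -> dict:
    """Reduce a result object to counts and the tokens that need follow-up."""
    return {
        "operation": operation,
        "succeeded": len(result.succeeded_lock_tokens),
        "failed": [failed.lock_token for failed in result.failed_lock_tokens],
    }


result = FakeResult(
    failed_lock_tokens=[FakeFailedLockToken("token-b", "lock expired")],
    succeeded_lock_tokens=["token-a", "token-c"],
)
summary = summarize_result("acknowledge", result)
print(summary)
```

The helper relies only on attribute access, so it makes no distinction between acknowledge, release, reject, and renew results.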

### Error Models

Models for handling operation failures and error details.

```python { .api }
class FailedLockToken:
    """
    Information about a failed lock token operation.

    Attributes:
    - lock_token: str - The lock token that failed
    - error: ODataV4Format - Detailed error information
    """
    lock_token: str
    error: ODataV4Format
```
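
A failed token is usually easier to log once its error is flattened into a single line. The sketch below assumes the error object exposes `code` and `message` attributes, as OData v4 style errors generally do, and falls back to `str(error)` when they are missing. `describe_failure` is a hypothetical helper, and the `"TokenLost"` code and its message are made-up sample values, not documented service error codes.

```python
from types import SimpleNamespace


def describe_failure(failed) -> str:
    """Format a failed lock token as 'token: [code] message'.

    Assumption: failed.error may expose .code and .message; missing
    attributes fall back to 'unknown' and str(error) respectively.
    """
    error = failed.error
    code = getattr(error, "code", None) or "unknown"
    message = getattr(error, "message", None) or str(error)
    return f"{failed.lock_token}: [{code}] {message}"


# Stand-in object with the same attribute names as FailedLockToken.
failed = SimpleNamespace(
    lock_token="token-b",
    error=SimpleNamespace(code="TokenLost", message="The lock has expired."),
)
print(describe_failure(failed))
```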

### Internal Service Models

Models used internally by the service for request/response handling.

```python { .api }
class ReceiveResult:
    """
    Internal container for receive operation response.

    Attributes:
    - details: List[ReceiveDetails] - Array of received events with metadata
    """
    details: List[ReceiveDetails]
```

```python { .api }
class PublishResult:
    """
    Internal result for publish operations (typically empty).
    """
    pass
```

```python { .api }
class CloudEvent:
    """
    Internal CloudEvent representation following CloudEvents v1.0 specification.

    Attributes:
    - id: str - Event identifier
    - source: str - Context in which event occurred
    - type: str - Type of event
    - specversion: str - CloudEvents specification version
    - data: Any - Event payload data
    - data_base64: bytes - Base64-encoded event data (alternative to data)
    - time: datetime - Event timestamp
    - dataschema: str - URI of the schema for the data
    - datacontenttype: str - Content type of the data
    - subject: str - Subject of the event in context of the event producer
    """
    id: str
    source: str
    type: str
    specversion: str
    data: Any
    data_base64: bytes
    time: datetime
    dataschema: str
    datacontenttype: str
    subject: str
```

### Enums

Predefined constants for common operations and delays.

```python { .api }
from enum import Enum

class ReleaseDelay(str, Enum):
    """
    Predefined delay values for event release operations.

    Values:
    - NO_DELAY: "0" - Release immediately
    - TEN_SECONDS: "10" - Release after 10 seconds
    - ONE_MINUTE: "60" - Release after 60 seconds
    - TEN_MINUTES: "600" - Release after 600 seconds (10 minutes)
    - ONE_HOUR: "3600" - Release after 3600 seconds (1 hour)
    """
    NO_DELAY = "0"
    TEN_SECONDS = "10"
    ONE_MINUTE = "60"
    TEN_MINUTES = "600"
    ONE_HOUR = "3600"
```
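
Because `ReleaseDelay` mixes in `str`, its members compare equal to their string values, and the values are whole seconds written as text. The sketch below redefines the enum locally, copying the values listed above so that it runs without the SDK installed, and adds a hypothetical `delay_for_attempt` helper that picks a longer delay as the delivery count grows. The ladder of delays is an illustrative choice, not an SDK recommendation.

```python
from enum import Enum


class ReleaseDelay(str, Enum):
    """Local copy of the documented values, for illustration only."""
    NO_DELAY = "0"
    TEN_SECONDS = "10"
    ONE_MINUTE = "60"
    TEN_MINUTES = "600"
    ONE_HOUR = "3600"


def delay_for_attempt(delivery_count: int) -> ReleaseDelay:
    """Map a delivery count to a progressively longer release delay.

    Counts below 1 are clamped to the first step and counts beyond the
    ladder are clamped to the last step.
    """
    ladder = [
        ReleaseDelay.TEN_SECONDS,
        ReleaseDelay.ONE_MINUTE,
        ReleaseDelay.TEN_MINUTES,
        ReleaseDelay.ONE_HOUR,
    ]
    index = min(max(delivery_count - 1, 0), len(ladder) - 1)
    return ladder[index]


# The str mixin makes members comparable to plain strings.
print(ReleaseDelay.ONE_MINUTE == "60")
# The value converts cleanly to a number of seconds.
print(int(ReleaseDelay.TEN_MINUTES.value))
print(delay_for_attempt(1).name, delay_for_attempt(10).name)
```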

## Usage Examples

### Working with Received Events

```python
from azure.eventgrid import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential

consumer = EventGridConsumerClient(
    endpoint="https://namespace.region.eventgrid.azure.net",
    credential=AzureKeyCredential("key"),
    namespace_topic="orders",
    subscription="processor"
)

# Receive events
events = consumer.receive(max_events=5)

for event_detail in events:
    # Access broker properties
    broker_props = event_detail.broker_properties
    print(f"Lock Token: {broker_props.lock_token}")
    print(f"Delivery Count: {broker_props.delivery_count}")

    # Access Cloud Event
    cloud_event = event_detail.event
    print(f"Event ID: {cloud_event.id}")
    print(f"Event Type: {cloud_event.type}")
    print(f"Event Source: {cloud_event.source}")
    print(f"Event Data: {cloud_event.data}")

    # Optional Cloud Event properties
    if cloud_event.subject:
        print(f"Subject: {cloud_event.subject}")
    if cloud_event.time:
        print(f"Event Time: {cloud_event.time}")
    if cloud_event.datacontenttype:
        print(f"Content Type: {cloud_event.datacontenttype}")

consumer.close()
```

### Handling Operation Results

```python
from azure.eventgrid.models import ReleaseDelay

# Assumes `consumer` is an open EventGridConsumerClient (see the previous example)

# Collect lock tokens from received events
events = consumer.receive(max_events=10)
lock_tokens = [event.broker_properties.lock_token for event in events]

# Attempt to acknowledge events
ack_result = consumer.acknowledge(lock_tokens=lock_tokens)

# Check successful acknowledgments
print(f"Successfully acknowledged: {len(ack_result.succeeded_lock_tokens)}")
for token in ack_result.succeeded_lock_tokens:
    print(f" Acknowledged: {token}")

# Handle failed acknowledgments
if ack_result.failed_lock_tokens:
    print(f"Failed to acknowledge: {len(ack_result.failed_lock_tokens)}")

    retry_tokens = []
    for failed_token in ack_result.failed_lock_tokens:
        print(f" Failed token: {failed_token.lock_token}")
        print(f" Error: {failed_token.error}")

        # Check error type to decide on retry strategy
        error_str = str(failed_token.error)
        if "expired" in error_str.lower():
            print(" Lock expired - event likely processed elsewhere")
        elif "not found" in error_str.lower():
            print(" Event not found - may have been deleted")
        else:
            print(" Unexpected error - retrying")
            retry_tokens.append(failed_token.lock_token)

    # Release the tokens selected for retry so the events are redelivered later
    if retry_tokens:
        release_result = consumer.release(
            lock_tokens=retry_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE
        )
        print(f"Released for retry: {len(release_result.succeeded_lock_tokens)}")
```

### Using Release Delays

```python
from azure.eventgrid.models import ReleaseDelay

# Assumes `consumer` is an open EventGridConsumerClient.
# `process_event`, `TransientError`, `RateLimitError`, and `ProcessingError`
# are application-defined placeholders, not part of the SDK.

# Different release delay strategies
events = consumer.receive(max_events=5)

for event_detail in events:
    lock_token = event_detail.broker_properties.lock_token
    delivery_count = event_detail.broker_properties.delivery_count

    try:
        # Attempt processing
        process_event(event_detail.event)
        consumer.acknowledge(lock_tokens=[lock_token])

    except TransientError:
        # Transient error - retry quickly
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_SECONDS
        )

    except RateLimitError:
        # Rate limited - wait longer
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_MINUTES
        )

    except ProcessingError:
        if delivery_count < 3:
            # Retry with a delay that grows with the delivery count
            delays = [ReleaseDelay.ONE_MINUTE, ReleaseDelay.TEN_MINUTES, ReleaseDelay.ONE_HOUR]
            # Clamp the index so an unexpected count of 0 does not wrap around
            delay = delays[min(max(delivery_count - 1, 0), len(delays) - 1)]

            consumer.release(
                lock_tokens=[lock_token],
                release_delay=delay
            )
        else:
            # Max retries exceeded - reject
            consumer.reject(lock_tokens=[lock_token])

    except Exception:
        # Unexpected error - reject immediately
        consumer.reject(lock_tokens=[lock_token])
```

### Batch Result Processing

```python
from azure.eventgrid.models import ReleaseDelay

# `process_event` is an application-defined placeholder that returns an object
# with `success` and `transient_error` attributes.

def process_batch_results(consumer, events):
    """Process a batch of events and handle all results."""

    # Group events by processing outcome
    success_tokens = []
    transient_failure_tokens = []
    permanent_failure_tokens = []

    for event_detail in events:
        try:
            result = process_event(event_detail.event)

            if result.success:
                success_tokens.append(event_detail.broker_properties.lock_token)
            elif result.transient_error:
                transient_failure_tokens.append(event_detail.broker_properties.lock_token)
            else:
                permanent_failure_tokens.append(event_detail.broker_properties.lock_token)

        except Exception:
            permanent_failure_tokens.append(event_detail.broker_properties.lock_token)

    # Execute one operation per outcome group (the calls run sequentially)
    operations = []

    if success_tokens:
        operations.append(('acknowledge', consumer.acknowledge(lock_tokens=success_tokens)))

    if transient_failure_tokens:
        operations.append(('release', consumer.release(
            lock_tokens=transient_failure_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE
        )))

    if permanent_failure_tokens:
        operations.append(('reject', consumer.reject(lock_tokens=permanent_failure_tokens)))

    # Process all operation results
    for operation_name, result in operations:
        print(f"{operation_name.title()} Results:")
        print(f" Succeeded: {len(result.succeeded_lock_tokens)}")

        if result.failed_lock_tokens:
            print(f" Failed: {len(result.failed_lock_tokens)}")
            for failed_token in result.failed_lock_tokens:
                print(f" Token: {failed_token.lock_token}")
                print(f" Error: {failed_token.error}")

# Usage
events = consumer.receive(max_events=20)
if events:
    process_batch_results(consumer, events)
```

### Event Metadata Analysis

```python
from datetime import datetime, timezone
from collections import defaultdict

def analyze_event_batch(events):
    """Analyze received events for monitoring and debugging."""

    # Group events by various attributes
    by_type = defaultdict(int)
    by_source = defaultdict(int)
    by_delivery_count = defaultdict(int)

    oldest_event = None
    newest_event = None

    for event_detail in events:
        cloud_event = event_detail.event
        broker_props = event_detail.broker_properties

        # Count by type and source
        by_type[cloud_event.type] += 1
        by_source[cloud_event.source] += 1
        by_delivery_count[broker_props.delivery_count] += 1

        # Track event timing
        if cloud_event.time:
            if oldest_event is None or cloud_event.time < oldest_event:
                oldest_event = cloud_event.time
            if newest_event is None or cloud_event.time > newest_event:
                newest_event = cloud_event.time

    # Print analysis
    print(f"Batch Analysis - {len(events)} events:")
    print(f"Event Types: {dict(by_type)}")
    print(f"Event Sources: {dict(by_source)}")
    print(f"Delivery Counts: {dict(by_delivery_count)}")

    if oldest_event and newest_event:
        age_span = newest_event - oldest_event
        print(f"Time Span: {age_span.total_seconds():.1f} seconds")

        # Check for old events
        now = datetime.now(timezone.utc)
        oldest_age = (now - oldest_event).total_seconds()
        if oldest_age > 3600:  # Older than 1 hour
            print(f"WARNING: Oldest event is {oldest_age/3600:.1f} hours old")

    # Check for retry patterns
    high_retry_events = [e for e in events if e.broker_properties.delivery_count > 2]
    if high_retry_events:
        print(f"High retry events: {len(high_retry_events)}")
        for event_detail in high_retry_events:
            print(f" {event_detail.event.type}: {event_detail.broker_properties.delivery_count} attempts")

# Usage
events = consumer.receive(max_events=50)
if events:
    analyze_event_batch(events)
```

### Custom Result Processing

```python
from azure.eventgrid.models import ReleaseDelay

class EventProcessor:
    """Custom event processor with detailed result tracking."""

    def __init__(self, consumer):
        self.consumer = consumer
        self.stats = {
            'processed': 0,
            'acknowledged': 0,
            'released': 0,
            'rejected': 0,
            'errors': 0
        }

    def process_batch(self, max_events=10):
        """Process a batch of events with detailed tracking."""

        events = self.consumer.receive(max_events=max_events)
        if not events:
            return

        self.stats['processed'] += len(events)

        # Process events
        results = []
        for event_detail in events:
            try:
                success = self.process_single_event(event_detail.event)
                results.append((event_detail.broker_properties.lock_token, success))
            except Exception as e:
                print(f"Processing error: {e}")
                results.append((event_detail.broker_properties.lock_token, False))
                self.stats['errors'] += 1

        # Group by outcome
        success_tokens = [token for token, success in results if success]
        failure_tokens = [token for token, success in results if not success]

        # Execute operations and track results
        if success_tokens:
            ack_result = self.consumer.acknowledge(lock_tokens=success_tokens)
            self.stats['acknowledged'] += len(ack_result.succeeded_lock_tokens)

            # Handle partial failures
            if ack_result.failed_lock_tokens:
                print(f"Failed to acknowledge {len(ack_result.failed_lock_tokens)} events")
                failure_tokens.extend([ft.lock_token for ft in ack_result.failed_lock_tokens])

        if failure_tokens:
            release_result = self.consumer.release(
                lock_tokens=failure_tokens,
                release_delay=ReleaseDelay.ONE_MINUTE
            )
            self.stats['released'] += len(release_result.succeeded_lock_tokens)

            # Handle release failures (reject as last resort)
            if release_result.failed_lock_tokens:
                reject_tokens = [ft.lock_token for ft in release_result.failed_lock_tokens]
                reject_result = self.consumer.reject(lock_tokens=reject_tokens)
                self.stats['rejected'] += len(reject_result.succeeded_lock_tokens)

    def process_single_event(self, cloud_event):
        """Process a single Cloud Event."""
        # Implement your processing logic here
        print(f"Processing {cloud_event.type} from {cloud_event.source}")
        return True  # Return success/failure

    def print_stats(self):
        """Print processing statistics."""
        print("Processing Statistics:")
        for key, value in self.stats.items():
            print(f" {key.title()}: {value}")

# Usage
processor = EventProcessor(consumer)

# Process multiple batches
for _ in range(10):
    processor.process_batch(max_events=5)

processor.print_stats()
```