or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin.md consumer.md errors.md index.md producer.md structures.md

errors.mddocs/

0

# Error Handling

1

2

Comprehensive error handling with over 100 exception classes mapping all Kafka protocol errors, client-side errors, and authorization failures with appropriate retry semantics.

3

4

## Capabilities

5

6

### Base Exception Classes

7

8

Foundation exception classes providing error categorization and retry logic.

9

10

```python { .api }

11

class KafkaError(Exception):
    """
    Base exception for all Kafka-related errors.

    Attributes:
    - retriable: bool, whether error is retriable
    - invalid_metadata: bool, whether error invalidates metadata
    """
    # Class-level defaults; subclasses override them to opt in.
    retriable = False
    invalid_metadata = False

    def __init__(self, *args):
        # Forward all positional arguments so they end up in ``self.args``.
        super(KafkaError, self).__init__(*args)

24

25

class BrokerResponseError(KafkaError):
    """
    Base class for errors returned by Kafka brokers.

    Attributes:
    - errno: int, Kafka error code
    - message: str, error message
    - description: str, detailed error description
    """
    # Placeholders; each concrete broker error sets its own values.
    errno = None
    message = None
    description = None

37

38

class AuthorizationError(KafkaError):
    """Base class for authorization-related errors."""
    pass

41

```

42

43

### Client-Side Errors

44

45

Errors originating from the client library itself, typically related to configuration, connection, or protocol issues.

46

47

```python { .api }

48

class KafkaConfigurationError(KafkaError):
    """Configuration parameter errors."""
    pass

51

52

class KafkaConnectionError(KafkaError):
    """
    Connection-related errors.

    Attributes:
    - retriable = True
    """
    retriable = True

60

61

class KafkaProtocolError(KafkaError):
    """
    Protocol-related errors.

    Attributes:
    - retriable = True
    """
    retriable = True

69

70

class KafkaTimeoutError(KafkaError):
    """
    Request timeout errors.

    Attributes:
    - retriable = True
    """
    retriable = True

78

79

class IllegalArgumentError(KafkaError):
    """Invalid argument errors."""
    pass

82

83

class IllegalStateError(KafkaError):
    """Invalid state errors."""
    pass

86

87

class IncompatibleBrokerVersion(KafkaError):
    """Broker version compatibility errors."""
    pass

90

91

class MetadataEmptyBrokerList(KafkaError):
    """
    No brokers available for metadata.

    Attributes:
    - retriable = True
    - invalid_metadata = True
    """
    retriable = True
    invalid_metadata = True

101

102

class NoBrokersAvailable(KafkaError):
    """
    No brokers reachable.

    Attributes:
    - retriable = True
    """
    retriable = True

110

111

class NoOffsetForPartitionError(KafkaError):
    """No offset available for partition."""
    pass

114

115

class NodeNotReadyError(KafkaError):
    """
    Node not ready for requests.

    Attributes:
    - retriable = True
    """
    retriable = True

123

124

class QuotaViolationError(KafkaError):
    """Rate limit exceeded."""
    pass

127

128

class StaleMetadata(KafkaError):
    """
    Metadata needs refresh.

    Attributes:
    - retriable = True
    - invalid_metadata = True
    """
    retriable = True
    invalid_metadata = True

138

139

class TooManyInFlightRequests(KafkaError):
    """
    Request queue full.

    Attributes:
    - retriable = True
    """
    retriable = True

147

148

class UnrecognizedBrokerVersion(KafkaError):
    """Unknown broker version."""
    pass

151

152

class UnsupportedCodecError(KafkaError):
    """Unsupported compression codec."""
    pass

155

```

156

157

### Authorization Errors

158

159

Errors related to access control and authentication failures.

160

161

```python { .api }

162

class TopicAuthorizationFailedError(AuthorizationError):
    """
    Topic access denied.

    Attributes:
    - errno = 29
    - message = 'TOPIC_AUTHORIZATION_FAILED'
    - description = 'Not authorized to access topics'
    """
    errno = 29
    message = 'TOPIC_AUTHORIZATION_FAILED'
    description = 'Not authorized to access topics'

174

175

class GroupAuthorizationFailedError(AuthorizationError):
    """
    Consumer group access denied.

    Attributes:
    - errno = 30
    - message = 'GROUP_AUTHORIZATION_FAILED'
    - description = 'Not authorized to access group'
    """
    errno = 30
    message = 'GROUP_AUTHORIZATION_FAILED'
    description = 'Not authorized to access group'

187

188

class ClusterAuthorizationFailedError(AuthorizationError):
    """
    Cluster access denied.

    Attributes:
    - errno = 31
    - message = 'CLUSTER_AUTHORIZATION_FAILED'
    - description = 'Cluster authorization failed'
    """
    errno = 31
    message = 'CLUSTER_AUTHORIZATION_FAILED'
    description = 'Cluster authorization failed'

200

201

class TransactionalIdAuthorizationFailedError(AuthorizationError):
    """
    Transactional ID access denied.

    Attributes:
    - errno = 53
    - message = 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED'
    - description = 'The transactional id authorization failed'
    """
    errno = 53
    message = 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED'
    description = 'The transactional id authorization failed'

213

214

class DelegationTokenAuthorizationFailedError(AuthorizationError):
    """
    Delegation token access denied.

    Attributes:
    - errno = 65
    - message = 'DELEGATION_TOKEN_AUTHORIZATION_FAILED'
    - description = 'Delegation Token authorization failed'
    """
    # Kafka protocol code 65 is DELEGATION_TOKEN_AUTHORIZATION_FAILED
    # (code 58 belongs to SASL_AUTHENTICATION_FAILED).
    errno = 65
    message = 'DELEGATION_TOKEN_AUTHORIZATION_FAILED'
    description = 'Delegation Token authorization failed'

226

```

227

228

### Common Broker Response Errors

229

230

Most frequently encountered broker-side errors with their error codes and retry semantics.

231

232

```python { .api }

233

class OffsetOutOfRangeError(BrokerResponseError):
    """
    Requested offset is out of range.

    Attributes:
    - errno = 1
    - message = 'OFFSET_OUT_OF_RANGE'
    - description = 'The requested offset is not within the range of offsets'
    """
    errno = 1
    message = 'OFFSET_OUT_OF_RANGE'
    description = 'The requested offset is not within the range of offsets'

245

246

class UnknownTopicOrPartitionError(BrokerResponseError):
    """
    Topic or partition does not exist.

    Attributes:
    - errno = 3
    - message = 'UNKNOWN_TOPIC_OR_PARTITION'
    - description = 'This server does not host this topic-partition'
    - retriable = True
    - invalid_metadata = True
    """
    errno = 3
    message = 'UNKNOWN_TOPIC_OR_PARTITION'
    description = 'This server does not host this topic-partition'
    retriable = True
    invalid_metadata = True

262

263

class LeaderNotAvailableError(BrokerResponseError):
    """
    Partition leader not available.

    Attributes:
    - errno = 5
    - message = 'LEADER_NOT_AVAILABLE'
    - description = 'There is no leader for this topic-partition'
    - retriable = True
    - invalid_metadata = True
    """
    errno = 5
    message = 'LEADER_NOT_AVAILABLE'
    description = 'There is no leader for this topic-partition'
    retriable = True
    invalid_metadata = True

279

280

class NotLeaderForPartitionError(BrokerResponseError):
    """
    Broker is not the leader for partition.

    Attributes:
    - errno = 6
    - message = 'NOT_LEADER_FOR_PARTITION'
    - description = 'This server is not the leader for that topic-partition'
    - retriable = True
    - invalid_metadata = True
    """
    errno = 6
    message = 'NOT_LEADER_FOR_PARTITION'
    description = 'This server is not the leader for that topic-partition'
    retriable = True
    invalid_metadata = True

296

297

class RequestTimedOutError(BrokerResponseError):
    """
    Request timed out.

    Attributes:
    - errno = 7
    - message = 'REQUEST_TIMED_OUT'
    - description = 'The request timed out'
    - retriable = True
    """
    errno = 7
    message = 'REQUEST_TIMED_OUT'
    description = 'The request timed out'
    retriable = True

311

312

class BrokerNotAvailableError(BrokerResponseError):
    """
    Broker not available.

    Attributes:
    - errno = 8
    - message = 'BROKER_NOT_AVAILABLE'
    - description = 'The broker is not available'
    - retriable = True
    - invalid_metadata = True
    """
    errno = 8
    message = 'BROKER_NOT_AVAILABLE'
    description = 'The broker is not available'
    retriable = True
    invalid_metadata = True

328

329

class ReplicaNotAvailableError(BrokerResponseError):
    """
    Replica not available.

    Attributes:
    - errno = 9
    - message = 'REPLICA_NOT_AVAILABLE'
    - description = 'The replica is not available for the requested topic-partition'
    - retriable = True
    """
    errno = 9
    message = 'REPLICA_NOT_AVAILABLE'
    description = 'The replica is not available for the requested topic-partition'
    retriable = True

343

344

class MessageSizeTooLargeError(BrokerResponseError):
    """
    Message size exceeds limits.

    Attributes:
    - errno = 10
    - message = 'MESSAGE_TOO_LARGE'
    - description = 'The request included a message larger than the max message size'
    """
    errno = 10
    message = 'MESSAGE_TOO_LARGE'
    description = 'The request included a message larger than the max message size'

356

357

class TopicAlreadyExistsError(BrokerResponseError):
    """
    Topic already exists.

    Attributes:
    - errno = 36
    - message = 'TOPIC_ALREADY_EXISTS'
    - description = 'Topic already exists'
    """
    errno = 36
    message = 'TOPIC_ALREADY_EXISTS'
    description = 'Topic already exists'

369

370

class InvalidTopicError(BrokerResponseError):
    """
    Invalid topic name.

    Attributes:
    - errno = 17
    - message = 'INVALID_TOPIC_EXCEPTION'
    - description = 'The request attempted to perform an operation on an invalid topic'
    """
    errno = 17
    message = 'INVALID_TOPIC_EXCEPTION'
    description = 'The request attempted to perform an operation on an invalid topic'

382

```

383

384

### Consumer-Specific Errors

385

386

Errors specific to consumer operations and group coordination.

387

388

```python { .api }

389

class OffsetMetadataTooLargeError(BrokerResponseError):
    """
    Offset metadata too large.

    Attributes:
    - errno = 12
    - message = 'OFFSET_METADATA_TOO_LARGE'
    - description = 'The metadata field of the offset request was too large'
    """
    errno = 12
    message = 'OFFSET_METADATA_TOO_LARGE'
    description = 'The metadata field of the offset request was too large'

401

402

class GroupLoadInProgressError(BrokerResponseError):
    """
    Consumer group loading in progress.

    Attributes:
    - errno = 14
    - message = 'GROUP_LOAD_IN_PROGRESS'
    - description = 'The coordinator is loading and hence can\'t process requests'
    - retriable = True
    """
    errno = 14
    message = 'GROUP_LOAD_IN_PROGRESS'
    description = 'The coordinator is loading and hence can\'t process requests'
    retriable = True

416

417

class GroupCoordinatorNotAvailableError(BrokerResponseError):
    """
    Group coordinator not available.

    Attributes:
    - errno = 15
    - message = 'GROUP_COORDINATOR_NOT_AVAILABLE'
    - description = 'The group coordinator is not available'
    - retriable = True
    - invalid_metadata = True
    """
    errno = 15
    message = 'GROUP_COORDINATOR_NOT_AVAILABLE'
    description = 'The group coordinator is not available'
    retriable = True
    invalid_metadata = True

433

434

class NotCoordinatorForGroupError(BrokerResponseError):
    """
    Broker is not coordinator for group.

    Attributes:
    - errno = 16
    - message = 'NOT_COORDINATOR_FOR_GROUP'
    - description = 'The broker is not the coordinator for this group'
    - retriable = True
    - invalid_metadata = True
    """
    errno = 16
    message = 'NOT_COORDINATOR_FOR_GROUP'
    description = 'The broker is not the coordinator for this group'
    retriable = True
    invalid_metadata = True

450

451

class UnknownMemberIdError(BrokerResponseError):
    """
    Unknown member ID.

    Attributes:
    - errno = 25
    - message = 'UNKNOWN_MEMBER_ID'
    - description = 'The member id is not in the current generation'
    """
    errno = 25
    message = 'UNKNOWN_MEMBER_ID'
    description = 'The member id is not in the current generation'

463

464

class IllegalGenerationError(BrokerResponseError):
    """
    Illegal generation ID.

    Attributes:
    - errno = 22
    - message = 'ILLEGAL_GENERATION'
    - description = 'Specified group generation id is not valid'
    """
    errno = 22
    message = 'ILLEGAL_GENERATION'
    description = 'Specified group generation id is not valid'

476

477

class RebalanceInProgressError(BrokerResponseError):
    """
    Consumer group rebalance in progress.

    Attributes:
    - errno = 27
    - message = 'REBALANCE_IN_PROGRESS'
    - description = 'The group is rebalancing, so a rejoin is needed'
    - retriable = True
    """
    errno = 27
    message = 'REBALANCE_IN_PROGRESS'
    description = 'The group is rebalancing, so a rejoin is needed'
    retriable = True

491

```

492

493

### Producer-Specific Errors

494

495

Errors specific to producer operations and message publishing.

496

497

```python { .api }

498

class InvalidRequiredAcksError(BrokerResponseError):
    """
    Invalid required acknowledgments.

    Attributes:
    - errno = 21
    - message = 'INVALID_REQUIRED_ACKS'
    - description = 'Specified required acks is invalid (must be -1, 0, or 1)'
    """
    errno = 21
    message = 'INVALID_REQUIRED_ACKS'
    description = 'Specified required acks is invalid (must be -1, 0, or 1)'

510

511

class RecordListTooLargeError(BrokerResponseError):
    """
    Record batch too large.

    Attributes:
    - errno = 18
    - message = 'RECORD_LIST_TOO_LARGE'
    - description = 'The request included message batch larger than the configured segment size'
    """
    errno = 18
    message = 'RECORD_LIST_TOO_LARGE'
    description = 'The request included message batch larger than the configured segment size'

523

524

class InvalidPartitionError(BrokerResponseError):
    """
    Invalid partition number.

    Attributes:
    - errno = 4
    - message = 'INVALID_FETCH_SIZE'
    - description = 'The message has an invalid offset'
    """
    # NOTE(review): the class name and summary talk about partitions, but
    # errno 4 / 'INVALID_FETCH_SIZE' is the Kafka protocol's invalid fetch
    # size code -- confirm against the library source which class this entry
    # is meant to describe before relying on it.
    errno = 4
    message = 'INVALID_FETCH_SIZE'
    description = 'The message has an invalid offset'

536

537

class DuplicateSequenceNumberError(BrokerResponseError):
    """
    Duplicate sequence number (idempotent producer).

    Attributes:
    - errno = 46
    - message = 'DUPLICATE_SEQUENCE_NUMBER'
    - description = 'A producer attempted to produce with an old sequence number'
    """
    # Kafka protocol code 46 is DUPLICATE_SEQUENCE_NUMBER
    # (code 45 is OUT_OF_ORDER_SEQUENCE_NUMBER).
    errno = 46
    message = 'DUPLICATE_SEQUENCE_NUMBER'
    description = 'A producer attempted to produce with an old sequence number'

549

550

class OutOfOrderSequenceNumberError(BrokerResponseError):
    """
    Out of order sequence number (idempotent producer).

    Attributes:
    - errno = 45
    - message = 'OUT_OF_ORDER_SEQUENCE_NUMBER'
    - description = 'A producer attempted to produce with a sequence number which is not the expected next one'
    """
    # Kafka protocol code 45 is OUT_OF_ORDER_SEQUENCE_NUMBER
    # (code 46 is DUPLICATE_SEQUENCE_NUMBER).
    errno = 45
    message = 'OUT_OF_ORDER_SEQUENCE_NUMBER'
    description = 'A producer attempted to produce with a sequence number which is not the expected next one'

562

```

563

564

## Error Handling Patterns

565

566

### Basic Error Handling

567

568

```python

569

from kafka import KafkaProducer, KafkaConsumer

570

from kafka.errors import (KafkaError, KafkaTimeoutError, KafkaConnectionError,

571

TopicAuthorizationFailedError, MessageSizeTooLargeError)

572

573

# Producer error handling
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

try:
    future = producer.send('my-topic', value=b'test message')
    # Block until the send completes (or fails) so errors surface here.
    record_metadata = future.get(timeout=30)
    print(f"Message sent successfully: offset {record_metadata.offset}")

# Most specific handlers first; the KafkaError base class catches the rest.
except TopicAuthorizationFailedError:
    print("Access denied to topic")
except MessageSizeTooLargeError:
    print("Message too large for broker")
except KafkaTimeoutError:
    print("Request timed out")
except KafkaConnectionError:
    print("Connection failed")
except KafkaError as e:
    print(f"Kafka error: {e}")
finally:
    producer.close()

593

```

594

595

### Retry Logic with Exponential Backoff

596

597

```python

598

import time

599

import random

600

from kafka import KafkaProducer

601

from kafka.errors import KafkaError

602

603

def send_with_retry(producer, topic, value, max_retries=3):
    """Send message with exponential backoff retry logic.

    Args:
        producer: Object whose ``send(topic, value=...)`` returns a future
            exposing ``get(timeout=...)``.
        topic: Topic name passed through to ``producer.send``.
        value: Message payload passed through to ``producer.send``.
        max_retries: Number of retries allowed after the first attempt.

    Returns:
        Whatever ``future.get(timeout=30)`` returns for the successful send.

    Raises:
        KafkaError: The original error when it is not retriable or when the
            last allowed attempt fails; a "Max retries exceeded" error when
            no attempt is made at all (negative ``max_retries``).
    """
    for attempt in range(max_retries + 1):
        try:
            future = producer.send(topic, value=value)
            return future.get(timeout=30)

        except KafkaError as e:
            if not e.retriable or attempt == max_retries:
                # Non-retriable error or max retries reached; a bare
                # ``raise`` re-raises the active exception unchanged.
                raise

            # Calculate exponential backoff with jitter
            backoff = (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {backoff:.2f}s")
            time.sleep(backoff)

    raise KafkaError("Max retries exceeded")

622

623

# Usage
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

try:
    metadata = send_with_retry(producer, 'events', b'important message')
    print(f"Message sent successfully: {metadata}")
except KafkaError as e:
    # Reached for non-retriable errors or once the retries are used up.
    print(f"Failed to send message: {e}")
finally:
    producer.close()

633

```

634

635

### Consumer Error Handling

636

637

```python

638

from kafka import KafkaConsumer

639

from kafka.errors import (ConsumerTimeoutError, OffsetOutOfRangeError,

640

GroupAuthorizationFailedError, StaleMetadata)

641

642

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='error-handling-group',
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000
)

try:
    for message in consumer:
        try:
            # Process message
            process_message(message)

        except Exception as e:
            # A failure on one message must not stop the consume loop.
            print(f"Error processing message at offset {message.offset}: {e}")
            # Could implement dead letter queue here
            continue

except OffsetOutOfRangeError:
    print("Offset out of range - seeking to beginning")
    consumer.seek_to_beginning()

except GroupAuthorizationFailedError:
    print("Access denied to consumer group")

except StaleMetadata:
    print("Metadata stale - will refresh automatically")

# NOTE(review): confirm that ConsumerTimeoutError is exported by kafka.errors
# in the targeted release; it is not among the classes listed on this page.
except ConsumerTimeoutError:
    print("Consumer timeout - no messages received")

except KeyboardInterrupt:
    print("Shutting down consumer")

finally:
    consumer.close()

679

```

680

681

### Circuit Breaker Pattern

682

683

```python

684

import time

685

from enum import Enum

686

from kafka import KafkaProducer

687

from kafka.errors import KafkaError

688

689

class CircuitState(Enum):
    """States of the circuit breaker."""
    CLOSED = "closed"        # calls pass through normally
    OPEN = "open"            # calls are rejected without being attempted
    HALF_OPEN = "half_open"  # a trial call is allowed after the timeout

693

694

class CircuitBreaker:
    """Stop calling a dependency that keeps failing.

    After ``failure_threshold`` failures without an intervening success the
    breaker opens and rejects calls. Once more than ``recovery_timeout``
    seconds have passed since the last failure, the next call is let through
    in HALF_OPEN state; a success closes the breaker again.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection.

        Returns the result of ``func(*args, **kwargs)``. Raises ``Exception``
        with the message "Circuit breaker is OPEN" while the breaker is open
        and the recovery timeout has not elapsed; otherwise re-raises
        whatever ``func`` raised after recording the failure.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker transitioning to HALF_OPEN")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result

        except Exception:
            self._on_failure()
            # Bare ``raise`` re-raises the active exception unchanged.
            raise

    def _on_success(self):
        """Reset circuit breaker on successful call."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            print("Circuit breaker reset to CLOSED")

    def _on_failure(self):
        """Handle failure and potentially open circuit."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker OPENED after {self.failure_count} failures")

736

737

# Usage with Kafka producer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

740

741

def send_message(topic, value):
    """Send message through circuit breaker.

    Sends via the module-level ``producer`` and blocks up to 10 seconds on
    the returned future, so a failed send raises inside this function and is
    counted by the breaker that wraps it.
    """
    future = producer.send(topic, value=value)
    return future.get(timeout=10)

745

746

# Send messages with circuit breaker protection
for i in range(20):
    try:
        result = circuit_breaker.call(send_message, 'protected-topic', f'Message {i}'.encode())
        print(f"Message {i} sent successfully")

    except Exception as e:
        # Covers both send failures and the breaker rejecting the call.
        print(f"Message {i} failed: {e}")
        time.sleep(1)  # Brief pause before next attempt

producer.close()

757

```

758

759

### Error Classification and Handling

760

761

```python

762

from kafka.errors import (KafkaError, BrokerResponseError, AuthorizationError,

763

KafkaTimeoutError, KafkaConnectionError)

764

765

def classify_and_handle_error(error):
    """Classify error and determine appropriate handling strategy.

    Args:
        error: The exception instance to classify.

    Returns:
        dict with the keys ``'category'``, ``'action'``, ``'retriable'`` and
        ``'severity'``.

    The checks run from most specific to most general, so the order of the
    branches matters: the ``KafkaError`` branch only sees errors that none of
    the earlier branches matched.
    """
    if isinstance(error, AuthorizationError):
        return {
            'category': 'authorization',
            'action': 'check_credentials',
            'retriable': False,
            'severity': 'high'
        }

    elif isinstance(error, KafkaConnectionError):
        return {
            'category': 'network',
            'action': 'retry_with_backoff',
            'retriable': True,
            'severity': 'medium'
        }

    elif isinstance(error, KafkaTimeoutError):
        return {
            'category': 'timeout',
            'action': 'retry_with_longer_timeout',
            'retriable': True,
            'severity': 'low'
        }

    elif isinstance(error, BrokerResponseError):
        # Broker errors carry their own retry semantics.
        if error.retriable:
            return {
                'category': 'broker_retriable',
                'action': 'retry_after_delay',
                'retriable': True,
                'severity': 'medium'
            }
        else:
            return {
                'category': 'broker_fatal',
                'action': 'fail_fast',
                'retriable': False,
                'severity': 'high'
            }

    elif isinstance(error, KafkaError):
        return {
            'category': 'generic_kafka',
            'action': 'investigate',
            'retriable': getattr(error, 'retriable', False),
            'severity': 'medium'
        }

    else:
        return {
            'category': 'unknown',
            'action': 'investigate',
            'retriable': False,
            'severity': 'high'
        }

823

824

# Usage in error handler

825

def handle_kafka_error(error, operation_context):
    """Handle Kafka error based on classification.

    Args:
        error: The exception to classify and report.
        operation_context: Label for the failing operation, used in output.

    Returns:
        True when a retry should be attempted, False when the failure should
        be reported.
    """
    # Local import: this listing defines no module-level ``logger``, so the
    # logger is obtained here instead of relying on an undefined name.
    import logging

    classification = classify_and_handle_error(error)

    print(f"Error in {operation_context}: {error}")
    print(f"Classification: {classification}")

    if classification['severity'] == 'high':
        # Log critical error
        logging.getLogger(__name__).critical(f"Critical Kafka error: {error}")

    # True -> retry should be attempted; False -> failure should be reported
    return bool(classification['retriable'])

841

```

842

843

### Monitoring and Alerting

844

845

```python

846

import logging

847

from collections import defaultdict, deque

848

import time

849

from kafka.errors import KafkaError

850

851

class KafkaErrorMonitor:
    """Monitor and track Kafka errors for alerting.

    Keeps, per exception class name, the timestamps of the errors recorded
    within the last ``window_size`` seconds and logs a critical message when
    a type exceeds ``alert_threshold`` occurrences in that window.
    """

    def __init__(self, window_size=300, alert_threshold=10):  # 5 minute window
        self.window_size = window_size
        self.alert_threshold = alert_threshold
        # error type name -> deque of timestamps, oldest first
        self.error_counts = defaultdict(deque)
        self.logger = logging.getLogger(__name__)

    def record_error(self, error, context="unknown"):
        """Record error occurrence.

        Args:
            error: The exception instance; it is grouped by its class name.
            context: Label for where the error happened, used in the log line.
        """
        error_type = type(error).__name__
        timestamp = time.time()

        # Add to sliding window
        window = self.error_counts[error_type]
        window.append(timestamp)

        # Remove old entries outside window
        cutoff = timestamp - self.window_size
        while window and window[0] < cutoff:
            window.popleft()

        # Log error
        self.logger.error("Kafka error in %s: %s", context, error)

        # Check for alert conditions
        self._check_alert_conditions(error_type)

    def _check_alert_conditions(self, error_type):
        """Check if error rates exceed alert thresholds."""
        error_count = len(self.error_counts[error_type])

        # Alert if too many errors in window
        if error_count > self.alert_threshold:
            self.logger.critical(
                "High error rate: %s %s errors in last %s seconds",
                error_count, error_type, self.window_size,
            )

        # Alert for specific critical errors
        if error_type in ('TopicAuthorizationFailedError', 'ClusterAuthorizationFailedError'):
            self.logger.critical("Authorization failure: %s", error_type)

    def get_error_summary(self):
        """Get summary of recent errors.

        Returns:
            dict mapping error type name to the number of occurrences inside
            the current window; types with no recent errors are omitted.
        """
        cutoff = time.time() - self.window_size
        summary = {}

        for error_type, timestamps in self.error_counts.items():
            # Count recent errors; entries may be stale if no error of this
            # type was recorded recently enough to trigger pruning.
            recent_count = sum(1 for ts in timestamps if ts > cutoff)
            if recent_count > 0:
                summary[error_type] = recent_count

        return summary

910

911

# Usage
error_monitor = KafkaErrorMonitor()

913

914

def monitored_kafka_operation(operation_func, *args, **kwargs):
    """Execute Kafka operation with error monitoring.

    Calls ``operation_func(*args, **kwargs)`` and returns its result. A
    ``KafkaError`` is recorded on the module-level ``error_monitor`` (with the
    function's ``__name__`` as context) and then re-raised to the caller.
    """
    try:
        return operation_func(*args, **kwargs)

    except KafkaError as e:
        error_monitor.record_error(e, context=operation_func.__name__)
        # Bare ``raise`` re-raises the active exception unchanged.
        raise

923

924

# Example usage
# NOTE(review): assumes `producer` is an already-constructed KafkaProducer;
# this listing does not create one.
def send_and_wait(topic, value):
    """Send a message and block until the send has completed."""
    # producer.send() is asynchronous: delivery errors only surface from the
    # returned future, so wait on it inside the monitored call.
    return producer.send(topic, value).get(timeout=10)

try:
    monitored_kafka_operation(send_and_wait, 'topic', b'message')
except KafkaError:
    # Error already logged and monitored
    pass

# Periodic error summary
print("Recent errors:", error_monitor.get_error_summary())

933

```