or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin.md · connection.md · consumer.md · errors.md · index.md · producer.md · serialization.md · structs.md

docs/errors.md

# Error Handling

Comprehensive exception hierarchy for handling various Kafka-related errors including network issues, protocol errors, authentication failures, and consumer group coordination problems.

## Capabilities

### Base Exception Classes

Foundation exception classes that define the error handling framework.

```python { .api }
class KafkaError(RuntimeError):
    """
    Base exception for all Kafka-related errors.

    Attributes:
        retriable (bool): Whether the operation that caused this error can be retried
        invalid_metadata (bool): Whether this error indicates stale metadata that should be refreshed
    """
    retriable: bool = False
    invalid_metadata: bool = False

    def __str__(self):
        """String representation of the error."""
```

### Network and Connection Errors

Errors related to network connectivity and broker communication.

```python { .api }
class NoBrokersAvailable(KafkaError):
    """No Kafka brokers are available for connection."""
    retriable: bool = True
    invalid_metadata: bool = True

class NodeNotReadyError(KafkaError):
    """Broker node is not ready to accept requests."""
    retriable: bool = True

class ConnectionError(KafkaError):
    """Failed to establish connection to Kafka broker."""
    retriable: bool = True

class RequestTimedOutError(KafkaError):
    """Request to Kafka broker timed out."""
    retriable: bool = True

class NetworkException(KafkaError):
    """Network-level communication error."""
    retriable: bool = True
```

### Protocol and Message Errors

Errors related to Kafka protocol handling and message processing.

```python { .api }
class KafkaProtocolError(KafkaError):
    """Kafka protocol-level error."""
    retriable: bool = True

class CorrelationIdError(KafkaProtocolError):
    """Response correlation ID does not match request."""
    retriable: bool = True

class BufferUnderflowError(KafkaError):
    """Insufficient data in buffer for deserialization."""

class ChecksumError(KafkaError):
    """Message checksum validation failed."""

class CompressionNotSupportedError(KafkaError):
    """Requested compression type is not supported."""

class UnsupportedVersionError(KafkaError):
    """API version is not supported by broker."""
```

### Client State and Usage Errors

Errors related to incorrect client usage or invalid state.

```python { .api }
class IllegalStateError(KafkaError):
    """Client is in an invalid state for the requested operation."""

class IllegalArgumentError(KafkaError):
    """Invalid argument provided to client method."""

class InvalidTopicError(KafkaError):
    """Topic name is invalid or does not exist."""

class TopicAlreadyExistsError(KafkaError):
    """Attempted to create a topic that already exists."""

class InvalidPartitionsError(KafkaError):
    """Invalid partition configuration."""

class InvalidReplicationFactorError(KafkaError):
    """Invalid replication factor specified."""
```

### Producer Errors

Errors specific to producer operations.

```python { .api }
class TooManyInFlightRequests(KafkaError):
    """Too many unacknowledged requests are pending."""
    retriable: bool = True

class MessageSizeTooLargeError(KafkaError):
    """Message size exceeds broker limits."""

class RecordBatchTooLargeError(KafkaError):
    """Record batch size exceeds broker limits."""

class InvalidRecordError(KafkaError):
    """Record contains invalid data."""

class RecordTooLargeError(KafkaError):
    """Individual record exceeds size limits."""

class UnknownTopicOrPartitionError(KafkaError):
    """Topic or partition does not exist."""
    invalid_metadata: bool = True

class LeaderNotAvailableError(KafkaError):
    """Partition leader is not available."""
    retriable: bool = True
    invalid_metadata: bool = True

class NotLeaderForPartitionError(KafkaError):
    """Broker is not the leader for the partition."""
    retriable: bool = True
    invalid_metadata: bool = True
```

### Consumer Errors

Errors specific to consumer operations and group coordination.

```python { .api }
class CommitFailedError(KafkaError):
    """Offset commit failed due to group rebalance or other issues."""

class InvalidSessionTimeoutError(KafkaError):
    """Session timeout is outside broker's allowed range."""

class InvalidGroupIdError(KafkaError):
    """Consumer group ID is invalid."""

class GroupLoadInProgressError(KafkaError):
    """Consumer group is still loading."""
    retriable: bool = True

class GroupCoordinatorNotAvailableError(KafkaError):
    """Group coordinator is not available."""
    retriable: bool = True

class NotCoordinatorForGroupError(KafkaError):
    """Broker is not the coordinator for this group."""
    retriable: bool = True

class UnknownMemberIdError(KafkaError):
    """Consumer group member ID is not recognized."""

class IllegalGenerationError(KafkaError):
    """Consumer group generation ID is invalid."""

class OffsetOutOfRangeError(KafkaError):
    """Requested offset is outside the available range."""

class GroupAuthorizationFailedError(KafkaError):
    """Not authorized to access consumer group."""

class TopicAuthorizationFailedError(KafkaError):
    """Not authorized to access topic."""
```

### Authentication and Authorization Errors

Errors related to security and access control.

```python { .api }
class AuthenticationFailedError(KafkaError):
    """SASL authentication failed."""

class AuthenticationMethodNotSupported(KafkaError):
    """Requested SASL mechanism is not supported."""

class SaslAuthenticationError(KafkaError):
    """SASL authentication error."""

class ClusterAuthorizationFailedError(KafkaError):
    """Not authorized to perform cluster operations."""

class DelegationTokenNotFoundError(KafkaError):
    """Delegation token not found."""

class DelegationTokenAuthorizationFailedError(KafkaError):
    """Not authorized to use delegation token."""
```

### Metadata and Coordination Errors

Errors related to metadata management and cluster coordination.

```python { .api }
class StaleMetadata(KafkaError):
    """Client metadata is stale and needs refresh."""
    retriable: bool = True
    invalid_metadata: bool = True

class MetadataEmptyBrokerList(KafkaError):
    """Broker list in metadata is empty."""
    retriable: bool = True

class UnrecognizedBrokerVersion(KafkaError):
    """Broker version is not recognized."""

class IncompatibleBrokerVersion(KafkaError):
    """Broker version is incompatible with client."""

class Cancelled(KafkaError):
    """Operation was cancelled."""
    retriable: bool = True
```

### Protocol-Specific Errors

Errors returned by specific Kafka protocol APIs.

```python { .api }
class BrokerResponseError(KafkaError):
    """
    Base class for errors returned by Kafka brokers.

    Attributes:
        errno (int): Kafka error code
        message (str): Error message
        description (str): Error description
    """
    def __init__(self, errno, message=None, description=None):
        self.errno = errno
        self.message = message
        self.description = description

    errno: int
    message: str
    description: str

# Specific broker errors (partial list)
class UnknownError(BrokerResponseError):
    """Unknown server error."""
    errno = -1

class OffsetMetadataTooLarge(BrokerResponseError):
    """Offset metadata string is too large."""
    errno = 12

class InvalidTopicException(BrokerResponseError):
    """Topic name is invalid."""
    errno = 17

class RecordListTooLarge(BrokerResponseError):
    """Record list is too large."""
    errno = 18

class NotEnoughReplicas(BrokerResponseError):
    """Not enough replicas available."""
    errno = 19

class NotEnoughReplicasAfterAppend(BrokerResponseError):
    """Not enough replicas after append."""
    errno = 20
```

278

279

## Usage Examples

280

281

### Basic Error Handling

282

283

```python

284

from kafka import KafkaProducer, KafkaConsumer

285

from kafka.errors import KafkaError, NoBrokersAvailable, RequestTimedOutError

286

287

try:

288

producer = KafkaProducer(bootstrap_servers=['invalid-broker:9092'])

289

producer.send('my-topic', b'test message').get(timeout=10)

290

291

except NoBrokersAvailable:

292

print("No Kafka brokers are available")

293

294

except RequestTimedOutError:

295

print("Request timed out")

296

297

except KafkaError as e:

298

print(f"Kafka error: {e}")

299

300

finally:

301

if 'producer' in locals():

302

producer.close()

303

```

### Producer Error Handling with Retries

```python
from kafka import KafkaProducer
from kafka.errors import (KafkaError, MessageSizeTooLargeError,
                          NotLeaderForPartitionError, RequestTimedOutError)
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    retries=5,  # Built-in retries for retriable errors
    retry_backoff_ms=1000
)

def send_with_retry(topic, message, max_retries=3):
    for attempt in range(max_retries):
        try:
            future = producer.send(topic, message)
            metadata = future.get(timeout=10)
            print(f"Message sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")
            return metadata

        except NotLeaderForPartitionError:
            print(f"Leader not available, attempt {attempt + 1}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            continue

        except MessageSizeTooLargeError:
            print("Message is too large, cannot retry")
            raise

        except RequestTimedOutError:
            print(f"Request timed out, attempt {attempt + 1}")
            if attempt < max_retries - 1:
                time.sleep(1)
            continue

    raise KafkaError("Failed to send after all retries")

# Use the retry function
try:
    send_with_retry('my-topic', b'important message')
except KafkaError as e:
    print(f"Final error: {e}")
```

### Consumer Error Handling

```python
from kafka import KafkaConsumer
from kafka.errors import (CommitFailedError, OffsetOutOfRangeError,
                          GroupCoordinatorNotAvailableError, KafkaError)

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False
)

try:
    for message in consumer:
        try:
            # Process message
            process_message(message)

            # Manual commit
            consumer.commit()

        except CommitFailedError:
            print("Commit failed, likely due to rebalance")
            # Consumer will rejoin group and get new assignment

        except OffsetOutOfRangeError as e:
            print(f"Offset out of range: {e}")
            # Seek to beginning or end
            consumer.seek_to_beginning()

        except Exception as e:
            print(f"Processing error: {e}")
            # Continue with next message

except GroupCoordinatorNotAvailableError:
    print("Group coordinator not available")

except KafkaError as e:
    print(f"Kafka error: {e}")

finally:
    consumer.close()

def process_message(message):
    # Simulate message processing
    print(f"Processing: {message.value}")
```

### Authentication Error Handling

```python
from kafka import KafkaProducer
from kafka.errors import (AuthenticationFailedError,
                          AuthenticationMethodNotSupported,
                          ClusterAuthorizationFailedError)

try:
    producer = KafkaProducer(
        bootstrap_servers=['secure-broker:9093'],
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-256',
        sasl_plain_username='wrong-user',
        sasl_plain_password='wrong-password'
    )

    producer.send('secure-topic', b'message')

except AuthenticationFailedError:
    print("SASL authentication failed - check credentials")

except AuthenticationMethodNotSupported:
    print("SASL mechanism not supported by broker")

except ClusterAuthorizationFailedError:
    print("Not authorized to access cluster")

except Exception as e:
    print(f"Other error: {e}")
```

### Admin Client Error Handling

```python
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import (TopicAlreadyExistsError, InvalidReplicationFactorError,
                          ClusterAuthorizationFailedError, KafkaError)

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    topics = [NewTopic('test-topic', 3, 2)]
    result = admin.create_topics(topics)

    # Check individual topic results
    for topic_name, error in result.items():
        if error is None:
            print(f"Topic {topic_name} created successfully")
        else:
            print(f"Failed to create topic {topic_name}: {error}")

except TopicAlreadyExistsError:
    print("Topic already exists")

except InvalidReplicationFactorError:
    print("Invalid replication factor")

except ClusterAuthorizationFailedError:
    print("Not authorized to create topics")

except KafkaError as e:
    print(f"Admin operation failed: {e}")

finally:
    admin.close()
```

### Error Classification and Recovery

```python
from kafka.errors import KafkaError

def handle_kafka_error(error):
    """Handle Kafka errors with appropriate recovery strategies."""

    if hasattr(error, 'retriable') and error.retriable:
        print(f"Retriable error: {error}")
        return 'retry'

    if hasattr(error, 'invalid_metadata') and error.invalid_metadata:
        print(f"Metadata refresh needed: {error}")
        return 'refresh_metadata'

    # Non-retriable errors
    print(f"Non-retriable error: {error}")
    return 'fail'

# Example usage
try:
    # Kafka operation
    pass
except KafkaError as e:
    strategy = handle_kafka_error(e)

    if strategy == 'retry':
        # Implement retry logic
        pass
    elif strategy == 'refresh_metadata':
        # Force metadata refresh
        pass
    else:
        # Log error and exit
        raise
```