
tessl/pypi-kafka-python

Pure Python client for the Apache Kafka distributed stream processing system.

Workspace: tessl · Visibility: Public · Describes: `pypi/kafka-python@2.2.x`

To install, run:

    npx @tessl/cli install tessl/pypi-kafka-python@2.2.0

# kafka-python

A pure Python client library for the Apache Kafka distributed stream processing system. It provides comprehensive support for producers, consumers, and administrative operations, with compatibility across Kafka broker versions 0.8.0 through 2.6+.

## Version Compatibility

- **Core Features**: Compatible with Kafka 0.8.0+
- **Consumer Groups**: Requires Kafka 0.9.0+ for coordinated consumer groups
- **Idempotent Producer**: Requires Kafka 0.11.0+
- **Transactional Producer**: Requires Kafka 0.11.0+
- **Admin API**: Requires Kafka 0.10.1.0+
- **Headers Support**: Requires Kafka 0.11.0+
- **Exactly-Once Semantics**: Requires Kafka 0.11.0+
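These gates are per broker version. kafka-python clients can auto-detect the broker version, or you can pin it with the `api_version` configuration tuple to skip the probe. A minimal config-dict sketch (the address is a placeholder, and a 0.10.1 broker is assumed for illustration):

```python
# Client settings fragment pinning the broker API version; passing
# api_version skips kafka-python's version probe at connect time.
config = {
    "bootstrap_servers": ["localhost:9092"],  # placeholder address
    "api_version": (0, 10, 1),
}

# Tuple comparison makes feature gating easy to express:
supports_headers = config["api_version"] >= (0, 11)
print(supports_headers)  # False for a 0.10.1 broker
```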

## Package Information

- **Package Name**: kafka-python
- **Language**: Python
- **Installation**: `pip install kafka-python`
- **Optional Dependencies**:
  - `pip install kafka-python[crc32c]` - Faster CRC32C validation
  - `pip install kafka-python[lz4]` - LZ4 compression support
  - `pip install kafka-python[snappy]` - Snappy compression support
  - `pip install kafka-python[zstd]` - Zstandard compression support
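The extras only take effect if their backing modules import cleanly. A small stdlib sketch for probing which codec modules are present in the current environment (the module names `snappy`, `lz4`, and `zstandard` are assumptions about what the extras install):

```python
import importlib.util

# Probe which optional compression codec modules are importable here.
codecs = {
    "gzip": True,  # stdlib, always available
    "snappy": importlib.util.find_spec("snappy") is not None,
    "lz4": importlib.util.find_spec("lz4") is not None,
    "zstd": importlib.util.find_spec("zstandard") is not None,
}
print(sorted(name for name, ok in codecs.items() if ok))
```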

## Core Imports

Basic imports for common usage:

```python
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
```

Topic and partition management:

```python
from kafka import TopicPartition, OffsetAndMetadata
```

Consumer coordination:

```python
from kafka.consumer import ConsumerRebalanceListener
```

Administrative objects:

```python
from kafka.admin import NewTopic, NewPartitions, ConfigResource, ACL
```

Low-level client and connection:

```python
from kafka import KafkaClient, BrokerConnection
```

Serialization interfaces:

```python
from kafka import Serializer, Deserializer
```

## Basic Usage

### Simple Producer

```python
from kafka import KafkaProducer
import json

# Configure producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message
future = producer.send('my-topic', value={'message': 'Hello World'}, key='key1')

# Block for acknowledgment
record_metadata = future.get(timeout=10)
print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")

# Clean shutdown
producer.close()
```

### Simple Consumer

```python
from kafka import KafkaConsumer
import json

# Configure consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    auto_offset_reset='earliest'
)

# Consume messages
for message in consumer:
    print(f"Received: key={message.key}, value={message.value}")
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")

# Clean shutdown
consumer.close()
```

### Administrative Operations

```python
from kafka import KafkaAdminClient
from kafka.admin import NewTopic, ConfigResource, ConfigResourceType

# Configure admin client
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topic
topic = NewTopic(name='new-topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])

# List topics
topics = admin.list_topics()
print(f"Available topics: {list(topics)}")

# Clean shutdown
admin.close()
```

## Architecture

kafka-python follows a layered architecture designed for both simplicity and extensibility:

- **High-Level APIs**: `KafkaProducer`, `KafkaConsumer`, `KafkaAdminClient` provide simple interfaces for common operations
- **Low-Level Client**: `KafkaClient` handles protocol communication and connection management
- **Connection Layer**: `BrokerConnection` manages individual TCP connections with proper error handling and reconnection
- **Protocol Layer**: Complete implementation of the Kafka wire protocol with support for all API versions
- **Pluggable Components**: Serializers, partitioners, and metrics collectors can be customized or replaced

This design enables everything from simple fire-and-forget message sending to complex transactional processing and cluster administration.

## Capabilities

### Message Production

High-level producer for publishing records to Kafka topics with support for batching, compression, partitioning strategies, idempotent production, and transactional semantics.

```python { .api }
class KafkaProducer:
    def __init__(self, **configs): ...
    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None): ...
    def flush(self, timeout=None): ...
    def close(self, timeout=None): ...
```

[Producer API](./producer.md)

### Message Consumption

High-level consumer for consuming records from Kafka topics with support for consumer groups, automatic partition assignment, offset management, and rebalancing coordination.

```python { .api }
class KafkaConsumer:
    def __init__(self, *topics, **configs): ...
    def subscribe(self, topics=(), pattern=None, listener=None): ...
    def assign(self, partitions): ...
    def poll(self, timeout_ms=0, max_records=None, update_offsets=True): ...
    def commit(self, offsets=None): ...
    def seek(self, partition, offset): ...
```

```python { .api }
class ConsumerRebalanceListener:
    def on_partitions_revoked(self, revoked): ...
    def on_partitions_assigned(self, assigned): ...
```

[Consumer API](./consumer.md)
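The revoke hook is the conventional place to commit offsets before partitions move to another group member. A standalone sketch of the hook pair (a local class rather than a subclass of `ConsumerRebalanceListener`, so it runs without a broker; `LoggingRebalanceListener` is hypothetical):

```python
class LoggingRebalanceListener:
    """Sketch of the ConsumerRebalanceListener hook pair; real code would
    subclass kafka.consumer.ConsumerRebalanceListener instead."""

    def __init__(self):
        self.events = []

    def on_partitions_revoked(self, revoked):
        # Typical place to commit offsets before losing ownership.
        self.events.append(("revoked", sorted(revoked)))

    def on_partitions_assigned(self, assigned):
        self.events.append(("assigned", sorted(assigned)))

listener = LoggingRebalanceListener()
listener.on_partitions_revoked([("my-topic", 1), ("my-topic", 0)])
listener.on_partitions_assigned([("my-topic", 2)])
print(listener.events)
```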

### Cluster Administration

Administrative client for managing Kafka clusters including topic operations, partition management, configuration changes, access control lists, and consumer group administration.

```python { .api }
class KafkaAdminClient:
    def __init__(self, **configs): ...
    def create_topics(self, topic_requests, timeout_ms=None, validate_only=False): ...
    def delete_topics(self, topics, timeout_ms=None): ...
    def list_topics(self, timeout_ms=None): ...
    def describe_topics(self, topics, timeout_ms=None): ...
    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False): ...
    def describe_configs(self, config_resources, timeout_ms=None): ...
    def alter_configs(self, config_resources, timeout_ms=None): ...
```

[Administrative API](./admin.md)

### Data Structures and Metadata

Core data structures for representing Kafka concepts including topics, partitions, offsets, broker metadata, and consumer group information.

```python { .api }
TopicPartition = NamedTuple('TopicPartition', [('topic', str), ('partition', int)])
OffsetAndMetadata = NamedTuple('OffsetAndMetadata', [('offset', int), ('metadata', str), ('leader_epoch', int)])
BrokerMetadata = NamedTuple('BrokerMetadata', [('nodeId', int), ('host', str), ('port', int), ('rack', str)])
```

[Data Structures](./structures.md)
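Because `TopicPartition` is a named tuple, instances are hashable and compare by value, which makes them natural dict keys for per-partition offset maps. A sketch using a local `namedtuple` of the same shape, so it runs without kafka-python installed:

```python
from collections import namedtuple

# Local namedtuple mirroring kafka.TopicPartition's shape for illustration.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

# Hashable, value-compared tuples work directly as offset-map keys.
offsets = {
    TopicPartition("my-topic", 0): 42,
    TopicPartition("my-topic", 1): 7,
}
tp = TopicPartition("my-topic", 0)
print(offsets[tp], tp.topic, tp.partition)  # 42 my-topic 0
```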

### Error Handling and Exceptions

Comprehensive error handling with over 100 exception classes mapping all Kafka protocol errors, client-side errors, and authorization failures with appropriate retry semantics.

```python { .api }
class KafkaError(Exception):
    retriable: bool
    invalid_metadata: bool

class KafkaTimeoutError(KafkaError): ...
class KafkaConnectionError(KafkaError): ...
class TopicAuthorizationFailedError(AuthorizationError): ...
```

[Error Handling](./errors.md)
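The `retriable` flag is what makes generic retry wrappers possible. A sketch of that pattern with local stand-in exception classes (the real ones live in `kafka.errors`; `send_with_retry` and `flaky` are hypothetical helpers):

```python
import time

# Stand-in exception hierarchy mirroring the retriable flag above.
class KafkaError(Exception):
    retriable = False

class KafkaTimeoutError(KafkaError):
    retriable = True

def send_with_retry(attempt_fn, retries=3, backoff_s=0.0):
    """Retry only errors flagged retriable; re-raise everything else."""
    for attempt in range(retries):
        try:
            return attempt_fn()
        except KafkaError as err:
            if not err.retriable or attempt == retries - 1:
                raise
            time.sleep(backoff_s)

calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise KafkaTimeoutError("broker slow")
    return "ok"

result = send_with_retry(flaky)
print(result)  # ok
```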

### Low-Level Client and Connection Management

Low-level client for direct protocol communication and connection management, providing fine-grained control over cluster interactions and metadata handling.

```python { .api }
class KafkaClient:
    def __init__(self, **configs): ...
    def bootstrap_connected(self): ...
    def check_version(self, node_id=None, timeout=2, strict=False): ...
    def cluster(self): ...

class BrokerConnection:
    def __init__(self, host, port, **configs): ...
    def connect(self): ...
    def connected(self): ...
    def close(self): ...
```

### Serialization Framework

Abstract base classes for implementing custom serializers and deserializers with pluggable serialization strategies.

```python { .api }
class Serializer:
    def serialize(self, topic, value): ...
    def close(self): ...

class Deserializer:
    def deserialize(self, topic, bytes_): ...
    def close(self): ...
```
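A custom serializer only needs the `serialize`/`deserialize` and `close` methods shown above. A JSON pair sketched as plain classes (defined locally so the example runs without kafka-python; in real code you would subclass `Serializer` and `Deserializer`):

```python
import json

class JsonSerializer:
    """Implements the Serializer interface shape: serialize(topic, value)."""
    def serialize(self, topic, value):
        return json.dumps(value).encode("utf-8")
    def close(self):
        pass

class JsonDeserializer:
    """Implements the Deserializer interface shape: deserialize(topic, bytes_)."""
    def deserialize(self, topic, bytes_):
        return json.loads(bytes_.decode("utf-8"))
    def close(self):
        pass

# Round-trip a payload through the pair.
payload = {"message": "Hello World"}
wire = JsonSerializer().serialize("my-topic", payload)
print(JsonDeserializer().deserialize("my-topic", wire))
```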

## Authentication and Security

kafka-python supports multiple authentication mechanisms:

- **SASL/PLAIN**: Simple username/password authentication
- **SASL/SCRAM**: Challenge-response authentication with SHA-256/SHA-512
- **SASL/GSSAPI**: Kerberos authentication (Unix systems)
- **SASL/OAUTHBEARER**: OAuth 2.0 Bearer token authentication
- **AWS MSK IAM**: AWS-specific IAM authentication for Amazon MSK
- **SSL/TLS**: Encrypted connections with certificate validation
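As a concrete illustration, a configuration fragment for SASL/PLAIN over TLS, assuming the parameter names kafka-python uses for this mechanism (`security_protocol`, `sasl_mechanism`, `sasl_plain_username`, `sasl_plain_password`); host and credentials are placeholders:

```python
# Connection settings fragment for SASL/PLAIN over an encrypted channel.
sasl_config = {
    "bootstrap_servers": ["broker.example.com:9093"],  # placeholder host
    "security_protocol": "SASL_SSL",   # TLS + SASL authentication
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": "alice",    # placeholder credentials
    "sasl_plain_password": "secret",
}
print(sasl_config["security_protocol"])
```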

## Compression and Serialization

Built-in support for multiple compression algorithms:

- **gzip**: Standard compression with configurable levels
- **snappy**: Fast compression optimized for speed
- **lz4**: High-speed compression with Kafka-specific framing
- **zstd**: Modern compression with excellent compression ratios

Extensible serialization framework with abstract base classes for custom serializers and deserializers.
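Compression is selected per producer via the `compression_type` setting. A config-dict sketch (values are illustrative; `zstd` additionally assumes the `kafka-python[zstd]` extra is installed and the broker accepts zstd batches):

```python
# Producer settings fragment selecting a codec.
producer_config = {
    "bootstrap_servers": ["localhost:9092"],  # placeholder address
    "compression_type": "gzip",  # one of: None, 'gzip', 'snappy', 'lz4', 'zstd'
    "linger_ms": 50,  # small send delay so batches fill before compressing
}
print(producer_config["compression_type"])
```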

## Types

### Core Configuration Types

```python { .api }
# Producer configuration dictionary with string keys and various value types
ProducerConfig = Dict[str, Any]

# Consumer configuration dictionary with string keys and various value types
ConsumerConfig = Dict[str, Any]

# Admin client configuration dictionary with string keys and various value types
AdminConfig = Dict[str, Any]
```

### Message Types

```python { .api }
# Producer message future returned by send()
class FutureRecordMetadata:
    def get(self, timeout=None): ...
    def add_callback(self, callback): ...
    def add_errback(self, errback): ...

# Consumer record received from poll()
class ConsumerRecord:
    topic: str
    partition: int
    offset: int
    timestamp: int
    timestamp_type: int
    key: bytes
    value: bytes
    headers: List[Tuple[str, bytes]]
```
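Instead of blocking on `get()`, callers often attach callbacks to the future. A stand-in sketch of that callback/errback flow (`FakeFuture` is hypothetical and only mirrors the hook names above; the real `FutureRecordMetadata` resolves when the broker acknowledges the record):

```python
class FakeFuture:
    """Hypothetical stand-in mirroring FutureRecordMetadata's callback hooks."""

    def __init__(self):
        self._callbacks, self._errbacks = [], []

    def add_callback(self, fn):
        self._callbacks.append(fn)
        return self  # chaining, as with the real future

    def add_errback(self, fn):
        self._errbacks.append(fn)
        return self

    def success(self, metadata):
        for fn in self._callbacks:
            fn(metadata)

    def failure(self, exc):
        for fn in self._errbacks:
            fn(exc)

delivered = []
future = FakeFuture()
future.add_callback(delivered.append).add_errback(print)
future.success({"topic": "my-topic", "partition": 0})
print(delivered)  # [{'topic': 'my-topic', 'partition': 0}]
```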