
tessl/pypi-kafka-python-ng

Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/kafka-python-ng@2.2.x

To install, run

npx @tessl/cli install tessl/pypi-kafka-python-ng@2.2.0

# Kafka Python NG

A pure Python client library for Apache Kafka that provides high-level producer and consumer APIs, admin functionality, and full protocol support. It is designed for compatibility with Kafka brokers from version 0.8.0 through 2.6+ and offers Pythonic interfaces for distributed stream-processing applications.

## Package Information

- **Package Name**: kafka-python-ng
- **Language**: Python
- **Installation**: `pip install kafka-python-ng`
- **Requirements**: Python >= 3.8

## Core Imports

```python
import kafka
```

For specific functionality:

```python
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
```

For data structures and error handling:

```python
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.errors import KafkaError
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.conn import BrokerConnection
from kafka.serializer import Serializer, Deserializer
```

For admin operations:

```python
from kafka.admin import NewTopic, NewPartitions, ConfigResource, ConfigResourceType
```

## Basic Usage

### Producer

```python
from kafka import KafkaProducer
import json

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send messages
producer.send('my-topic', {'key': 'value'})
producer.flush()
producer.close()
```

### Consumer

```python
from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Consume messages
for message in consumer:
    print(f"Topic: {message.topic}, Value: {message.value}")
```

### Admin Client

```python
from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Create admin client
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topics
topic = NewTopic(name='test-topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])
```

## Architecture

The kafka-python-ng library is organized around three main client types:

- **KafkaProducer**: Thread-safe producer for publishing records with batching, compression, and retry logic
- **KafkaConsumer**: High-level consumer with automatic group coordination, partition assignment, and offset management
- **KafkaAdminClient**: Administrative operations for topics, configs, consumer groups, and ACLs

The library uses an async I/O foundation with selector-based networking, automatic metadata management, and comprehensive error handling. It supports all major Kafka features including SASL authentication, SSL encryption, multiple compression formats, and transactional semantics.
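
Security and compression are enabled purely through constructor configs. A sketch of a SASL_SSL setup — all values here are placeholders, and the commented-out client call assumes a reachable broker:

```python
# Security-related client configs (placeholder values). The same keys are
# accepted by KafkaProducer, KafkaConsumer, and KafkaAdminClient.
secure_config = {
    'bootstrap_servers': ['broker1:9093'],
    'security_protocol': 'SASL_SSL',   # PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
    'sasl_mechanism': 'PLAIN',         # or GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
    'sasl_plain_username': 'user',
    'sasl_plain_password': 'secret',
    'ssl_cafile': '/path/to/ca.pem',   # CA certificate for broker verification
}

# With a live broker:
# consumer = KafkaConsumer('my-topic', **secure_config)
```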

## Capabilities

### Producer API

High-level producer for publishing records to Kafka topics with automatic batching, compression, partitioning, and retry logic. Supports asynchronous sending with futures and callback handling.

```python { .api }
class KafkaProducer:
    def __init__(self, **configs): ...
    def send(self, topic: str, value=None, key=None, headers=None, partition=None, timestamp_ms=None): ...
    def flush(self, timeout=None): ...
    def close(self, timeout=None): ...
```

[Producer Operations](./producer.md)
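
`send()` returns a future that resolves to record metadata once the broker acknowledges the write. The broker-free sketch below uses a hypothetical `StubFuture` stand-in (not part of the library) just to illustrate the callback/errback/get contract:

```python
# StubFuture mimics the future returned by producer.send(). Real code would
# do: future = producer.send('my-topic', value); future.add_callback(...)
class StubFuture:
    def __init__(self):
        self._callbacks = []
        self._value = None
        self._done = False

    def add_callback(self, callback):
        self._callbacks.append(callback)
        return self

    def is_done(self):
        return self._done

    def _resolve(self, value):
        # The client does this internally when the broker acks the record.
        self._value, self._done = value, True
        for cb in self._callbacks:
            cb(value)

    def get(self, timeout=None):
        # Real futures block here until the ack (or raise on error/timeout).
        return self._value

acked = []
future = StubFuture()
future.add_callback(lambda md: acked.append(md))  # fire-and-forget handling
future._resolve({'topic': 'my-topic', 'partition': 0, 'offset': 42})
result = future.get()
```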

### Consumer API

High-level consumer for reading records from Kafka topics with automatic group coordination, partition assignment, and offset management. Supports both subscription-based and manual partition assignment.

```python { .api }
class KafkaConsumer:
    def __init__(self, *topics, **configs): ...
    def subscribe(self, topics, pattern=None, listener=None): ...
    def poll(self, timeout_ms=0, max_records=None, update_offsets=True): ...
    def commit(self, offsets=None): ...
    def close(self): ...
```

[Consumer Operations](./consumer.md)
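
With manual commits, the offset you commit for each partition is the last processed offset plus one (the next offset to consume). A stdlib-only sketch of building the `offsets` dict that `commit(offsets=...)` expects — the namedtuples mirror `kafka.structs`, and `Record` is an illustrative stand-in for a consumed message:

```python
from collections import namedtuple

# Mirrors of kafka.structs types (see the Types section)
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata'])

# Stand-in for records returned by poll()
Record = namedtuple('Record', ['topic', 'partition', 'offset', 'value'])
batch = [
    Record('my-topic', 0, 41, b'a'),
    Record('my-topic', 0, 42, b'b'),
    Record('my-topic', 1, 7, b'c'),
]

# Commit position = last consumed offset + 1, per partition
to_commit = {}
for rec in batch:
    tp = TopicPartition(rec.topic, rec.partition)
    to_commit[tp] = OffsetAndMetadata(rec.offset + 1, '')

# With a live consumer: consumer.commit(offsets=to_commit)
```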

### Admin API

Administrative client for managing Kafka cluster resources including topics, consumer groups, configurations, and access control lists (ACLs).

```python { .api }
class KafkaAdminClient:
    def __init__(self, **configs): ...
    def create_topics(self, topic_list, timeout_ms=None, validate_only=False): ...
    def delete_topics(self, topics, timeout_ms=None): ...
    def describe_configs(self, config_resources, timeout_ms=None, include_synonyms=False): ...
```

[Administrative Operations](./admin.md)

### Configuration and Connection Management

Connection management, SSL/SASL authentication, and client configuration options for connecting to Kafka clusters under various security setups.

```python { .api }
class BrokerConnection:
    def __init__(self, host, port, **configs): ...
    def connect(self, timeout=None): ...
    def close(self): ...
```

[Connection and Configuration](./connection.md)

### Data Structures and Types

Core data structures for representing Kafka concepts including topic partitions, offsets, broker metadata, and consumer group information.

```python { .api }
class TopicPartition:
    def __init__(self, topic: str, partition: int): ...

class OffsetAndMetadata:
    def __init__(self, offset: int, metadata: str): ...
```

[Data Structures](./structs.md)
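
Because these types are plain namedtuples (see the Types section), they compare and hash by value, which makes them convenient dict keys for per-partition state. A stdlib-only sketch mirroring `TopicPartition`:

```python
from collections import namedtuple

# Mirror of kafka.structs.TopicPartition
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])

tp_a = TopicPartition('events', 0)
tp_b = TopicPartition(topic='events', partition=0)

# Value equality: two instances for the same partition are the same key
offsets = {tp_a: 100}
offsets[tp_b] = 250  # overwrites rather than adding a second entry

# With a live consumer:
# consumer.assign([tp_a]); consumer.seek(tp_a, 250)
```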

173

174

### Error Handling

175

176

Comprehensive exception hierarchy for handling various Kafka-related errors including network issues, protocol errors, and authentication failures.

177

178

```python { .api }

179

class KafkaError(RuntimeError):

180

retriable: bool

181

invalid_metadata: bool

182

183

class NoBrokersAvailable(KafkaError): ...

184

class CommitFailedError(KafkaError): ...

185

```

186

187

[Error Handling](./errors.md)
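
The `retriable` flag lets callers decide whether an operation is worth repeating. A broker-free sketch of that pattern — the stub classes below mirror the attributes shown above, and `flaky_send` is an illustrative stand-in for a call that would normally hit a broker:

```python
import time

# Stubs mirroring the kafka.errors hierarchy so this runs standalone
class KafkaError(RuntimeError):
    retriable = False
    invalid_metadata = False

class TransientError(KafkaError):  # stand-in for a retriable error
    retriable = True

def send_with_retries(do_send, attempts=3, backoff_s=0.01):
    """Retry do_send() only when the raised KafkaError is retriable."""
    for attempt in range(attempts):
        try:
            return do_send()
        except KafkaError as err:
            if not err.retriable or attempt == attempts - 1:
                raise  # non-retriable, or out of attempts
            time.sleep(backoff_s)  # simple fixed backoff

calls = {'n': 0}
def flaky_send():
    calls['n'] += 1
    if calls['n'] < 3:
        raise TransientError('leader moved, try again')
    return 'ok'

result = send_with_retries(flaky_send)
```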

### Serialization

Abstract base classes for implementing custom key and value serializers/deserializers for converting between Python objects and bytes.

```python { .api }
class Serializer:
    def serialize(self, topic: str, value) -> bytes: ...
    def close(self): ...

class Deserializer:
    def deserialize(self, topic: str, bytes_: bytes): ...
    def close(self): ...
```

[Serialization](./serialization.md)
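
A minimal JSON pair implementing this interface. The classes are defined standalone here so the example runs without a broker; in real code you would subclass `kafka.serializer.Serializer`/`Deserializer` (or pass plain callables) as `value_serializer`/`value_deserializer`:

```python
import json

class JsonSerializer:
    def serialize(self, topic, value) -> bytes:
        # Topic is available for topic-specific encoding; unused here
        return json.dumps(value).encode('utf-8')

    def close(self):
        pass  # release resources if the serializer held any

class JsonDeserializer:
    def deserialize(self, topic, bytes_: bytes):
        return json.loads(bytes_.decode('utf-8'))

    def close(self):
        pass

ser, de = JsonSerializer(), JsonDeserializer()
payload = ser.serialize('my-topic', {'id': 1})
roundtrip = de.deserialize('my-topic', payload)
```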

## Types

```python { .api }
from collections import namedtuple

# Core data structures
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata'])
BrokerMetadata = namedtuple('BrokerMetadata', ['nodeId', 'host', 'port', 'rack'])
PartitionMetadata = namedtuple('PartitionMetadata', ['topic', 'partition', 'leader', 'replicas', 'isr', 'error'])
OffsetAndTimestamp = namedtuple('OffsetAndTimestamp', ['offset', 'timestamp'])

# Admin types
class NewTopic:
    def __init__(self, name: str, num_partitions: int, replication_factor: int, **configs): ...

class NewPartitions:
    def __init__(self, total_count: int, new_assignments=None): ...

class ConfigResource:
    def __init__(self, resource_type, name: str): ...

# Consumer callback interface
class ConsumerRebalanceListener:
    def on_partitions_revoked(self, revoked): ...
    def on_partitions_assigned(self, assigned): ...

# Connection management
class BrokerConnection:
    def __init__(self, host: str, port: int, **configs): ...
    def connect(self, timeout=None): ...
    def close(self): ...

# Serialization interfaces
class Serializer:
    def serialize(self, topic: str, value) -> bytes: ...
    def close(self): ...

class Deserializer:
    def deserialize(self, topic: str, bytes_: bytes): ...
    def close(self): ...

# Future type for async operations
class FutureRecordMetadata:
    def get(self, timeout=None): ...
    def is_done(self) -> bool: ...
    def add_callback(self, callback): ...
    def add_errback(self, errback): ...
```
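
A rebalance listener is the hook for saving and restoring per-partition state across group rebalances. The sketch below is a standalone stand-in (`SaveStateListener` is illustrative) matching the two-method interface above; real code would subclass `ConsumerRebalanceListener` and pass an instance via `consumer.subscribe(topics, listener=...)`:

```python
class SaveStateListener:
    """Duck-typed listener mirroring ConsumerRebalanceListener."""

    def __init__(self):
        self.revoked_log = []
        self.assigned_log = []

    def on_partitions_revoked(self, revoked):
        # Called before a rebalance: last chance to commit offsets for
        # partitions this consumer is about to lose.
        self.revoked_log.extend(revoked)

    def on_partitions_assigned(self, assigned):
        # Called after a rebalance: seed per-partition state here.
        self.assigned_log.extend(assigned)

# Simulate the callbacks the consumer would invoke during a rebalance
listener = SaveStateListener()
listener.on_partitions_revoked([('my-topic', 0)])
listener.on_partitions_assigned([('my-topic', 1), ('my-topic', 2)])
```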