tessl/pypi-confluent-kafka

Confluent's Python client for Apache Kafka

- Workspace: tessl
- Visibility: Public
- Describes: pypipkg:pypi/confluent-kafka@2.11.x

To install, run

npx @tessl/cli install tessl/pypi-confluent-kafka@2.11.0

# Confluent Kafka Python Client

Confluent's Python client for Apache Kafka, providing high-performance, feature-rich access to Kafka clusters. Built on librdkafka, it offers both low-level producer/consumer APIs and high-level serialization-aware APIs with Schema Registry integration for Avro, JSON Schema, and Protobuf.

## Package Information

- **Package Name**: confluent-kafka
- **Language**: Python
- **Installation**: `pip install confluent-kafka`
- **Version**: 2.11.1

## Core Imports

```python
from confluent_kafka import (
    Producer, Consumer, Message, TopicPartition, Uuid,
    Node, ConsumerGroupTopicPartitions, ConsumerGroupState, ConsumerGroupType,
    TopicCollection, TopicPartitionInfo, IsolationLevel,
    KafkaException, KafkaError, libversion, version,
    TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME,
    OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID
)

# Additional classes (not in __all__ but available)
from confluent_kafka import ThrottleEvent, ElectionType
```

For admin operations:

```python
from confluent_kafka.admin import AdminClient, NewTopic, ConfigResource
```

For high-level serialization-aware APIs:

```python
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer
```

For Schema Registry integration:

```python
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
```

## Basic Usage

```python
from confluent_kafka import Producer, Consumer, KafkaError

# Basic Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    # Called once per produced message with the delivery result
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Produce a message (`callback` is an alias for `on_delivery`)
producer.produce('my-topic', key='key1', value='Hello World', callback=delivery_report)
# Wait for outstanding messages to be delivered and callbacks to run
producer.flush()

# Basic Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['my-topic'])

try:
    while True:
        # Wait up to one second for the next message
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'End of partition reached {msg.topic()} [{msg.partition()}]')
            else:
                print(f'Error: {msg.error()}')
        else:
            print(f'Received message: key={msg.key()}, value={msg.value()}')
finally:
    # Leave the consumer group cleanly
    consumer.close()
```

## Architecture

The confluent-kafka library is built around a layered architecture:

- **C Extension Core (`cimpl`)**: Low-level librdkafka bindings providing high-performance Kafka operations
- **Python Wrappers**: Producer, Consumer, and admin clients with Pythonic interfaces
- **High-Level APIs**: SerializingProducer and DeserializingConsumer with pluggable serialization
- **Schema Registry Integration**: Integration with Confluent Schema Registry for schema evolution
- **Serialization Framework**: Extensible serialization system with built-in support for common data types

This design lets users choose between the raw performance of the basic APIs and the convenience of the high-level serialization-aware APIs.
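
The layering described above can be illustrated with a small, library-free sketch. It is not how confluent-kafka is implemented: `RawProducer` and `SerializingLayer` are hypothetical names standing in for a bytes-only client and a serialization-aware wrapper around it.

```python
from typing import Callable, List, Optional, Tuple


class RawProducer:
    """Hypothetical stand-in for the low-level layer: accepts bytes only."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Optional[bytes], Optional[bytes]]] = []

    def produce(self, topic: str, key: Optional[bytes], value: Optional[bytes]) -> None:
        for part in (key, value):
            if part is not None and not isinstance(part, bytes):
                raise TypeError("raw layer only accepts bytes")
        self.sent.append((topic, key, value))


class SerializingLayer:
    """Hypothetical high-level layer: applies pluggable serializers, then delegates."""

    def __init__(
        self,
        raw: RawProducer,
        key_serializer: Callable[[object], bytes],
        value_serializer: Callable[[object], bytes],
    ) -> None:
        self._raw = raw
        self._key_serializer = key_serializer
        self._value_serializer = value_serializer

    def produce(self, topic: str, key=None, value=None) -> None:
        # Convert application objects to bytes before handing off to the raw layer
        key_bytes = self._key_serializer(key) if key is not None else None
        value_bytes = self._value_serializer(value) if value is not None else None
        self._raw.produce(topic, key_bytes, value_bytes)


raw = RawProducer()
layered = SerializingLayer(
    raw,
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: str(v).encode("utf-8"),
)
layered.produce("my-topic", key="user-1", value=42)
```

The wrapper adds one conversion step per message, which is the trade-off between the two API levels: callers of the raw layer pay nothing extra but must supply bytes themselves.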

## Capabilities

### Core Producer and Consumer

Fundamental Kafka producer and consumer functionality, including transactions, exactly-once semantics, and custom partitioning.

```python { .api }
class Producer:
    def __init__(self, conf): ...
    def produce(self, topic, value=None, key=None, partition=-1, on_delivery=None, timestamp=0, headers=None): ...
    def poll(self, timeout=-1): ...
    def flush(self, timeout=-1): ...
    def list_topics(self, topic=None, timeout=-1): ...

class Consumer:
    def __init__(self, conf): ...
    def subscribe(self, topics, on_assign=None, on_revoke=None, on_lost=None): ...
    def poll(self, timeout=-1): ...
    def commit(self, message=None, offsets=None, asynchronous=True): ...
    def list_topics(self, topic=None, timeout=-1): ...
```

[Core Producer and Consumer](./core-producer-consumer.md)
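
A delivery callback receives `(err, msg)`, as in the Basic Usage example. The sketch below shows one way to collect results in an object instead of printing them. `DeliveryTracker` and `FakeMessage` are hypothetical names; `FakeMessage` only mimics the `topic()` and `partition()` accessors so the example runs without a broker.

```python
from typing import Any, List, Tuple


class DeliveryTracker:
    """Collects delivery results; instances are callable as (err, msg)."""

    def __init__(self) -> None:
        self.delivered: List[Tuple[str, int]] = []
        self.failed: List[Any] = []

    def __call__(self, err, msg) -> None:
        if err is not None:
            self.failed.append(err)
        else:
            self.delivered.append((msg.topic(), msg.partition()))


class FakeMessage:
    """Hypothetical stand-in for a delivered message, for offline demonstration."""

    def __init__(self, topic: str, partition: int) -> None:
        self._topic = topic
        self._partition = partition

    def topic(self) -> str:
        return self._topic

    def partition(self) -> int:
        return self._partition


tracker = DeliveryTracker()
tracker(None, FakeMessage("my-topic", 0))                  # simulated success
tracker("broker unavailable", FakeMessage("my-topic", 1))  # simulated failure
```

With a real producer, the tracker would presumably be passed as `producer.produce(..., on_delivery=tracker)` and inspected after `producer.flush()`.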

### Admin Client Operations

Administrative operations for managing Kafka clusters: topics, partitions, configurations, ACLs, and consumer groups.

```python { .api }
class AdminClient:
    def __init__(self, conf): ...
    def create_topics(self, new_topics, **kwargs): ...
    def delete_topics(self, topics, **kwargs): ...
    def create_partitions(self, new_partitions, **kwargs): ...
    def describe_configs(self, resources, **kwargs): ...
    def alter_configs(self, resources, **kwargs): ...
```

[Admin Client](./admin-client.md)
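
As far as I understand the library, admin methods such as `create_topics()` are asynchronous and return a dict mapping each resource (for example a topic name) to a `concurrent.futures.Future`. The helper below is a minimal sketch of waiting on such a dict; `collect_results` is a hypothetical name, and the futures here are built by hand so the example runs without a cluster.

```python
from concurrent.futures import Future
from typing import Dict, Optional


def collect_results(
    futures: Dict[str, Future], timeout: float = 10.0
) -> Dict[str, Optional[Exception]]:
    """Wait for each future; map name -> None on success or the raised exception."""
    outcome: Dict[str, Optional[Exception]] = {}
    for name, future in futures.items():
        try:
            future.result(timeout=timeout)
            outcome[name] = None
        except Exception as exc:  # includes timeouts and operation failures
            outcome[name] = exc
    return outcome


# Offline demonstration with hand-made futures
ok, bad = Future(), Future()
ok.set_result(None)
bad.set_exception(RuntimeError("topic already exists"))
results = collect_results({"orders": ok, "payments": bad})
```

Collecting per-resource outcomes rather than raising on the first failure makes it easier to report partial success when several topics are created in one call.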

### Schema Registry Integration

Complete integration with Confluent Schema Registry supporting Avro, JSON Schema, and Protobuf with automatic schema evolution and compatibility checking.

```python { .api }
class SchemaRegistryClient:
    def __init__(self, conf): ...
    def register_schema(self, subject_name, schema, normalize_schemas=False): ...
    def get_latest_version(self, subject_name): ...
    def get_schema(self, schema_id, fetch_max_id=True): ...

class AvroSerializer:
    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None): ...
    def __call__(self, obj, ctx): ...
```

[Schema Registry](./schema-registry.md)
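
The `schema_str` and `to_dict` arguments of `AvroSerializer` can be prepared without touching a registry. The sketch below assumes a hypothetical `User` record; the schema, the `User` class, and `user_to_dict` are illustrative names, not part of the library.

```python
import json
from dataclasses import dataclass


@dataclass
class User:
    name: str
    favorite_number: int


# Avro schema as a JSON string, the form passed as `schema_str`
USER_SCHEMA_STR = json.dumps({
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": "long"},
    ],
})


def user_to_dict(user: User, ctx) -> dict:
    """Convert a User into a dict whose keys match the schema's field names."""
    return {"name": user.name, "favorite_number": user.favorite_number}


record = user_to_dict(User("alice", 7), None)
schema_fields = [field["name"] for field in json.loads(USER_SCHEMA_STR)["fields"]]
```

Following the signature above, these would be wired together as `AvroSerializer(schema_registry_client, USER_SCHEMA_STR, to_dict=user_to_dict)`, which requires a reachable Schema Registry.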

### Serialization Framework

Pluggable serialization framework with built-in serializers for common data types and support for custom serialization logic.

```python { .api }
class SerializingProducer:
    def __init__(self, conf): ...
    def produce(self, topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None): ...

class DeserializingConsumer:
    def __init__(self, conf): ...
    def poll(self, timeout=-1): ...

class StringSerializer:
    def __init__(self, codec='utf_8'): ...
    def __call__(self, obj, ctx=None): ...
```

[Serialization Framework](./serialization.md)
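
Custom serialization logic follows the same callable shape as `StringSerializer.__call__(obj, ctx=None)`. The following is a minimal sketch of a JSON serializer and deserializer pair written as plain callables; `JsonSerializer` and `JsonDeserializer` are hypothetical names, and in real code they would normally subclass the base classes in `confluent_kafka.serialization`.

```python
import json


class JsonSerializer:
    """Hypothetical serializer: Python object -> UTF-8 encoded JSON bytes."""

    def __call__(self, obj, ctx=None):
        if obj is None:
            return None  # pass through missing keys/values unchanged
        return json.dumps(obj, sort_keys=True).encode("utf-8")


class JsonDeserializer:
    """Hypothetical deserializer: UTF-8 encoded JSON bytes -> Python object."""

    def __call__(self, data, ctx=None):
        if data is None:
            return None
        return json.loads(data.decode("utf-8"))


serialize, deserialize = JsonSerializer(), JsonDeserializer()
payload = serialize({"id": 1, "tags": ["a", "b"]})
restored = deserialize(payload)
```

Such callables are typically supplied through the `key.serializer` and `value.serializer` entries of the `SerializingProducer` configuration (and the `*.deserializer` counterparts for `DeserializingConsumer`).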

### Error Handling

Comprehensive error handling with specific exception types for different failure modes and detailed error information.

```python { .api }
class KafkaException(Exception): ...
class KafkaError: ...
class ConsumeError(KafkaException): ...
class ProduceError(KafkaException): ...
class SerializationError(Exception): ...
```

[Error Handling](./error-handling.md)
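
`KafkaError` objects expose predicates such as `fatal()` and `retriable()`, which can drive a retry policy. The sketch below duck-types on those two methods; `classify_error` and `FakeError` are hypothetical names, and `FakeError` exists only so the example runs without the library.

```python
def classify_error(err) -> str:
    """Return 'fatal', 'retriable', or 'other' for a KafkaError-like object."""
    if err.fatal():
        return "fatal"
    if err.retriable():
        return "retriable"
    return "other"


class FakeError:
    """Hypothetical stand-in exposing the same two predicates, for offline use."""

    def __init__(self, fatal: bool = False, retriable: bool = False) -> None:
        self._fatal = fatal
        self._retriable = retriable

    def fatal(self) -> bool:
        return self._fatal

    def retriable(self) -> bool:
        return self._retriable


labels = [
    classify_error(FakeError(fatal=True)),
    classify_error(FakeError(retriable=True)),
    classify_error(FakeError()),
]
```

When a `KafkaException` is caught, the underlying `KafkaError` is available as `exc.args[0]` and could be passed to a function like this one.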

### Additional Classes

#### ThrottleEvent

Contains details about throttled requests from Kafka brokers.

```python { .api }
class ThrottleEvent:
    def __init__(self, broker_name, broker_id, throttle_time):
        """
        Create ThrottleEvent instance.

        Args:
            broker_name (str): Hostname of the broker that throttled the request
            broker_id (int): Broker ID
            throttle_time (float): Throttle time in seconds
        """

    @property
    def broker_name(self):
        """Hostname of the broker that throttled the request."""

    @property
    def broker_id(self):
        """Broker ID."""

    @property
    def throttle_time(self):
        """Throttle time in seconds."""
```
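
A handler for throttle events only needs the three attributes listed above. The sketch below accumulates throttle time per broker; `on_throttle` and `FakeThrottleEvent` are hypothetical names, with the named tuple standing in for `ThrottleEvent` so the example runs offline.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in with the same three attributes as ThrottleEvent
FakeThrottleEvent = namedtuple("FakeThrottleEvent", "broker_name broker_id throttle_time")

# Total throttle time in seconds, keyed by broker ID
throttle_seconds = defaultdict(float)


def on_throttle(event) -> None:
    """Accumulate total throttle time (seconds) per broker ID."""
    throttle_seconds[event.broker_id] += event.throttle_time


on_throttle(FakeThrottleEvent("broker-1", 1, 0.25))
on_throttle(FakeThrottleEvent("broker-1", 1, 0.5))
on_throttle(FakeThrottleEvent("broker-2", 2, 1.0))
```

To my knowledge, a function like this is registered through the `throttle_cb` entry of the client configuration; treat that wiring as an assumption and confirm it against the client documentation.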

## Model Classes

Core data model classes for representing Kafka metadata and consumer group information.

```python { .api }
class Node:
    def __init__(self, id, host, port, rack=None):
        """
        Represents broker node information.

        Args:
            id (int): Node ID
            host (str): Hostname
            port (int): Port number
            rack (str, optional): Rack identifier
        """

class ConsumerGroupTopicPartitions:
    def __init__(self, group_id, topic_partitions=None):
        """
        Consumer group with topic partition information.

        Args:
            group_id (str): Consumer group ID
            topic_partitions (list, optional): List of TopicPartition objects
        """

class TopicCollection:
    def __init__(self, topic_names):
        """
        Collection of topic names.

        Args:
            topic_names (list): List of topic name strings
        """

class TopicPartitionInfo:
    def __init__(self, id, leader, replicas, isr):
        """
        Partition metadata information.

        Args:
            id (int): Partition ID
            leader (Node): Leader broker node
            replicas (list): List of replica nodes
            isr (list): List of in-sync replica nodes
        """

# Enumeration classes
class ConsumerGroupState:
    UNKNOWN = 0
    PREPARING_REBALANCING = 1
    COMPLETING_REBALANCING = 2
    STABLE = 3
    DEAD = 4
    EMPTY = 5

class ConsumerGroupType:
    UNKNOWN = 0
    CONSUMER = 1
    CLASSIC = 2

class IsolationLevel:
    READ_UNCOMMITTED = 0
    READ_COMMITTED = 1

class ElectionType:
    PREFERRED = 0
    UNCLEAN = 1
```
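
The `replicas` and `isr` fields of `TopicPartitionInfo` are enough to spot under-replicated partitions. The sketch below assumes those constructor arguments are exposed as attributes of the same names; `FakeNode`, `FakePartitionInfo`, and `under_replicated` are hypothetical names used so the example runs without a cluster.

```python
from collections import namedtuple

# Hypothetical stand-ins mirroring the field names documented above
FakeNode = namedtuple("FakeNode", "id host port")
FakePartitionInfo = namedtuple("FakePartitionInfo", "id leader replicas isr")


def under_replicated(partitions):
    """Return IDs of partitions whose in-sync set is smaller than the replica set."""
    return [p.id for p in partitions if len(p.isr) < len(p.replicas)]


n1, n2, n3 = (FakeNode(i, f"broker-{i}", 9092) for i in (1, 2, 3))
partitions = [
    FakePartitionInfo(0, n1, [n1, n2, n3], [n1, n2, n3]),  # fully in sync
    FakePartitionInfo(1, n2, [n1, n2, n3], [n2]),          # two replicas lagging
]
lagging = under_replicated(partitions)
```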

## Constants

```python { .api }
# Timestamp types
TIMESTAMP_NOT_AVAILABLE = 0
TIMESTAMP_CREATE_TIME = 1
TIMESTAMP_LOG_APPEND_TIME = 2

# Offset constants
OFFSET_BEGINNING = -2
OFFSET_END = -1
OFFSET_STORED = -1000
OFFSET_INVALID = -1001
```
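
The offset constants are negative sentinels rather than real positions, so code that logs or displays offsets usually needs to tell the two apart. The sketch below redefines the values locally, copied from the listing above, so it runs without the library; `describe_offset` is a hypothetical helper.

```python
# Values copied from the offset constants listed above
OFFSET_BEGINNING = -2
OFFSET_END = -1
OFFSET_STORED = -1000
OFFSET_INVALID = -1001

_LOGICAL_OFFSETS = {
    OFFSET_BEGINNING: "OFFSET_BEGINNING",
    OFFSET_END: "OFFSET_END",
    OFFSET_STORED: "OFFSET_STORED",
    OFFSET_INVALID: "OFFSET_INVALID",
}


def describe_offset(offset: int) -> str:
    """Name a logical offset, or report an absolute offset as-is."""
    if offset in _LOGICAL_OFFSETS:
        return _LOGICAL_OFFSETS[offset]
    if offset >= 0:
        return f"absolute offset {offset}"
    return f"unrecognized offset {offset}"


descriptions = [describe_offset(o) for o in (-2, 42, -1001, -7)]
```

In application code the constants would be imported from `confluent_kafka` instead of being redefined.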

## Version Information

```python { .api }
def version():
    """
    Get confluent-kafka version.

    Returns:
        tuple: Version tuple (version_string, version_int)
    """

def libversion():
    """
    Get librdkafka version.

    Returns:
        tuple: Version tuple (version_string, version_int)
    """
```
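
The integer half of the version tuple can be unpacked with bit shifts. The sketch below assumes the `0xMMmmrrxx` layout that librdkafka documents for its version macro (major, minor, revision, pre-release ID, with `0xff` marking a final release). Whether the integer returned by `version()` follows exactly the same layout is an assumption. `decode_version_int` is a hypothetical helper.

```python
from typing import Tuple


def decode_version_int(version_int: int) -> Tuple[int, int, int, int]:
    """Split a 0xMMmmrrxx integer into (major, minor, revision, prerelease_id)."""
    return (
        (version_int >> 24) & 0xFF,
        (version_int >> 16) & 0xFF,
        (version_int >> 8) & 0xFF,
        version_int & 0xFF,
    )


# Hand-written sample value, not read from an installed library
decoded = decode_version_int(0x020B00FF)
```

With the library installed, the input would come from `version_string, version_int = libversion()`.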