0
# Connection and Configuration
1
2
Connection management, SSL/SASL authentication, and client configuration options for connecting to Kafka clusters with various security configurations.
3
4
## Capabilities
5
6
### BrokerConnection
7
8
Low-level connection management for individual Kafka brokers.
9
10
```python { .api }
11
class BrokerConnection:
12
def __init__(self, host, port, **configs):
13
"""
14
Create a connection to a Kafka broker.
15
16
Args:
17
host (str): Broker hostname
18
port (int): Broker port
19
**configs: Connection configuration options including:
20
socket_timeout_ms (int): Socket timeout
21
socket_receive_buffer_bytes (int): Socket receive buffer size
22
socket_send_buffer_bytes (int): Socket send buffer size
23
socket_keepalive (bool): Enable TCP keepalive
24
security_protocol (str): Security protocol
25
ssl_context: SSL context
26
ssl_check_hostname (bool): Verify SSL hostname
27
ssl_cafile (str): CA certificate file path
28
ssl_certfile (str): Client certificate file path
29
ssl_keyfile (str): Client key file path
30
ssl_crlfile (str): Certificate revocation list file
31
ssl_password (str): Private key password
32
sasl_mechanism (str): SASL mechanism
33
sasl_plain_username (str): Username for SASL PLAIN and SCRAM authentication
34
sasl_plain_password (str): Password for SASL PLAIN and SCRAM authentication
35
sasl_kerberos_service_name (str): Kerberos service name
36
sasl_oauth_token_provider: OAuth token provider
37
"""
38
39
def connect(self, timeout=None):
40
"""
41
Establish connection to broker.
42
43
Args:
44
timeout (float): Connection timeout in seconds
45
46
Returns:
47
bool: True if connection successful
48
"""
49
50
def close(self):
51
"""Close the connection."""
52
53
def connected(self):
54
"""
55
Check if connection is active.
56
57
Returns:
58
bool: True if connected
59
"""
60
61
def send(self, request):
62
"""
63
Send request to broker.
64
65
Args:
66
request: Protocol request object
67
"""
68
69
def recv(self):
70
"""
71
Receive response from broker.
72
73
Returns:
74
Response object from broker
75
"""
76
```
77
78
### Client Configuration
79
80
Common configuration options for Kafka clients.
81
82
```python { .api }
83
# Bootstrap and Connection Settings
84
bootstrap_servers = ['localhost:9092'] # List of broker addresses
85
client_id = 'my-kafka-client' # Client identifier
86
connections_max_idle_ms = 540000 # Max connection idle time
87
request_timeout_ms = 30000 # Request timeout
88
retry_backoff_ms = 100 # Retry backoff time
89
reconnect_backoff_ms = 50 # Reconnection backoff
90
reconnect_backoff_max_ms = 1000 # Max reconnection backoff
91
92
# Security Settings
93
security_protocol = 'PLAINTEXT' # PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
94
ssl_context = None # Custom SSL context
95
ssl_check_hostname = True # Verify SSL hostname
96
ssl_cafile = '/path/to/ca-cert.pem' # CA certificate file
97
ssl_certfile = '/path/to/client.pem' # Client certificate
98
ssl_keyfile = '/path/to/client.key' # Client private key
99
ssl_password = 'key-password' # Private key password
100
ssl_crlfile = '/path/to/crl.pem' # Certificate revocation list
101
102
# SASL Authentication
103
sasl_mechanism = 'PLAIN' # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER
104
sasl_plain_username = 'myuser' # Username for the PLAIN and SCRAM mechanisms
105
sasl_plain_password = 'mypassword' # Password for the PLAIN and SCRAM mechanisms
106
sasl_kerberos_service_name = 'kafka' # Kerberos service name
107
sasl_oauth_token_provider = None # OAuth token provider
108
109
# Network and Socket Settings
110
socket_timeout_ms = 30000 # Socket timeout
111
socket_receive_buffer_bytes = 65536 # Socket receive buffer
112
socket_send_buffer_bytes = 131072 # Socket send buffer
113
socket_keepalive = False # TCP keepalive
114
```
115
116
### Authentication Mechanisms
117
118
Supported authentication mechanisms with configuration examples.
119
120
```python { .api }
121
# SASL/PLAIN Authentication
122
sasl_plain_config = {
123
'security_protocol': 'SASL_PLAINTEXT',
124
'sasl_mechanism': 'PLAIN',
125
'sasl_plain_username': 'myuser',
126
'sasl_plain_password': 'mypassword'
127
}
128
129
# SASL/SCRAM Authentication
130
sasl_scram_config = {
131
'security_protocol': 'SASL_SSL',
132
'sasl_mechanism': 'SCRAM-SHA-256',
133
'sasl_plain_username': 'myuser',
134
'sasl_plain_password': 'mypassword',
135
'ssl_cafile': 'ca-cert.pem'
136
}
137
138
# Kerberos/GSSAPI Authentication
139
sasl_kerberos_config = {
140
'security_protocol': 'SASL_PLAINTEXT',
141
'sasl_mechanism': 'GSSAPI',
142
'sasl_kerberos_service_name': 'kafka'
143
}
144
145
# OAuth Bearer Authentication
146
sasl_oauth_config = {
147
'security_protocol': 'SASL_SSL',
148
'sasl_mechanism': 'OAUTHBEARER',
149
'sasl_oauth_token_provider': CustomTokenProvider()  # placeholder: an instance of your own AbstractTokenProvider subclass
150
}
151
152
# SSL Client Certificate Authentication
153
ssl_client_cert_config = {
154
'security_protocol': 'SSL',
155
'ssl_cafile': 'ca-cert.pem',
156
'ssl_certfile': 'client-cert.pem',
157
'ssl_keyfile': 'client-key.pem',
158
'ssl_password': 'key-password'
159
}
160
```
161
162
### OAuth Token Provider
163
164
Abstract base class for implementing OAuth token providers.
165
166
```python { .api }
167
class AbstractTokenProvider:
168
def token(self):
169
"""
170
Get current OAuth token.
171
172
Returns:
173
str: Valid OAuth token
174
"""
175
176
def close(self):
177
"""Clean up token provider resources."""
178
```
179
180
## Usage Examples
181
182
### Basic Connection
183
184
```python
185
from kafka import KafkaProducer, KafkaConsumer
186
187
# Basic connection to local Kafka
188
producer = KafkaProducer(
189
bootstrap_servers=['localhost:9092'],
190
client_id='my-producer'
191
)
192
193
consumer = KafkaConsumer(
194
bootstrap_servers=['localhost:9092'],
195
client_id='my-consumer',
196
group_id='my-group'
197
)
198
```
199
200
### SSL Encryption
201
202
```python
203
import ssl
204
from kafka import KafkaProducer
205
206
# SSL with CA verification
207
producer = KafkaProducer(
208
bootstrap_servers=['secure-broker:9093'],
209
security_protocol='SSL',
210
ssl_check_hostname=True,
211
ssl_cafile='ca-cert.pem',
212
ssl_certfile='client-cert.pem', # Optional client cert
213
ssl_keyfile='client-key.pem', # Optional client key
214
ssl_password='key-password' # Optional key password
215
)
216
217
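# Custom SSL context (WARNING: this example turns off hostname checking and certificate verification; use it only against test brokers, never in production)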
218
ssl_context = ssl.create_default_context()
219
ssl_context.check_hostname = False
220
ssl_context.verify_mode = ssl.CERT_NONE
221
222
producer = KafkaProducer(
223
bootstrap_servers=['broker:9093'],
224
security_protocol='SSL',
225
ssl_context=ssl_context
226
)
227
```
228
229
### SASL Authentication
230
231
```python
232
from kafka import KafkaProducer
233
234
# SASL/PLAIN over plaintext (credentials travel unencrypted; prefer SASL_SSL outside of trusted test networks)
235
producer = KafkaProducer(
236
bootstrap_servers=['broker:9092'],
237
security_protocol='SASL_PLAINTEXT',
238
sasl_mechanism='PLAIN',
239
sasl_plain_username='alice',
240
sasl_plain_password='secret'
241
)
242
243
# SASL/SCRAM over SSL
244
producer = KafkaProducer(
245
bootstrap_servers=['secure-broker:9093'],
246
security_protocol='SASL_SSL',
247
sasl_mechanism='SCRAM-SHA-256',
248
sasl_plain_username='bob',
249
sasl_plain_password='secret',
250
ssl_cafile='ca-cert.pem'
251
)
252
253
# Kerberos authentication
254
producer = KafkaProducer(
255
bootstrap_servers=['kerb-broker:9092'],
256
security_protocol='SASL_PLAINTEXT',
257
sasl_mechanism='GSSAPI',
258
sasl_kerberos_service_name='kafka'
259
)
260
```
261
262
### AWS MSK IAM Authentication
263
264
```python
265
from kafka import KafkaProducer
266
from kafka.oauth import AbstractTokenProvider
267
import boto3
268
269
class AWSTokenProvider(AbstractTokenProvider):
270
def __init__(self, region='us-east-1'):
271
self.region = region
272
self.session = boto3.Session()
273
274
def token(self):
275
# Generate AWS IAM token for MSK
276
client = self.session.client('kafka', region_name=self.region)
277
# Placeholder only: a real implementation must generate and return a signed AWS IAM auth token here; the literal below will not authenticate
278
return "aws-iam-token"
279
280
# AWS MSK with IAM
281
producer = KafkaProducer(
282
bootstrap_servers=['msk-cluster.amazonaws.com:9098'],
283
security_protocol='SASL_SSL',
284
sasl_mechanism='OAUTHBEARER',
285
sasl_oauth_token_provider=AWSTokenProvider(),
286
ssl_check_hostname=True
287
)
288
```
289
290
### Custom OAuth Provider
291
292
```python
293
from kafka import KafkaProducer
294
from kafka.oauth import AbstractTokenProvider
295
import requests
296
297
class CustomOAuthProvider(AbstractTokenProvider):
298
def __init__(self, client_id, client_secret, token_url):
299
self.client_id = client_id
300
self.client_secret = client_secret
301
self.token_url = token_url
302
self._token = None
303
self._expires_at = 0
304
305
def token(self):
306
import time
307
if self._token is None or time.time() >= self._expires_at:
308
self._refresh_token()
309
return self._token
310
311
def _refresh_token(self):
312
import time
313
response = requests.post(self.token_url, data={
314
'grant_type': 'client_credentials',
315
'client_id': self.client_id,
316
'client_secret': self.client_secret
317
})
318
data = response.json()
319
self._token = data['access_token']
320
self._expires_at = time.time() + data['expires_in'] - 60
321
322
def close(self):
323
pass
324
325
producer = KafkaProducer(
326
bootstrap_servers=['oauth-broker:9092'],
327
security_protocol='SASL_SSL',
328
sasl_mechanism='OAUTHBEARER',
329
sasl_oauth_token_provider=CustomOAuthProvider(
330
client_id='my-client',
331
client_secret='my-secret',
332
token_url='https://auth.example.com/token'
333
)
334
)
335
```
336
337
### Connection Tuning
338
339
```python
340
from kafka import KafkaProducer
341
342
# High-performance connection settings
343
producer = KafkaProducer(
344
bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
345
346
# Connection settings
347
connections_max_idle_ms=600000, # 10 minutes
348
request_timeout_ms=30000, # 30 seconds
349
retry_backoff_ms=100, # Fast retries
350
reconnect_backoff_ms=50, # Fast reconnection
351
reconnect_backoff_max_ms=1000, # Max 1 second backoff
352
353
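# Socket settings (NOTE(review): confirm these option names against the installed kafka-python release; upstream documents receive_buffer_bytes, send_buffer_bytes and socket_options, and rejects unrecognized configs)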
354
socket_timeout_ms=30000, # 30 second socket timeout
355
socket_receive_buffer_bytes=131072, # 128KB receive buffer
356
socket_send_buffer_bytes=131072, # 128KB send buffer
357
socket_keepalive=True, # Enable TCP keepalive
358
359
# Client identification
360
client_id='high-perf-producer'
361
)
362
```
363
364
### Multiple Cluster Connection
365
366
```python
367
from kafka import KafkaProducer, KafkaConsumer
368
369
# Connect to multiple clusters
370
primary_producer = KafkaProducer(
371
bootstrap_servers=['primary-broker:9092'],
372
client_id='primary-producer'
373
)
374
375
backup_producer = KafkaProducer(
376
bootstrap_servers=['backup-broker:9092'],
377
client_id='backup-producer'
378
)
379
380
# Cross-cluster replication consumer
381
consumer = KafkaConsumer(
382
'source-topic',
383
bootstrap_servers=['source-cluster:9092'],
384
group_id='replicator'
385
)
386
387
for message in consumer:
388
# Replicate to backup cluster
389
backup_producer.send('target-topic',
390
key=message.key,
391
value=message.value)
392
```