or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin.md connection.md consumer.md errors.md index.md producer.md serialization.md structs.md

connection.md docs/

0

# Connection and Configuration

1

2

Connection management, SSL/SASL authentication, and client configuration options for connecting to Kafka clusters with various security configurations.

3

4

## Capabilities

5

6

### BrokerConnection

7

8

Low-level connection management for individual Kafka brokers.

9

10

```python { .api }

11

class BrokerConnection:

12

def __init__(self, host, port, **configs):

13

"""

14

Create a connection to a Kafka broker.

15

16

Args:

17

host (str): Broker hostname

18

port (int): Broker port

19

**configs: Connection configuration options including:

20

socket_timeout_ms (int): Socket timeout

21

socket_receive_buffer_bytes (int): Socket receive buffer size

22

socket_send_buffer_bytes (int): Socket send buffer size

23

socket_keepalive (bool): Enable TCP keepalive

24

security_protocol (str): Security protocol

25

ssl_context: SSL context

26

ssl_check_hostname (bool): Verify SSL hostname

27

ssl_cafile (str): CA certificate file path

28

ssl_certfile (str): Client certificate file path

29

ssl_keyfile (str): Client key file path

30

ssl_crlfile (str): Certificate revocation list file

31

ssl_password (str): Private key password

32

sasl_mechanism (str): SASL mechanism

33

sasl_plain_username (str): SASL PLAIN username

34

sasl_plain_password (str): SASL PLAIN password

35

sasl_kerberos_service_name (str): Kerberos service name

36

sasl_oauth_token_provider: OAuth token provider

37

"""

38

39

def connect(self, timeout=None):

40

"""

41

Establish connection to broker.

42

43

Args:

44

timeout (float): Connection timeout in seconds

45

46

Returns:

47

bool: True if connection successful

48

"""

49

50

def close(self):

51

"""Close the connection."""

52

53

def connected(self):

54

"""

55

Check if connection is active.

56

57

Returns:

58

bool: True if connected

59

"""

60

61

def send(self, request):

62

"""

63

Send request to broker.

64

65

Args:

66

request: Protocol request object

67

"""

68

69

def recv(self):

70

"""

71

Receive response from broker.

72

73

Returns:

74

Response object from broker

75

"""

76

```

77

78

### Client Configuration

79

80

Common configuration options for Kafka clients.

81

82

```python { .api }

83

# Bootstrap and Connection Settings

84

bootstrap_servers = ['localhost:9092'] # List of broker addresses

85

client_id = 'my-kafka-client' # Client identifier

86

connections_max_idle_ms = 540000 # Max connection idle time

87

request_timeout_ms = 30000 # Request timeout

88

retry_backoff_ms = 100 # Retry backoff time

89

reconnect_backoff_ms = 50 # Reconnection backoff

90

reconnect_backoff_max_ms = 1000 # Max reconnection backoff

91

92

# Security Settings

93

security_protocol = 'PLAINTEXT' # PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL

94

ssl_context = None # Custom SSL context

95

ssl_check_hostname = True # Verify SSL hostname

96

ssl_cafile = '/path/to/ca-cert.pem' # CA certificate file

97

ssl_certfile = '/path/to/client.pem' # Client certificate

98

ssl_keyfile = '/path/to/client.key' # Client private key

99

ssl_password = 'key-password' # Private key password

100

ssl_crlfile = '/path/to/crl.pem' # Certificate revocation list

101

102

# SASL Authentication

103

sasl_mechanism = 'PLAIN' # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER

104

sasl_plain_username = 'myuser' # SASL PLAIN username

105

sasl_plain_password = 'mypassword' # SASL PLAIN password

106

sasl_kerberos_service_name = 'kafka' # Kerberos service name

107

sasl_oauth_token_provider = None # OAuth token provider

108

109

# Network and Socket Settings

110

socket_timeout_ms = 30000 # Socket timeout

111

socket_receive_buffer_bytes = 65536 # Socket receive buffer

112

socket_send_buffer_bytes = 131072 # Socket send buffer

113

socket_keepalive = False # TCP keepalive

114

```

115

116

### Authentication Mechanisms

117

118

Supported authentication mechanisms with configuration examples.

119

120

```python { .api }

121

# SASL/PLAIN Authentication

122

sasl_plain_config = {

123

'security_protocol': 'SASL_PLAINTEXT',

124

'sasl_mechanism': 'PLAIN',

125

'sasl_plain_username': 'myuser',

126

'sasl_plain_password': 'mypassword'

127

}

128

129

# SASL/SCRAM Authentication

130

sasl_scram_config = {

131

'security_protocol': 'SASL_SSL',

132

'sasl_mechanism': 'SCRAM-SHA-256',

133

'sasl_plain_username': 'myuser',

134

'sasl_plain_password': 'mypassword',

135

'ssl_cafile': 'ca-cert.pem'

136

}

137

138

# Kerberos/GSSAPI Authentication

139

sasl_kerberos_config = {

140

'security_protocol': 'SASL_PLAINTEXT',

141

'sasl_mechanism': 'GSSAPI',

142

'sasl_kerberos_service_name': 'kafka'

143

}

144

145

# OAuth Bearer Authentication

146

sasl_oauth_config = {

147

'security_protocol': 'SASL_SSL',

148

'sasl_mechanism': 'OAUTHBEARER',

149

'sasl_oauth_token_provider': CustomTokenProvider()

150

}

151

152

# SSL Client Certificate Authentication

153

ssl_client_cert_config = {

154

'security_protocol': 'SSL',

155

'ssl_cafile': 'ca-cert.pem',

156

'ssl_certfile': 'client-cert.pem',

157

'ssl_keyfile': 'client-key.pem',

158

'ssl_password': 'key-password'

159

}

160

```

161

162

### OAuth Token Provider

163

164

Abstract base class for implementing OAuth token providers.

165

166

```python { .api }

167

class AbstractTokenProvider:

168

def token(self):

169

"""

170

Get current OAuth token.

171

172

Returns:

173

str: Valid OAuth token

174

"""

175

176

def close(self):

177

"""Clean up token provider resources."""

178

```

179

180

## Usage Examples

181

182

### Basic Connection

183

184

```python

185

from kafka import KafkaProducer, KafkaConsumer

186

187

# Basic connection to local Kafka

188

producer = KafkaProducer(

189

bootstrap_servers=['localhost:9092'],

190

client_id='my-producer'

191

)

192

193

consumer = KafkaConsumer(

194

bootstrap_servers=['localhost:9092'],

195

client_id='my-consumer',

196

group_id='my-group'

197

)

198

```

199

200

### SSL Encryption

201

202

```python

203

import ssl

204

from kafka import KafkaProducer

205

206

# SSL with CA verification

207

producer = KafkaProducer(

208

bootstrap_servers=['secure-broker:9093'],

209

security_protocol='SSL',

210

ssl_check_hostname=True,

211

ssl_cafile='ca-cert.pem',

212

ssl_certfile='client-cert.pem', # Optional client cert

213

ssl_keyfile='client-key.pem', # Optional client key

214

ssl_password='key-password' # Optional key password

215

)

216

217

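# Custom SSL context (WARNING: this example disables hostname checking and certificate verification; use only for testing)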

218

ssl_context = ssl.create_default_context()

219

ssl_context.check_hostname = False

220

ssl_context.verify_mode = ssl.CERT_NONE

221

222

producer = KafkaProducer(

223

bootstrap_servers=['broker:9093'],

224

security_protocol='SSL',

225

ssl_context=ssl_context

226

)

227

```

228

229

### SASL Authentication

230

231

```python

232

from kafka import KafkaProducer

233

234

# SASL/PLAIN over plaintext

235

producer = KafkaProducer(

236

bootstrap_servers=['broker:9092'],

237

security_protocol='SASL_PLAINTEXT',

238

sasl_mechanism='PLAIN',

239

sasl_plain_username='alice',

240

sasl_plain_password='secret'

241

)

242

243

# SASL/SCRAM over SSL

244

producer = KafkaProducer(

245

bootstrap_servers=['secure-broker:9093'],

246

security_protocol='SASL_SSL',

247

sasl_mechanism='SCRAM-SHA-256',

248

sasl_plain_username='bob',

249

sasl_plain_password='secret',

250

ssl_cafile='ca-cert.pem'

251

)

252

253

# Kerberos authentication

254

producer = KafkaProducer(

255

bootstrap_servers=['kerb-broker:9092'],

256

security_protocol='SASL_PLAINTEXT',

257

sasl_mechanism='GSSAPI',

258

sasl_kerberos_service_name='kafka'

259

)

260

```

261

262

### AWS MSK IAM Authentication

263

264

```python

265

from kafka import KafkaProducer

266

from kafka.oauth import AbstractTokenProvider

267

import boto3

268

269

class AWSTokenProvider(AbstractTokenProvider):

270

def __init__(self, region='us-east-1'):

271

self.region = region

272

self.session = boto3.Session()

273

274

def token(self):

275

# Generate AWS IAM token for MSK

276

client = self.session.client('kafka', region_name=self.region)

277

# Implementation would generate proper AWS IAM token

278

return "aws-iam-token"

279

280

# AWS MSK with IAM

281

producer = KafkaProducer(

282

bootstrap_servers=['msk-cluster.amazonaws.com:9098'],

283

security_protocol='SASL_SSL',

284

sasl_mechanism='OAUTHBEARER',

285

sasl_oauth_token_provider=AWSTokenProvider(),

286

ssl_check_hostname=True

287

)

288

```

289

290

### Custom OAuth Provider

291

292

```python

293

from kafka import KafkaProducer

294

from kafka.oauth import AbstractTokenProvider

295

import requests

296

297

class CustomOAuthProvider(AbstractTokenProvider):

298

def __init__(self, client_id, client_secret, token_url):

299

self.client_id = client_id

300

self.client_secret = client_secret

301

self.token_url = token_url

302

self._token = None

303

self._expires_at = 0

304

305

def token(self):

306

import time

307

if self._token is None or time.time() >= self._expires_at:

308

self._refresh_token()

309

return self._token

310

311

def _refresh_token(self):

312

import time

313

response = requests.post(self.token_url, data={

314

'grant_type': 'client_credentials',

315

'client_id': self.client_id,

316

'client_secret': self.client_secret

317

})

318

data = response.json()

319

self._token = data['access_token']

320

self._expires_at = time.time() + data['expires_in'] - 60

321

322

def close(self):

323

pass

324

325

producer = KafkaProducer(

326

bootstrap_servers=['oauth-broker:9092'],

327

security_protocol='SASL_SSL',

328

sasl_mechanism='OAUTHBEARER',

329

sasl_oauth_token_provider=CustomOAuthProvider(

330

client_id='my-client',

331

client_secret='my-secret',

332

token_url='https://auth.example.com/token'

333

)

334

)

335

```

336

337

### Connection Tuning

338

339

```python

340

from kafka import KafkaProducer

341

342

# High-performance connection settings

343

producer = KafkaProducer(

344

bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],

345

346

# Connection settings

347

connections_max_idle_ms=600000, # 10 minutes

348

request_timeout_ms=30000, # 30 seconds

349

retry_backoff_ms=100, # Fast retries

350

reconnect_backoff_ms=50, # Fast reconnection

351

reconnect_backoff_max_ms=1000, # Max 1 second backoff

352

353

# Socket settings

354

socket_timeout_ms=30000, # 30 second socket timeout

355

socket_receive_buffer_bytes=131072, # 128KB receive buffer

356

socket_send_buffer_bytes=131072, # 128KB send buffer

357

socket_keepalive=True, # Enable TCP keepalive

358

359

# Client identification

360

client_id='high-perf-producer'

361

)

362

```

363

364

### Multiple Cluster Connection

365

366

```python

367

from kafka import KafkaProducer, KafkaConsumer

368

369

# Connect to multiple clusters

370

primary_producer = KafkaProducer(

371

bootstrap_servers=['primary-broker:9092'],

372

client_id='primary-producer'

373

)

374

375

backup_producer = KafkaProducer(

376

bootstrap_servers=['backup-broker:9092'],

377

client_id='backup-producer'

378

)

379

380

# Cross-cluster replication consumer

381

consumer = KafkaConsumer(

382

'source-topic',

383

bootstrap_servers=['source-cluster:9092'],

384

group_id='replicator'

385

)

386

387

for message in consumer:

388

# Replicate to backup cluster

389

backup_producer.send('target-topic',

390

key=message.key,

391

value=message.value)

392

```