or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md · cli-framework.md · core-application.md · data-management.md · index.md · monitoring.md · serialization.md · stream-processing.md · topics-channels.md · windowing.md · worker-management.md

authentication.mddocs/

0

# Authentication

1

2

This page documents the security and authentication mechanisms used to connect Faust applications to Kafka brokers securely. It covers SSL, SASL, GSSAPI (Kerberos), and OAuth credential management, with support for various authentication protocols, certificate handling, and secure communication configuration.

3

4

## Capabilities

5

6

### SSL Credentials

7

8

SSL/TLS authentication credentials for secure broker connections. Provides certificate-based authentication with support for custom SSL contexts, certificate files, and certificate authority validation.

9

10

```python { .api }

11

class SSLCredentials:

12

def __init__(

13

self,

14

*,

15

context: ssl.SSLContext = None,

16

purpose: ssl.Purpose = None,

17

cafile: str = None,

18

capath: str = None,

19

cadata: str = None,

20

certfile: str = None,

21

keyfile: str = None,

22

password: str = None,

23

ciphers: str = None,

24

**kwargs

25

):

26

"""

27

Create SSL credentials for secure broker connections.

28

29

Args:

30

context: Custom SSL context

31

purpose: SSL purpose (SERVER_AUTH, CLIENT_AUTH)

32

cafile: Certificate authority file path

33

capath: Certificate authority directory path

34

cadata: Certificate authority data string

35

certfile: Client certificate file path

36

keyfile: Client private key file path

37

password: Private key password

38

ciphers: Allowed cipher suites

39

"""

40

41

def load_verify_locations(

42

self,

43

cafile: str = None,

44

capath: str = None,

45

cadata: str = None

46

) -> None:

47

"""

48

Load certificate authority verification locations.

49

50

Args:

51

cafile: CA certificate file

52

capath: CA certificate directory

53

cadata: CA certificate data

54

"""

55

56

def load_cert_chain(

57

self,

58

certfile: str,

59

keyfile: str = None,

60

password: str = None

61

) -> None:

62

"""

63

Load client certificate chain.

64

65

Args:

66

certfile: Certificate file path

67

keyfile: Private key file path

68

password: Private key password

69

"""

70

71

def set_ciphers(self, ciphers: str) -> None:

72

"""

73

Set allowed cipher suites.

74

75

Args:

76

ciphers: Cipher suite specification

77

"""

78

79

@property

80

def context(self) -> ssl.SSLContext:

81

"""SSL context object."""

82

83

@property

84

def cafile(self) -> str:

85

"""Certificate authority file path."""

86

87

@property

88

def certfile(self) -> str:

89

"""Client certificate file path."""

90

91

@property

92

def keyfile(self) -> str:

93

"""Client private key file path."""

94

```

95

96

### SASL Credentials

97

98

Simple Authentication and Security Layer (SASL) credentials for broker authentication. Supports multiple SASL mechanisms including PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, and OAUTHBEARER.

99

100

```python { .api }

101

class SASLCredentials:

102

def __init__(

103

self,

104

*,

105

mechanism: str = None,

106

username: str = None,

107

password: str = None,

108

ssl_context: ssl.SSLContext = None,

109

**kwargs

110

):

111

"""

112

Create SASL credentials for broker authentication.

113

114

Args:

115

mechanism: SASL mechanism ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512', 'OAUTHBEARER')

116

username: Authentication username

117

password: Authentication password

118

ssl_context: SSL context for secure connections

119

"""

120

121

def create_authenticator(self) -> callable:

122

"""

123

Create authenticator function for SASL mechanism.

124

125

Returns:

126

Authenticator function compatible with Kafka client

127

"""

128

129

@property

130

def mechanism(self) -> str:

131

"""SASL mechanism name."""

132

133

@property

134

def username(self) -> str:

135

"""Authentication username."""

136

137

@property

138

def password(self) -> str:

139

"""Authentication password."""

140

141

@property

142

def ssl_context(self) -> ssl.SSLContext:

143

"""SSL context for secure transport."""

144

145

class PlainCredentials(SASLCredentials):

146

"""SASL PLAIN mechanism credentials."""

147

148

def __init__(self, *, username: str, password: str, **kwargs):

149

super().__init__(mechanism='PLAIN', username=username, password=password, **kwargs)

150

151

class ScramCredentials(SASLCredentials):

152

"""SASL SCRAM mechanism credentials."""

153

154

def __init__(

155

self,

156

*,

157

username: str,

158

password: str,

159

mechanism: str = 'SCRAM-SHA-256',

160

**kwargs

161

):

162

super().__init__(mechanism=mechanism, username=username, password=password, **kwargs)

163

```

164

165

### GSSAPI Credentials

166

167

Generic Security Services Application Program Interface (GSSAPI) credentials for Kerberos authentication. Provides integration with existing Kerberos infrastructure and ticket-based authentication.

168

169

```python { .api }

170

class GSSAPICredentials:

171

def __init__(

172

self,

173

*,

174

kerberos_service_name: str = 'kafka',

175

kerberos_domain_name: str = None,

176

principal: str = None,

177

kinit_cmd: str = None,

178

ticket_renew_window_factor: float = 0.8,

179

**kwargs

180

):

181

"""

182

Create GSSAPI credentials for Kerberos authentication.

183

184

Args:

185

kerberos_service_name: Kerberos service name (default: 'kafka')

186

kerberos_domain_name: Kerberos domain name

187

principal: Kerberos principal name

188

kinit_cmd: Custom kinit command

189

ticket_renew_window_factor: Ticket renewal threshold (0.0-1.0)

190

"""

191

192

def acquire_credentials(self) -> None:

193

"""

194

Acquire Kerberos credentials (TGT).

195

196

Raises:

197

AuthenticationError: If credential acquisition fails

198

"""

199

200

def renew_credentials(self) -> bool:

201

"""

202

Renew Kerberos credentials if needed.

203

204

Returns:

205

True if credentials were renewed

206

"""

207

208

def check_credentials(self) -> bool:

209

"""

210

Check if credentials are valid and not expired.

211

212

Returns:

213

True if credentials are valid

214

"""

215

216

@property

217

def service_name(self) -> str:

218

"""Kerberos service name."""

219

220

@property

221

def domain_name(self) -> str:

222

"""Kerberos domain name."""

223

224

@property

225

def principal(self) -> str:

226

"""Kerberos principal name."""

227

```

228

229

### OAuth Credentials

230

231

OAuth 2.0 credentials for token-based authentication, with support for checking token expiry and refreshing access tokens.

232

233

```python { .api }

234

class OAuthCredentials:

235

def __init__(

236

self,

237

*,

238

token_url: str,

239

client_id: str,

240

client_secret: str = None,

241

scope: str = None,

242

audience: str = None,

243

grant_type: str = 'client_credentials',

244

**kwargs

245

):

246

"""

247

Create OAuth credentials for token-based authentication.

248

249

Args:

250

token_url: OAuth token endpoint URL

251

client_id: OAuth client identifier

252

client_secret: OAuth client secret

253

scope: OAuth scope string

254

audience: OAuth audience

255

grant_type: OAuth grant type

256

"""

257

258

async def get_token(self) -> str:

259

"""

260

Get valid access token.

261

262

Returns:

263

Access token string

264

265

Raises:

266

AuthenticationError: If token acquisition fails

267

"""

268

269

async def refresh_token(self) -> str:

270

"""

271

Refresh access token.

272

273

Returns:

274

New access token string

275

"""

276

277

def is_token_expired(self) -> bool:

278

"""

279

Check if current token is expired.

280

281

Returns:

282

True if token needs refresh

283

"""

284

285

@property

286

def client_id(self) -> str:

287

"""OAuth client identifier."""

288

289

@property

290

def token_url(self) -> str:

291

"""OAuth token endpoint URL."""

292

```

293

294

### Authentication Configuration

295

296

Utilities for configuring authentication at the application level with support for multiple credential types and broker-specific settings.

297

298

```python { .api }

299

def configure_ssl(

300

app: App,

301

*,

302

cafile: str = None,

303

certfile: str = None,

304

keyfile: str = None,

305

password: str = None,

306

context: ssl.SSLContext = None,

307

**kwargs

308

) -> None:

309

"""

310

Configure SSL authentication for application.

311

312

Args:

313

app: Faust application

314

cafile: Certificate authority file

315

certfile: Client certificate file

316

keyfile: Client private key file

317

password: Private key password

318

context: Custom SSL context

319

"""

320

321

def configure_sasl(

322

app: App,

323

*,

324

mechanism: str,

325

username: str,

326

password: str,

327

**kwargs

328

) -> None:

329

"""

330

Configure SASL authentication for application.

331

332

Args:

333

app: Faust application

334

mechanism: SASL mechanism

335

username: Authentication username

336

password: Authentication password

337

"""

338

339

def configure_gssapi(

340

app: App,

341

*,

342

service_name: str = 'kafka',

343

domain_name: str = None,

344

**kwargs

345

) -> None:

346

"""

347

Configure GSSAPI authentication for application.

348

349

Args:

350

app: Faust application

351

service_name: Kerberos service name

352

domain_name: Kerberos domain name

353

"""

354

355

class AuthenticationError(Exception):

356

"""Raised when authentication fails."""

357

pass

358

359

class CredentialsError(Exception):

360

"""Raised when credential validation fails."""

361

pass

362

```

363

364

## Usage Examples

365

366

### SSL Authentication

367

368

```python

369

import faust

370

import ssl

371

372

# Create SSL credentials

373

ssl_creds = faust.SSLCredentials(

374

cafile='/path/to/ca-cert.pem',

375

certfile='/path/to/client-cert.pem',

376

keyfile='/path/to/client-key.pem',

377

password='key-password'

378

)

379

380

# Application with SSL authentication

381

app = faust.App(

382

'secure-app',

383

broker='kafka://secure-broker:9093',

384

ssl_credentials=ssl_creds

385

)

386

387

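# Alternative: Configure SSL context directly.
# WARNING: the settings below disable hostname checking and certificate
# verification; use them only for local testing, never in production.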

388

ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)

389

ssl_context.check_hostname = False

390

ssl_context.verify_mode = ssl.CERT_NONE

391

392

app = faust.App(

393

'secure-app',

394

broker='kafka://secure-broker:9093',

395

ssl_context=ssl_context

396

)

397

```

398

399

### SASL Authentication

400

401

```python

402

# SASL PLAIN authentication

403

sasl_creds = faust.SASLCredentials(

404

mechanism='PLAIN',

405

username='kafka-user',

406

password='kafka-password'

407

)

408

409

app = faust.App(

410

'sasl-app',

411

broker='kafka://broker:9092',

412

sasl_credentials=sasl_creds

413

)

414

415

# SCRAM-SHA-256 authentication

416

scram_creds = faust.ScramCredentials(

417

username='kafka-user',

418

password='kafka-password',

419

mechanism='SCRAM-SHA-256'

420

)

421

422

app = faust.App(

423

'scram-app',

424

broker='kafka://broker:9092',

425

sasl_credentials=scram_creds

426

)

427

```

428

429

### Kerberos Authentication

430

431

```python

432

# GSSAPI/Kerberos authentication

433

gssapi_creds = faust.GSSAPICredentials(

434

kerberos_service_name='kafka',

435

kerberos_domain_name='EXAMPLE.COM',

436

principal='kafka-client@EXAMPLE.COM'

437

)

438

439

app = faust.App(

440

'kerberos-app',

441

broker='kafka://broker:9092',

442

gssapi_credentials=gssapi_creds

443

)

444

445

# Acquire credentials before starting

446

@app.on_startup.connect

447

async def acquire_kerberos_ticket():

448

gssapi_creds.acquire_credentials()

449

print("Kerberos ticket acquired")

450

451

# Periodic ticket renewal

452

@app.timer(interval=3600.0) # Renew every hour

453

async def renew_kerberos_ticket():

454

if gssapi_creds.renew_credentials():

455

print("Kerberos ticket renewed")

456

```

457

458

### Combined SSL + SASL

459

460

```python

461

# SSL transport with SASL authentication

462

ssl_context = ssl.create_default_context()

463

ssl_context.check_hostname = True

464

465

sasl_creds = faust.SASLCredentials(

466

mechanism='SCRAM-SHA-512',

467

username='secure-user',

468

password='secure-password',

469

ssl_context=ssl_context

470

)

471

472

app = faust.App(

473

'secure-sasl-app',

474

broker='kafka://secure-broker:9093',

475

ssl_context=ssl_context,

476

sasl_credentials=sasl_creds

477

)

478

```

479

480

### OAuth Authentication

481

482

```python

483

# OAuth 2.0 authentication

484

oauth_creds = faust.OAuthCredentials(

485

token_url='https://auth.example.com/oauth/token',

486

client_id='kafka-client',

487

client_secret='client-secret',

488

scope='kafka:read kafka:write'

489

)

490

491

app = faust.App(

492

'oauth-app',

493

broker='kafka://broker:9092',

494

oauth_credentials=oauth_creds

495

)

496

497

@app.on_startup.connect

498

async def get_initial_token():

499

token = await oauth_creds.get_token()

500

print(f"Initial token acquired: {token[:10]}...")

501

502

# Automatic token refresh

503

@app.timer(interval=1800.0) # Refresh every 30 minutes

504

async def refresh_oauth_token():

505

if oauth_creds.is_token_expired():

506

token = await oauth_creds.refresh_token()

507

print("OAuth token refreshed")

508

```

509

510

### Environment-based Configuration

511

512

```python

513

import os

514

515

def create_app_with_auth():

516

auth_method = os.getenv('KAFKA_AUTH_METHOD', 'none')

517

518

if auth_method == 'ssl':

519

credentials = faust.SSLCredentials(

520

cafile=os.getenv('KAFKA_CA_FILE'),

521

certfile=os.getenv('KAFKA_CERT_FILE'),

522

keyfile=os.getenv('KAFKA_KEY_FILE'),

523

password=os.getenv('KAFKA_KEY_PASSWORD')

524

)

525

return faust.App(

526

'env-app',

527

broker=os.getenv('KAFKA_BROKER'),

528

ssl_credentials=credentials

529

)

530

531

elif auth_method == 'sasl':

532

credentials = faust.SASLCredentials(

533

mechanism=os.getenv('KAFKA_SASL_MECHANISM', 'PLAIN'),

534

username=os.getenv('KAFKA_USERNAME'),

535

password=os.getenv('KAFKA_PASSWORD')

536

)

537

return faust.App(

538

'env-app',

539

broker=os.getenv('KAFKA_BROKER'),

540

sasl_credentials=credentials

541

)

542

543

else:

544

return faust.App(

545

'env-app',

546

broker=os.getenv('KAFKA_BROKER', 'kafka://localhost:9092')

547

)

548

549

app = create_app_with_auth()

550

```

551

552

### Error Handling

553

554

```python

555

from faust import AuthenticationError, CredentialsError

556

557

@app.on_startup.connect

558

async def validate_credentials():

559

try:

560

# Validate credentials before starting

561

if hasattr(app, 'ssl_credentials'):

562

app.ssl_credentials.load_verify_locations()

563

564

if hasattr(app, 'gssapi_credentials'):

565

if not app.gssapi_credentials.check_credentials():

566

app.gssapi_credentials.acquire_credentials()

567

568

except CredentialsError as e:

569

print(f"Credential validation failed: {e}")

570

raise

571

572

except AuthenticationError as e:

573

print(f"Authentication failed: {e}")

574

raise

575

```

576

577

## Type Interfaces

578

579

```python { .api }

580

from typing import Protocol, Optional

581

import ssl

582

583

class CredentialsT(Protocol):

584

"""Base type interface for all credentials."""

585

pass

586

587

class SSLCredentialsT(CredentialsT, Protocol):

588

"""Type interface for SSL credentials."""

589

590

context: ssl.SSLContext

591

cafile: Optional[str]

592

certfile: Optional[str]

593

keyfile: Optional[str]

594

595

def load_verify_locations(self, **kwargs) -> None: ...

596

def load_cert_chain(self, certfile: str, **kwargs) -> None: ...

597

598

class SASLCredentialsT(CredentialsT, Protocol):

599

"""Type interface for SASL credentials."""

600

601

mechanism: str

602

username: str

603

password: str

604

ssl_context: Optional[ssl.SSLContext]

605

606

def create_authenticator(self) -> callable: ...

607

608

class GSSAPICredentialsT(CredentialsT, Protocol):

609

"""Type interface for GSSAPI credentials."""

610

611

service_name: str

612

domain_name: Optional[str]

613

principal: Optional[str]

614

615

def acquire_credentials(self) -> None: ...

616

def renew_credentials(self) -> bool: ...

617

def check_credentials(self) -> bool: ...

618

```