0
# Authentication
1
2
Security and authentication mechanisms for secure connections to Kafka brokers in Faust applications. It provides SSL, SASL, GSSAPI, and OAuth credential management, with support for various authentication protocols, certificate handling, and secure communication configuration.
3
4
## Capabilities
5
6
### SSL Credentials
7
8
SSL/TLS authentication credentials for secure broker connections. Provides certificate-based authentication with support for custom SSL contexts, certificate files, and certificate authority validation.
9
10
```python { .api }
11
class SSLCredentials:
12
def __init__(
13
self,
14
*,
15
context: ssl.SSLContext = None,
16
purpose: ssl.Purpose = None,
17
cafile: str = None,
18
capath: str = None,
19
cadata: str = None,
20
certfile: str = None,
21
keyfile: str = None,
22
password: str = None,
23
ciphers: str = None,
24
**kwargs
25
):
26
"""
27
Create SSL credentials for secure broker connections.
28
29
Args:
30
context: Custom SSL context
31
purpose: SSL purpose (SERVER_AUTH, CLIENT_AUTH)
32
cafile: Certificate authority file path
33
capath: Certificate authority directory path
34
cadata: Certificate authority data string
35
certfile: Client certificate file path
36
keyfile: Client private key file path
37
password: Private key password
38
ciphers: Allowed cipher suites
39
"""
40
41
def load_verify_locations(
42
self,
43
cafile: str = None,
44
capath: str = None,
45
cadata: str = None
46
) -> None:
47
"""
48
Load certificate authority verification locations.
49
50
Args:
51
cafile: CA certificate file
52
capath: CA certificate directory
53
cadata: CA certificate data
54
"""
55
56
def load_cert_chain(
57
self,
58
certfile: str,
59
keyfile: str = None,
60
password: str = None
61
) -> None:
62
"""
63
Load client certificate chain.
64
65
Args:
66
certfile: Certificate file path
67
keyfile: Private key file path
68
password: Private key password
69
"""
70
71
def set_ciphers(self, ciphers: str) -> None:
72
"""
73
Set allowed cipher suites.
74
75
Args:
76
ciphers: Cipher suite specification
77
"""
78
79
@property
80
def context(self) -> ssl.SSLContext:
81
"""SSL context object."""
82
83
@property
84
def cafile(self) -> str:
85
"""Certificate authority file path."""
86
87
@property
88
def certfile(self) -> str:
89
"""Client certificate file path."""
90
91
@property
92
def keyfile(self) -> str:
93
"""Client private key file path."""
94
```
95
96
### SASL Credentials
97
98
Simple Authentication and Security Layer (SASL) credentials for broker authentication. Supports multiple SASL mechanisms including PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, and OAUTHBEARER.
99
100
```python { .api }
101
class SASLCredentials:
102
def __init__(
103
self,
104
*,
105
mechanism: str = None,
106
username: str = None,
107
password: str = None,
108
ssl_context: ssl.SSLContext = None,
109
**kwargs
110
):
111
"""
112
Create SASL credentials for broker authentication.
113
114
Args:
115
mechanism: SASL mechanism ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512', 'OAUTHBEARER')
116
username: Authentication username
117
password: Authentication password
118
ssl_context: SSL context for secure connections
119
"""
120
121
def create_authenticator(self) -> callable:
122
"""
123
Create authenticator function for SASL mechanism.
124
125
Returns:
126
Authenticator function compatible with Kafka client
127
"""
128
129
@property
130
def mechanism(self) -> str:
131
"""SASL mechanism name."""
132
133
@property
134
def username(self) -> str:
135
"""Authentication username."""
136
137
@property
138
def password(self) -> str:
139
"""Authentication password."""
140
141
@property
142
def ssl_context(self) -> ssl.SSLContext:
143
"""SSL context for secure transport."""
144
145
class PlainCredentials(SASLCredentials):
146
"""SASL PLAIN mechanism credentials."""
147
148
def __init__(self, *, username: str, password: str, **kwargs):
149
super().__init__(mechanism='PLAIN', username=username, password=password, **kwargs)
150
151
class ScramCredentials(SASLCredentials):
152
"""SASL SCRAM mechanism credentials."""
153
154
def __init__(
155
self,
156
*,
157
username: str,
158
password: str,
159
mechanism: str = 'SCRAM-SHA-256',
160
**kwargs
161
):
162
super().__init__(mechanism=mechanism, username=username, password=password, **kwargs)
163
```
164
165
### GSSAPI Credentials
166
167
Generic Security Service Application Program Interface (GSSAPI) credentials for Kerberos authentication. Provides integration with existing Kerberos infrastructure and ticket-based authentication.
168
169
```python { .api }
170
class GSSAPICredentials:
171
def __init__(
172
self,
173
*,
174
kerberos_service_name: str = 'kafka',
175
kerberos_domain_name: str = None,
176
principal: str = None,
177
kinit_cmd: str = None,
178
ticket_renew_window_factor: float = 0.8,
179
**kwargs
180
):
181
"""
182
Create GSSAPI credentials for Kerberos authentication.
183
184
Args:
185
kerberos_service_name: Kerberos service name (default: 'kafka')
186
kerberos_domain_name: Kerberos domain name
187
principal: Kerberos principal name
188
kinit_cmd: Custom kinit command
189
ticket_renew_window_factor: Ticket renewal threshold (0.0-1.0)
190
"""
191
192
def acquire_credentials(self) -> None:
193
"""
194
Acquire Kerberos credentials (TGT).
195
196
Raises:
197
AuthenticationError: If credential acquisition fails
198
"""
199
200
def renew_credentials(self) -> bool:
201
"""
202
Renew Kerberos credentials if needed.
203
204
Returns:
205
True if credentials were renewed
206
"""
207
208
def check_credentials(self) -> bool:
209
"""
210
Check if credentials are valid and not expired.
211
212
Returns:
213
True if credentials are valid
214
"""
215
216
@property
217
def service_name(self) -> str:
218
"""Kerberos service name."""
219
220
@property
221
def domain_name(self) -> str:
222
"""Kerberos domain name."""
223
224
@property
225
def principal(self) -> str:
226
"""Kerberos principal name."""
227
```
228
229
### OAuth Credentials
230
231
OAuth 2.0 credentials for modern authentication workflows with token-based authentication and automatic token refresh capabilities.
232
233
```python { .api }
234
class OAuthCredentials:
235
def __init__(
236
self,
237
*,
238
token_url: str,
239
client_id: str,
240
client_secret: str = None,
241
scope: str = None,
242
audience: str = None,
243
grant_type: str = 'client_credentials',
244
**kwargs
245
):
246
"""
247
Create OAuth credentials for token-based authentication.
248
249
Args:
250
token_url: OAuth token endpoint URL
251
client_id: OAuth client identifier
252
client_secret: OAuth client secret
253
scope: OAuth scope string
254
audience: OAuth audience
255
grant_type: OAuth grant type
256
"""
257
258
async def get_token(self) -> str:
259
"""
260
Get valid access token.
261
262
Returns:
263
Access token string
264
265
Raises:
266
AuthenticationError: If token acquisition fails
267
"""
268
269
async def refresh_token(self) -> str:
270
"""
271
Refresh access token.
272
273
Returns:
274
New access token string
275
"""
276
277
def is_token_expired(self) -> bool:
278
"""
279
Check if current token is expired.
280
281
Returns:
282
True if token needs refresh
283
"""
284
285
@property
286
def client_id(self) -> str:
287
"""OAuth client identifier."""
288
289
@property
290
def token_url(self) -> str:
291
"""OAuth token endpoint URL."""
292
```
293
294
### Authentication Configuration
295
296
Utilities for configuring authentication at the application level with support for multiple credential types and broker-specific settings.
297
298
```python { .api }
299
def configure_ssl(
300
app: App,
301
*,
302
cafile: str = None,
303
certfile: str = None,
304
keyfile: str = None,
305
password: str = None,
306
context: ssl.SSLContext = None,
307
**kwargs
308
) -> None:
309
"""
310
Configure SSL authentication for application.
311
312
Args:
313
app: Faust application
314
cafile: Certificate authority file
315
certfile: Client certificate file
316
keyfile: Client private key file
317
password: Private key password
318
context: Custom SSL context
319
"""
320
321
def configure_sasl(
322
app: App,
323
*,
324
mechanism: str,
325
username: str,
326
password: str,
327
**kwargs
328
) -> None:
329
"""
330
Configure SASL authentication for application.
331
332
Args:
333
app: Faust application
334
mechanism: SASL mechanism
335
username: Authentication username
336
password: Authentication password
337
"""
338
339
def configure_gssapi(
340
app: App,
341
*,
342
service_name: str = 'kafka',
343
domain_name: str = None,
344
**kwargs
345
) -> None:
346
"""
347
Configure GSSAPI authentication for application.
348
349
Args:
350
app: Faust application
351
service_name: Kerberos service name
352
domain_name: Kerberos domain name
353
"""
354
355
class AuthenticationError(Exception):
356
"""Raised when authentication fails."""
357
pass
358
359
class CredentialsError(Exception):
360
"""Raised when credential validation fails."""
361
pass
362
```
363
364
## Usage Examples
365
366
### SSL Authentication
367
368
```python
369
import faust
370
import ssl
371
372
# Create SSL credentials
373
ssl_creds = faust.SSLCredentials(
374
cafile='/path/to/ca-cert.pem',
375
certfile='/path/to/client-cert.pem',
376
keyfile='/path/to/client-key.pem',
377
password='key-password'
378
)
379
380
# Application with SSL authentication
381
app = faust.App(
382
'secure-app',
383
broker='kafka://secure-broker:9093',
384
ssl_credentials=ssl_creds
385
)
386
387
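# Alternative: Configure SSL context directly.
# WARNING: the two settings below disable hostname checking and certificate
# verification, which leaves the connection open to man-in-the-middle attacks.
# Use them only for local testing, never in production.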
388
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
389
ssl_context.check_hostname = False
390
ssl_context.verify_mode = ssl.CERT_NONE
391
392
app = faust.App(
393
'secure-app',
394
broker='kafka://secure-broker:9093',
395
ssl_context=ssl_context
396
)
397
```
398
399
### SASL Authentication
400
401
```python
402
# SASL PLAIN authentication
403
sasl_creds = faust.SASLCredentials(
404
mechanism='PLAIN',
405
username='kafka-user',
406
password='kafka-password'
407
)
408
409
app = faust.App(
410
'sasl-app',
411
broker='kafka://broker:9092',
412
sasl_credentials=sasl_creds
413
)
414
415
# SCRAM-SHA-256 authentication
416
scram_creds = faust.ScramCredentials(
417
username='kafka-user',
418
password='kafka-password',
419
mechanism='SCRAM-SHA-256'
420
)
421
422
app = faust.App(
423
'scram-app',
424
broker='kafka://broker:9092',
425
sasl_credentials=scram_creds
426
)
427
```
428
429
### Kerberos Authentication
430
431
```python
432
# GSSAPI/Kerberos authentication
433
gssapi_creds = faust.GSSAPICredentials(
434
kerberos_service_name='kafka',
435
kerberos_domain_name='EXAMPLE.COM',
436
principal='kafka-client@EXAMPLE.COM'
437
)
438
439
app = faust.App(
440
'kerberos-app',
441
broker='kafka://broker:9092',
442
gssapi_credentials=gssapi_creds
443
)
444
445
# Acquire credentials before starting
446
@app.on_startup.connect
447
async def acquire_kerberos_ticket():
448
gssapi_creds.acquire_credentials()
449
print("Kerberos ticket acquired")
450
451
# Periodic ticket renewal
452
@app.timer(interval=3600.0) # Renew every hour
453
async def renew_kerberos_ticket():
454
if gssapi_creds.renew_credentials():
455
print("Kerberos ticket renewed")
456
```
457
458
### Combined SSL + SASL
459
460
```python
461
# SSL transport with SASL authentication
462
ssl_context = ssl.create_default_context()
463
ssl_context.check_hostname = True
464
465
sasl_creds = faust.SASLCredentials(
466
mechanism='SCRAM-SHA-512',
467
username='secure-user',
468
password='secure-password',
469
ssl_context=ssl_context
470
)
471
472
app = faust.App(
473
'secure-sasl-app',
474
broker='kafka://secure-broker:9093',
475
ssl_context=ssl_context,
476
sasl_credentials=sasl_creds
477
)
478
```
479
480
### OAuth Authentication
481
482
```python
483
# OAuth 2.0 authentication
484
oauth_creds = faust.OAuthCredentials(
485
token_url='https://auth.example.com/oauth/token',
486
client_id='kafka-client',
487
client_secret='client-secret',
488
scope='kafka:read kafka:write'
489
)
490
491
app = faust.App(
492
'oauth-app',
493
broker='kafka://broker:9092',
494
oauth_credentials=oauth_creds
495
)
496
497
@app.on_startup.connect
498
async def get_initial_token():
499
token = await oauth_creds.get_token()
500
print(f"Initial token acquired: {token[:10]}...")
501
502
# Periodic token refresh: the timer checks for expiry and refreshes only when needed
503
@app.timer(interval=1800.0) # Refresh every 30 minutes
504
async def refresh_oauth_token():
505
if oauth_creds.is_token_expired():
506
token = await oauth_creds.refresh_token()
507
print("OAuth token refreshed")
508
```
509
510
### Environment-based Configuration
511
512
```python
513
import os
514
515
def create_app_with_auth():
516
auth_method = os.getenv('KAFKA_AUTH_METHOD', 'none')
517
518
if auth_method == 'ssl':
519
credentials = faust.SSLCredentials(
520
cafile=os.getenv('KAFKA_CA_FILE'),
521
certfile=os.getenv('KAFKA_CERT_FILE'),
522
keyfile=os.getenv('KAFKA_KEY_FILE'),
523
password=os.getenv('KAFKA_KEY_PASSWORD')
524
)
525
return faust.App(
526
'env-app',
527
broker=os.getenv('KAFKA_BROKER'),
528
ssl_credentials=credentials
529
)
530
531
elif auth_method == 'sasl':
532
credentials = faust.SASLCredentials(
533
mechanism=os.getenv('KAFKA_SASL_MECHANISM', 'PLAIN'),
534
username=os.getenv('KAFKA_USERNAME'),
535
password=os.getenv('KAFKA_PASSWORD')
536
)
537
return faust.App(
538
'env-app',
539
broker=os.getenv('KAFKA_BROKER'),
540
sasl_credentials=credentials
541
)
542
543
else:
544
return faust.App(
545
'env-app',
546
broker=os.getenv('KAFKA_BROKER', 'kafka://localhost:9092')
547
)
548
549
app = create_app_with_auth()
550
```
551
552
### Error Handling
553
554
```python
555
from faust import AuthenticationError, CredentialsError
556
557
@app.on_startup.connect
558
async def validate_credentials():
559
try:
560
# Validate credentials before starting
561
if hasattr(app, 'ssl_credentials'):
562
app.ssl_credentials.load_verify_locations()
563
564
if hasattr(app, 'gssapi_credentials'):
565
if not app.gssapi_credentials.check_credentials():
566
app.gssapi_credentials.acquire_credentials()
567
568
except CredentialsError as e:
569
print(f"Credential validation failed: {e}")
570
raise
571
572
except AuthenticationError as e:
573
print(f"Authentication failed: {e}")
574
raise
575
```
576
577
## Type Interfaces
578
579
```python { .api }
580
from typing import Protocol, Optional
581
import ssl
582
583
class CredentialsT(Protocol):
584
"""Base type interface for all credentials."""
585
pass
586
587
class SSLCredentialsT(CredentialsT, Protocol):
588
"""Type interface for SSL credentials."""
589
590
context: ssl.SSLContext
591
cafile: Optional[str]
592
certfile: Optional[str]
593
keyfile: Optional[str]
594
595
def load_verify_locations(self, **kwargs) -> None: ...
596
def load_cert_chain(self, certfile: str, **kwargs) -> None: ...
597
598
class SASLCredentialsT(CredentialsT, Protocol):
599
"""Type interface for SASL credentials."""
600
601
mechanism: str
602
username: str
603
password: str
604
ssl_context: Optional[ssl.SSLContext]
605
606
def create_authenticator(self) -> callable: ...
607
608
class GSSAPICredentialsT(CredentialsT, Protocol):
609
"""Type interface for GSSAPI credentials."""
610
611
service_name: str
612
domain_name: Optional[str]
613
principal: Optional[str]
614
615
def acquire_credentials(self) -> None: ...
616
def renew_credentials(self) -> bool: ...
617
def check_credentials(self) -> bool: ...
618
```